/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import java.util.function.Consumer;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaPipelineResult;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaJobInvocation
implements JobInvocation {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class);
    private static final IdGenerator idGenerator = IdGenerators.incrementingLongs();
    private final SamzaPipelineOptions options;
    private final RunnerApi.Pipeline originalPipeline;
    private volatile SamzaPipelineResult pipelineResult;
    private final String id;

    public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options, String id) {
        this.originalPipeline = pipeline;
        this.options = options;
        this.id = id;
    }

    private SamzaPipelineResult invokeSamzaJob() {
        RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)this.originalPipeline).toPipeline();
        LOG.info("Portable pipeline to run:");
        LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
        this.options.setRunner(SamzaRunner.class);
        try {
            SamzaRunner runner = new SamzaRunner(this.options);
            return runner.runPortablePipeline(fusedPipeline);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to invoke samza job", e);
        }
    }

    public void start() {
        LOG.info("Starting job invocation {}", (Object)this.getId());
        this.pipelineResult = this.invokeSamzaJob();
    }

    public String getId() {
        return this.id;
    }

    public void cancel() {
        try {
            if (this.pipelineResult != null) {
                this.pipelineResult.cancel();
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to cancel job.", e);
        }
    }

    public JobApi.JobState.Enum getState() {
        if (this.pipelineResult == null) {
            return JobApi.JobState.Enum.STARTING;
        }
        switch (this.pipelineResult.getState()) {
            case RUNNING: {
                return JobApi.JobState.Enum.RUNNING;
            }
            case FAILED: {
                return JobApi.JobState.Enum.FAILED;
            }
            case DONE: {
                return JobApi.JobState.Enum.DONE;
            }
            case STOPPED: {
                return JobApi.JobState.Enum.STOPPED;
            }
            case UPDATED: {
                return JobApi.JobState.Enum.UPDATED;
            }
            case CANCELLED: {
                return JobApi.JobState.Enum.CANCELLED;
            }
        }
        return JobApi.JobState.Enum.UNRECOGNIZED;
    }

    public void addStateListener(Consumer<JobApi.JobState.Enum> stateStreamObserver) {
        LOG.info("state listener not yet implemented. Directly use getState() instead");
    }

    public synchronized void addMessageListener(Consumer<JobApi.JobMessage> messageStreamObserver) {
        LOG.info("message listener not yet implemented.");
    }
}

