/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza;

import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineLifeCycleListener;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.runtime.ApplicationRunner;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaPipelineResult
implements PipelineResult {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaPipelineResult.class);
    private static final long DEFAULT_SHUTDOWN_MS = 5000L;
    private static final long SHUTDOWN_TIMEOUT_BUFFER = 5000L;
    private final SamzaExecutionContext executionContext;
    private final ApplicationRunner runner;
    private final StreamApplication app;
    private final SamzaPipelineLifeCycleListener listener;
    private final long shutdownTiemoutMs;

    public SamzaPipelineResult(StreamApplication app, ApplicationRunner runner, SamzaExecutionContext executionContext, SamzaPipelineLifeCycleListener listener, Config config) {
        this.executionContext = executionContext;
        this.runner = runner;
        this.app = app;
        this.listener = listener;
        this.shutdownTiemoutMs = config.getLong(TaskConfig.SHUTDOWN_MS(), 5000L) + 5000L;
    }

    public PipelineResult.State getState() {
        return this.getStateInfo().state;
    }

    public PipelineResult.State cancel() {
        LOG.info("Start to cancel samza pipeline...");
        this.runner.kill();
        LOG.info("Start awaiting finish for {} ms.", (Object)this.shutdownTiemoutMs);
        return this.waitUntilFinish(Duration.millis((long)this.shutdownTiemoutMs));
    }

    public PipelineResult.State waitUntilFinish(@Nullable Duration duration) {
        try {
            if (duration == null) {
                this.runner.waitForFinish();
            } else {
                this.runner.waitForFinish(java.time.Duration.ofMillis(duration.getMillis()));
            }
        }
        catch (Exception e) {
            throw new Pipeline.PipelineExecutionException((Throwable)e);
        }
        StateInfo stateInfo = this.getStateInfo();
        if (this.listener != null && (stateInfo.state == PipelineResult.State.DONE || stateInfo.state == PipelineResult.State.FAILED)) {
            this.listener.onFinish();
        }
        if (stateInfo.state == PipelineResult.State.FAILED) {
            throw stateInfo.error;
        }
        LOG.info("Pipeline finished. Final state: {}", (Object)stateInfo.state);
        return stateInfo.state;
    }

    public PipelineResult.State waitUntilFinish() {
        return this.waitUntilFinish(null);
    }

    public MetricResults metrics() {
        return MetricsContainerStepMap.asAttemptedOnlyMetricResults((MetricsContainerStepMap)this.executionContext.getMetricsContainer().getContainers());
    }

    private StateInfo getStateInfo() {
        ApplicationStatus status = this.runner.status();
        switch (status.getStatusCode()) {
            case New: {
                return new StateInfo(PipelineResult.State.STOPPED);
            }
            case Running: {
                return new StateInfo(PipelineResult.State.RUNNING);
            }
            case SuccessfulFinish: {
                return new StateInfo(PipelineResult.State.DONE);
            }
            case UnsuccessfulFinish: {
                LOG.error(status.getThrowable().getMessage(), status.getThrowable());
                return new StateInfo(PipelineResult.State.FAILED, new Pipeline.PipelineExecutionException(SamzaPipelineResult.getUserCodeException(status.getThrowable())));
            }
        }
        return new StateInfo(PipelineResult.State.UNKNOWN);
    }

    private static Throwable getUserCodeException(Throwable throwable) {
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (!(t instanceof UserCodeException)) continue;
            return t;
        }
        return throwable;
    }

    private static class StateInfo {
        private final PipelineResult.State state;
        private final Pipeline.PipelineExecutionException error;

        private StateInfo(PipelineResult.State state) {
            this(state, (Pipeline.PipelineExecutionException)null);
        }

        private StateInfo(PipelineResult.State state, Pipeline.PipelineExecutionException error) {
            this.state = state;
            this.error = error;
        }
    }
}

