package _ss_com.streamsets.datacollector.execution.runner.common;

import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.StateListener;
import _ss_com.streamsets.datacollector.main.RuntimeModule;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.restapi.bean.IssuesJson;
import _ss_com.streamsets.datacollector.runner.Pipeline;
import _ss_com.streamsets.datacollector.runner.PipelineRuntimeException;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.datacollector.validation.Issue;
import _ss_com.streamsets.datacollector.validation.Issues;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/common/ProductionPipeline.class */
/**
 * Drives a single execution of a {@link Pipeline}: initialization, run, destroy, and the
 * {@link PipelineStatus} transitions reported to the registered {@link StateListener}.
 *
 * <p>A listener must be registered through {@link #registerStatusListener(StateListener)} before
 * {@link #run()} is invoked; state changes are forwarded to it without a null check.
 */
public class ProductionPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(ProductionPipeline.class);
    private final PipelineConfiguration pipelineConf;
    private final Pipeline pipeline;
    private final ProductionPipelineRunner pipelineRunner;
    private StateListener stateListener;
    private final String name;
    private final String rev;
    private final boolean isExecutingInSlave;
    private final boolean shouldRetry;
    /** Set by {@link #run()} when initialization, execution or destroy failed. */
    private boolean executionFailed;

    /**
     * Creates a production pipeline wrapper.
     *
     * @param name pipeline name; used in the validation error and for JMX metrics cleanup
     * @param rev pipeline revision; used for JMX metrics cleanup
     * @param pipelineConfiguration configuration returned by {@link #getPipelineConf()}
     * @param configuration consulted for {@link RuntimeModule#PIPELINE_EXECUTION_MODE_KEY}
     *     (defaults to {@link ExecutionMode#STANDALONE}) to detect slave execution
     * @param pipeline pipeline to execute; its runner must be a {@link ProductionPipelineRunner}
     * @param shouldRetry whether a failed execution may be moved to {@link PipelineStatus#RETRY}
     */
    public ProductionPipeline(String name, String rev, PipelineConfiguration pipelineConfiguration, Configuration configuration, Pipeline pipeline, boolean shouldRetry) {
        this.name = name;
        this.rev = rev;
        this.pipelineConf = pipelineConfiguration;
        this.pipeline = pipeline;
        this.pipelineRunner = (ProductionPipelineRunner) pipeline.getRunner();
        String executionMode = configuration.get(RuntimeModule.PIPELINE_EXECUTION_MODE_KEY, ExecutionMode.STANDALONE.name());
        this.isExecutingInSlave = ExecutionMode.valueOf(executionMode) == ExecutionMode.SLAVE;
        this.shouldRetry = shouldRetry;
    }

    /** Returns the currently registered state listener, or {@code null} if none was registered. */
    public StateListener getStatusListener() {
        return this.stateListener;
    }

    /** Registers the listener that receives every state transition emitted by {@link #run()}. */
    public void registerStatusListener(StateListener stateListener) {
        this.stateListener = stateListener;
    }

    /** Forwards a state transition to the registered listener. */
    private void stateChanged(PipelineStatus pipelineStatus, String message, Map<String, Object> attributes) throws PipelineRuntimeException {
        this.stateListener.stateChanged(pipelineStatus, message, attributes);
    }

    /**
     * Initializes, runs and destroys the pipeline, reporting each phase to the state listener.
     *
     * <p>Sequence:
     * <ol>
     *   <li>{@code init}: a thrown error is reported as {@code STARTING_ERROR} (unless the runner
     *       was stopped) and rethrown wrapped in {@code CONTAINER_0702}; returned validation
     *       issues are reported as {@code STARTING_ERROR} and thrown as {@code CONTAINER_0800}.</li>
     *   <li>{@code run}: reports {@code RUNNING}, then {@code FINISHING} if the runner was not
     *       stopped; a thrown error is reported as {@code RUNNING_ERROR} (unless stopped) and
     *       rethrown unchanged.</li>
     *   <li>{@code destroy}: always executed exactly once, with a stop reason reflecting the
     *       outcome; afterwards the terminal state ({@code RETRY}, {@code START_ERROR},
     *       {@code RUN_ERROR}, {@code STOP_ERROR} or {@code FINISHED}) is reported.</li>
     * </ol>
     * JMX metrics for this pipeline are cleaned up in every case.
     *
     * @throws StageException if propagated by the pipeline
     * @throws PipelineRuntimeException on initialization or validation failure, or if propagated
     *     by the pipeline or the state listener
     */
    public void run() throws StageException, PipelineRuntimeException {
        boolean finishing = false;
        boolean errorWhileInitializing = false;
        boolean errorWhileRunning = false;
        boolean errorWhileDestroying = false;
        boolean isRecoverable = true;
        String runningErrorMsg = null;
        String destroyErrorMsg = null;
        this.executionFailed = false;
        try {
            try {
                LOG.debug("Initializing");
                List<Issue> issues;
                try {
                    issues = getPipeline().init(true);
                } catch (Throwable e) {
                    if (!wasStopped()) {
                        runningErrorMsg = e.toString();
                        // Record the failure before notifying, so teardown sees it even if the listener throws.
                        errorWhileInitializing = true;
                        LOG.warn("Error while starting: {}", runningErrorMsg, e);
                        stateChanged(PipelineStatus.STARTING_ERROR, runningErrorMsg, null);
                    }
                    throw new PipelineRuntimeException(ContainerError.CONTAINER_0702, e.toString(), e);
                }

                if (!issues.isEmpty()) {
                    // Handled outside the init try/catch so the failure is reported exactly once
                    // and keeps its CONTAINER_0800 error code.
                    LOG.debug("Stopped due to validation error");
                    String firstIssueMsg = issues.get(0).getMessage();
                    PipelineRuntimeException validationError = new PipelineRuntimeException(ContainerError.CONTAINER_0800, this.name, firstIssueMsg);
                    Map<String, Object> attributes = new HashMap<>();
                    attributes.put("issues", new IssuesJson(new Issues(issues)));
                    errorWhileInitializing = true;
                    runningErrorMsg = firstIssueMsg;
                    stateChanged(PipelineStatus.STARTING_ERROR, firstIssueMsg, attributes);
                    getPipeline().errorNotification(validationError);
                    throw validationError;
                }

                try {
                    stateChanged(PipelineStatus.RUNNING, null, null);
                    LOG.debug("Running");
                    this.pipeline.run();
                    if (!wasStopped()) {
                        LOG.debug("Finishing");
                        stateChanged(PipelineStatus.FINISHING, null, null);
                        finishing = true;
                    }
                } catch (Throwable e) {
                    if (!wasStopped()) {
                        runningErrorMsg = e.toString();
                        errorWhileRunning = true;
                        isRecoverable = isRecoverableThrowable(e);
                        LOG.warn("Error while running: {}", runningErrorMsg, e);
                        stateChanged(PipelineStatus.RUNNING_ERROR, runningErrorMsg, null);
                    }
                    throw e;
                }
            } finally {
                LOG.debug("Destroying");
                PipelineStopReason stopReason;
                if (errorWhileInitializing || errorWhileRunning) {
                    stopReason = PipelineStopReason.FAILURE;
                } else if (wasStopped()) {
                    stopReason = PipelineStopReason.USER_ACTION;
                } else {
                    stopReason = PipelineStopReason.FINISHED;
                }
                try {
                    this.pipeline.destroy(true, stopReason);
                } catch (Throwable e) {
                    destroyErrorMsg = e.toString();
                    errorWhileDestroying = true;
                    LOG.warn("Error while destroying: {}", destroyErrorMsg, e);
                    throw e;
                } finally {
                    // The terminal state is reported even when destroy itself failed.
                    if (errorWhileInitializing || errorWhileRunning || errorWhileDestroying) {
                        this.executionFailed = true;
                        if (this.shouldRetry && !this.pipeline.shouldStopOnStageError() && !this.isExecutingInSlave && isRecoverable && !wasStopped()) {
                            stateChanged(PipelineStatus.RETRY, runningErrorMsg != null ? runningErrorMsg : destroyErrorMsg, null);
                        } else if (errorWhileInitializing) {
                            stateChanged(PipelineStatus.START_ERROR, runningErrorMsg, null);
                        } else if (errorWhileRunning) {
                            stateChanged(PipelineStatus.RUN_ERROR, runningErrorMsg, null);
                        } else {
                            stateChanged(PipelineStatus.STOP_ERROR, destroyErrorMsg, null);
                        }
                    } else if (finishing) {
                        LOG.debug("Finished");
                        stateChanged(PipelineStatus.FINISHED, null, null);
                    }
                    if (this.isExecutingInSlave) {
                        LOG.debug("Calling cluster source post destroy");
                        this.pipeline.getSource().postDestroy();
                    }
                }
            }
        } finally {
            MetricsConfigurator.cleanUpJmxMetrics(this.name, this.rev);
        }
    }

    /** Treats everything except {@link Error} subclasses as recoverable (eligible for retry). */
    private boolean isRecoverableThrowable(Throwable th) {
        return !(th instanceof Error);
    }

    /** Returns the pipeline configuration supplied at construction. */
    public PipelineConfiguration getPipelineConf() {
        return this.pipelineConf;
    }

    /** Returns the wrapped pipeline. */
    public Pipeline getPipeline() {
        return this.pipeline;
    }

    /**
     * Stops the runner first and then the pipeline.
     *
     * @throws PipelineException if propagated by either stop call
     */
    public void stop() throws PipelineException {
        this.pipelineRunner.stop();
        this.pipeline.stop();
    }

    /** Returns whether the runner reports that it was stopped. */
    public boolean wasStopped() {
        return this.pipelineRunner.wasStopped();
    }

    /** Returns whether the most recent {@link #run()} recorded a failure. */
    public boolean isExecutionFailed() {
        return this.executionFailed;
    }

    /** Returns the committed offsets as reported by the runner. */
    public Map<String, String> getCommittedOffsets() {
        return this.pipelineRunner.getCommittedOffsets();
    }

    /** Delegates a snapshot capture request to the runner; arguments are passed through unchanged. */
    public void captureSnapshot(String str, int i, int i2) {
        this.pipelineRunner.capture(str, i, i2);
    }

    /**
     * Delegates a snapshot cancellation to the runner.
     *
     * @throws PipelineException if propagated by the runner
     */
    public void cancelSnapshot(String str) throws PipelineException {
        this.pipelineRunner.cancelSnapshot(str);
    }

    /** Returns error records from the runner; arguments are passed through unchanged. */
    public List<Record> getErrorRecords(String str, int i) {
        return this.pipelineRunner.getErrorRecords(str, i);
    }

    /** Returns error messages from the runner; arguments are passed through unchanged. */
    public List<ErrorMessage> getErrorMessages(String str, int i) {
        return this.pipelineRunner.getErrorMessages(str, i);
    }

    /** Returns the last batch time reported by the runner's offset tracker. */
    public long getLastBatchTime() {
        return this.pipelineRunner.getOffSetTracker().getLastBatchTime();
    }

    /** Hands the given thread health reporter to the runner. */
    public void setThreadHealthReporter(ThreadHealthReporter threadHealthReporter) {
        this.pipelineRunner.setThreadHealthReporter(threadHealthReporter);
    }
}
