package _ss_com.streamsets.datacollector.execution.runner.standalone;

import _ss_com.com.google.common.collect.ImmutableList;
import _ss_com.com.google.common.collect.ImmutableMap;
import _ss_com.com.google.common.collect.ImmutableSet;
import _ss_com.fasterxml.jackson.core.JsonProcessingException;
import _ss_com.fasterxml.jackson.databind.ObjectMapper;
import _ss_com.streamsets.datacollector.alerts.AlertsUtil;
import _ss_com.streamsets.datacollector.callback.CallbackInfo;
import _ss_com.streamsets.datacollector.callback.CallbackObjectType;
import _ss_com.streamsets.datacollector.config.DataRuleDefinition;
import _ss_com.streamsets.datacollector.config.MetricsRuleDefinition;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.config.RuleDefinitions;
import _ss_com.streamsets.datacollector.config.StageConfiguration;
import _ss_com.streamsets.datacollector.creation.PipelineBeanCreator;
import _ss_com.streamsets.datacollector.creation.PipelineConfigBean;
import _ss_com.streamsets.datacollector.el.JobEL;
import _ss_com.streamsets.datacollector.el.PipelineEL;
import _ss_com.streamsets.datacollector.execution.AbstractRunner;
import _ss_com.streamsets.datacollector.execution.PipelineState;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.Runner;
import _ss_com.streamsets.datacollector.execution.Snapshot;
import _ss_com.streamsets.datacollector.execution.SnapshotInfo;
import _ss_com.streamsets.datacollector.execution.SnapshotStore;
import _ss_com.streamsets.datacollector.execution.StateListener;
import _ss_com.streamsets.datacollector.execution.alerts.AlertInfo;
import _ss_com.streamsets.datacollector.execution.metrics.MetricsEventRunnable;
import _ss_com.streamsets.datacollector.execution.runner.RetryUtils;
import _ss_com.streamsets.datacollector.execution.runner.common.Constants;
import _ss_com.streamsets.datacollector.execution.runner.common.DataObserverRunnable;
import _ss_com.streamsets.datacollector.execution.runner.common.MetricObserverRunnable;
import _ss_com.streamsets.datacollector.execution.runner.common.PipelineRunnerException;
import _ss_com.streamsets.datacollector.execution.runner.common.ProduceEmptyBatchesForIdleRunnersRunnable;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionObserver;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipeline;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipelineBuilder;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner;
import _ss_com.streamsets.datacollector.execution.runner.common.RulesConfigLoader;
import _ss_com.streamsets.datacollector.execution.runner.common.SampledRecord;
import _ss_com.streamsets.datacollector.execution.runner.common.ThreadHealthReporter;
import _ss_com.streamsets.datacollector.execution.runner.common.dagger.PipelineProviderModule;
import _ss_com.streamsets.datacollector.json.ObjectMapperFactory;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.restapi.bean.MetricRegistryJson;
import _ss_com.streamsets.datacollector.runner.Observer;
import _ss_com.streamsets.datacollector.runner.Pipeline;
import _ss_com.streamsets.datacollector.runner.PipelineRunner;
import _ss_com.streamsets.datacollector.runner.PipelineRuntimeException;
import _ss_com.streamsets.datacollector.runner.UserContext;
import _ss_com.streamsets.datacollector.runner.production.OffsetFileUtil;
import _ss_com.streamsets.datacollector.runner.production.ProductionSourceOffsetTracker;
import _ss_com.streamsets.datacollector.runner.production.RulesConfigLoaderRunnable;
import _ss_com.streamsets.datacollector.runner.production.SourceOffset;
import _ss_com.streamsets.datacollector.store.PipelineStoreException;
import _ss_com.streamsets.datacollector.updatechecker.UpdateChecker;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.LogUtil;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.dc.execution.manager.standalone.ResourceManager;
import _ss_com.streamsets.dc.execution.manager.standalone.ThreadUsage;
import _ss_com.streamsets.lib.security.http.RemoteSSOService;
import _ss_com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService;
import _ss_com.streamsets.pipeline.lib.log.LogConstants;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.streamsets.pipeline.api.ErrorListener;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import com.streamsets.pipeline.api.impl.Utils;
import dagger.ObjectGraph;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/standalone/StandaloneRunner.class */
public class StandaloneRunner extends AbstractRunner implements StateListener {
    /** Stage name constant for the {@code StatsNullDTarget} stage; not referenced in the visible part of this class. */
    public static final String STATS_NULL_TARGET = "com_streamsets_pipeline_stage_destination_devnull_StatsNullDTarget";
    /** Stage name constant for the {@code StatsDpmDirectlyDTarget} stage; not referenced in the visible part of this class. */
    public static final String STATS_DPM_DIRECTLY_TARGET = "com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget";

    /** Store used to create, read, relabel and delete snapshots of this pipeline. */
    @Inject
    SnapshotStore snapshotStore;

    /** Executor on which retries are scheduled (passed to {@code scheduleForRetries}). */
    @Inject
    @Named("runnerExecutor")
    SafeScheduledExecutorService runnerExecutor;

    /** Asked for runner resources before the pipeline is moved to STARTING. */
    @Inject
    ResourceManager resourceManager;
    /** Parent object graph; extended with a per-pipeline module when the pipeline is started. */
    private final ObjectGraph objectGraph;
    /** Lazily loaded pipeline title; {@code null} until first requested. */
    private String pipelineTitle;
    /** Random UUID string generated by {@code prepareForStart}. */
    private String token;
    private ThreadHealthReporter threadHealthReporter;
    /** Source of sampled records while the pipeline is active. */
    private DataObserverRunnable observerRunnable;
    /** The pipeline currently (or last) built by this runner; {@code null} before the first start. */
    private ProductionPipeline prodPipeline;
    private MetricsEventRunnable metricsEventRunnable;
    /** Retry limit taken from the pipeline configuration bean; {@code -1} is treated as "no limit". */
    private int maxRetries;
    /** Handle of the scheduled retry; assigned whenever the state is saved as RETRY. */
    private ScheduledFuture<Void> retryFuture;
    private ProductionPipelineRunnable pipelineRunnable;
    /** Set when the runner moves into RETRY so the next start can reuse {@link #metricsForRetry}. */
    private boolean isRetrying;
    /** Once {@code true}, {@code startPipeline} refuses to start the pipeline. */
    private volatile boolean isClosed;
    private UpdateChecker updateChecker;
    /** Metrics string copied from the persisted state when entering RETRY. */
    private volatile String metricsForRetry;
    /** Listeners handed to the pipeline runner on every start. */
    private final List<ErrorListener> errorListeners;
    private static final Logger LOG = LoggerFactory.getLogger(StandaloneRunner.class);
    /** Statuses in which {@code resetOffset} is rejected. */
    private static final ImmutableList<PipelineStatus> RESET_OFFSET_DISALLOWED_STATUSES = ImmutableList.of(
            PipelineStatus.CONNECTING,
            PipelineStatus.DISCONNECTING,
            PipelineStatus.FINISHING,
            PipelineStatus.RETRY,
            PipelineStatus.RUNNING,
            PipelineStatus.STARTING,
            PipelineStatus.STOPPING);
    /** Statuses in which {@code forceQuit} is honoured. */
    private static final ImmutableSet<PipelineStatus> FORCE_QUIT_ALLOWED_STATES = ImmutableSet.of(
            PipelineStatus.STOPPING,
            PipelineStatus.STOPPING_ERROR,
            PipelineStatus.STARTING_ERROR,
            PipelineStatus.RUNNING_ERROR,
            PipelineStatus.FINISHING);
    /**
     * Allowed status transitions: key is the current status, value the set of statuses it may move to.
     * Statuses without an entry (for example KILLED) have no registered outgoing transition.
     */
    private static final Map<PipelineStatus, Set<PipelineStatus>> VALID_TRANSITIONS =
            new ImmutableMap.Builder<PipelineStatus, Set<PipelineStatus>>()
                    .put(PipelineStatus.EDITED, ImmutableSet.of(PipelineStatus.STARTING))
                    .put(PipelineStatus.STARTING, ImmutableSet.of(PipelineStatus.START_ERROR, PipelineStatus.STARTING_ERROR, PipelineStatus.RUNNING, PipelineStatus.DISCONNECTING, PipelineStatus.STOPPING))
                    .put(PipelineStatus.STARTING_ERROR, ImmutableSet.of(PipelineStatus.START_ERROR, PipelineStatus.STOPPING_ERROR, PipelineStatus.RETRY))
                    .put(PipelineStatus.START_ERROR, ImmutableSet.of(PipelineStatus.STARTING))
                    .put(PipelineStatus.RUNNING, ImmutableSet.of(PipelineStatus.RUNNING_ERROR, PipelineStatus.FINISHING, PipelineStatus.STOPPING, PipelineStatus.DISCONNECTING))
                    .put(PipelineStatus.RUNNING_ERROR, ImmutableSet.of(PipelineStatus.RETRY, PipelineStatus.STOPPING_ERROR, PipelineStatus.RUN_ERROR))
                    .put(PipelineStatus.RETRY, ImmutableSet.of(PipelineStatus.STARTING, PipelineStatus.STOPPING, PipelineStatus.DISCONNECTING))
                    .put(PipelineStatus.RUN_ERROR, ImmutableSet.of(PipelineStatus.STARTING))
                    .put(PipelineStatus.STOPPING_ERROR, ImmutableSet.of(PipelineStatus.START_ERROR, PipelineStatus.RUN_ERROR, PipelineStatus.STOP_ERROR, PipelineStatus.RETRY))
                    .put(PipelineStatus.STOP_ERROR, ImmutableSet.of(PipelineStatus.STARTING))
                    .put(PipelineStatus.FINISHING, ImmutableSet.of(PipelineStatus.FINISHED, PipelineStatus.STOPPING_ERROR))
                    .put(PipelineStatus.STOPPING, ImmutableSet.of(PipelineStatus.STOPPED, PipelineStatus.STOPPING_ERROR))
                    .put(PipelineStatus.FINISHED, ImmutableSet.of(PipelineStatus.STARTING))
                    .put(PipelineStatus.STOPPED, ImmutableSet.of(PipelineStatus.STARTING))
                    .put(PipelineStatus.DISCONNECTING, ImmutableSet.of(PipelineStatus.DISCONNECTED))
                    .put(PipelineStatus.DISCONNECTED, ImmutableSet.of(PipelineStatus.CONNECTING, PipelineStatus.STARTING, PipelineStatus.RETRY))
                    .put(PipelineStatus.CONNECTING, ImmutableSet.of(PipelineStatus.STARTING, PipelineStatus.DISCONNECTING, PipelineStatus.RETRY))
                    .build();

    /**
     * Builds the runner and lets the given object graph inject its {@code @Inject} members.
     *
     * @param str first value forwarded to the {@link AbstractRunner} constructor (presumably the pipeline name — confirm)
     * @param str2 second value forwarded to the {@link AbstractRunner} constructor (presumably the revision — confirm)
     * @param objectGraph graph that is kept for later use and asked to inject this instance
     */
    public StandaloneRunner(String str, String str2, ObjectGraph objectGraph) {
        super(str, str2);
        this.errorListeners = new ArrayList<>();
        this.objectGraph = objectGraph;
        this.pipelineTitle = null;
        // Injection is the last step, after every directly assigned field is in place.
        objectGraph.inject(this);
    }

    /**
     * Registers an error listener; the accumulated listeners are passed to the pipeline runner when the
     * pipeline is started.
     *
     * @param errorListener listener to append to this runner's listener list
     */
    public void addErrorListener(ErrorListener errorListener) {
        errorListeners.add(errorListener);
    }

    /**
     * Moves a pipeline whose persisted status is transitional into the matching resting status.
     *
     * <p>STARTING, RETRY, CONNECTING and RUNNING are forced through DISCONNECTING to DISCONNECTED;
     * DISCONNECTING goes to DISCONNECTED; the {@code *_ERROR}-in-progress, STOPPING and FINISHING statuses
     * go to their terminal counterpart. Statuses that are already at rest are left untouched.
     *
     * @param str user recorded in the logging context and on every forced transition
     * @throws PipelineException if reading the state or saving a forced transition fails
     * @throws IllegalStateException if the persisted status is not one this method knows about
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void prepareForDataCollectorStart(String str) throws PipelineException {
        PipelineStatus status = getState().getStatus();
        try {
            MDC.put(LogConstants.USER, str);
            LogUtil.injectPipelineInMDC(getPipelineTitle(), getName());
            LOG.info("Pipeline {} with rev {} is in state: {}", getName(), getRev(), status);
            String message = null;
            List<PipelineStatus> forcedTransitions = new ArrayList<>();
            switch (status) {
                case STARTING:
                    message = "Pipeline was in STARTING state, forcing it to DISCONNECTING";
                    forcedTransitions.add(PipelineStatus.DISCONNECTING);
                    forcedTransitions.add(PipelineStatus.DISCONNECTED);
                    break;
                case RETRY:
                    message = "Pipeline was in RETRY state, forcing it to DISCONNECTING";
                    forcedTransitions.add(PipelineStatus.DISCONNECTING);
                    forcedTransitions.add(PipelineStatus.DISCONNECTED);
                    break;
                case CONNECTING:
                    message = "Pipeline was in CONNECTING state, forcing it to DISCONNECTING";
                    forcedTransitions.add(PipelineStatus.DISCONNECTING);
                    forcedTransitions.add(PipelineStatus.DISCONNECTED);
                    break;
                case RUNNING:
                    message = "Pipeline was in RUNNING state, forcing it to DISCONNECTING";
                    forcedTransitions.add(PipelineStatus.DISCONNECTING);
                    forcedTransitions.add(PipelineStatus.DISCONNECTED);
                    break;
                case DISCONNECTING:
                    message = "Pipeline was in DISCONNECTING state, forcing it to DISCONNECTED";
                    forcedTransitions.add(PipelineStatus.DISCONNECTED);
                    break;
                case STARTING_ERROR:
                    message = "Pipeline was in STARTING_ERROR state, forcing it to START_ERROR";
                    forcedTransitions.add(PipelineStatus.START_ERROR);
                    break;
                case STOPPING_ERROR:
                    message = "Pipeline was in STOPPING_ERROR state, forcing it to STOP_ERROR";
                    forcedTransitions.add(PipelineStatus.STOP_ERROR);
                    break;
                case RUNNING_ERROR:
                    message = "Pipeline was in RUNNING_ERROR state, forcing it to terminal state of RUN_ERROR";
                    forcedTransitions.add(PipelineStatus.RUN_ERROR);
                    break;
                case STOPPING:
                    message = "Pipeline was in STOPPING state, forcing it to terminal state of STOPPED";
                    forcedTransitions.add(PipelineStatus.STOPPED);
                    break;
                case FINISHING:
                    message = "Pipeline was in FINISHING state, forcing it to terminal state of FINISHED";
                    forcedTransitions.add(PipelineStatus.FINISHED);
                    break;
                case DISCONNECTED:
                case RUN_ERROR:
                case EDITED:
                case FINISHED:
                case KILLED:
                case START_ERROR:
                case STOPPED:
                case STOP_ERROR:
                    // Already at rest: nothing to force.
                    break;
                default:
                    throw new IllegalStateException(Utils.format("Pipeline in undefined state: '{}'", new Object[]{status}));
            }
            if (message != null) {
                LOG.debug(message);
            }
            // Apply the forced transitions in order; each one is validated against the transition table.
            for (PipelineStatus target : forcedTransitions) {
                validateAndSetStateTransition(str, target, message, null);
            }
        } finally {
            MDC.clear();
        }
    }

    /**
     * Resumes the pipeline after the data collector has started.
     *
     * <p>Snapshots still flagged as in progress are deleted from the snapshot store first. A pipeline in
     * DISCONNECTED status then has its start context reloaded from the persisted state and is started or
     * put into RETRY; any other status is only logged as an error.
     *
     * @param str user recorded in the logging context and used to load the start context
     * @throws PipelineException if the state, snapshots or start context cannot be read or the start fails
     * @throws StageException if starting the pipeline fails in a stage
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void onDataCollectorStart(String str) throws PipelineException, StageException {
        try {
            MDC.put(LogConstants.USER, str);
            LogUtil.injectPipelineInMDC(getPipelineTitle(), getName());
            PipelineStatus status = getState().getStatus();
            LOG.info("Pipeline '{}::{}' has status: '{}'", new Object[]{getName(), getRev(), status});
            for (SnapshotInfo snapshotInfo : getSnapshotsInfo()) {
                if (snapshotInfo.isInProgress()) {
                    this.snapshotStore.deleteSnapshot(snapshotInfo.getName(), snapshotInfo.getRev(), snapshotInfo.getId());
                }
            }
            // A plain comparison (rather than a switch) also routes a null status to the error log.
            if (status == PipelineStatus.DISCONNECTED) {
                LOG.debug("Pipeline was in DISCONNECTED state, changing it to CONNECTING");
                loadStartPipelineContextFromState(str);
                retryOrStart(getStartPipelineContext());
            } else {
                LOG.error(Utils.format("Pipeline cannot start with status: '{}'", new Object[]{status}));
            }
        } finally {
            MDC.clear();
        }
    }

    /**
     * Starts the pipeline directly, or parks it in RETRY when the persisted state shows retry attempts.
     *
     * <p>A direct start happens when the retry attempt counter is zero or the status is DISCONNECTED;
     * otherwise the state is moved to RETRY and the persisted metrics are remembered for the retry.
     *
     * @param startPipelineContext context supplying the user and start parameters
     */
    private void retryOrStart(Runner.StartPipelineContext startPipelineContext) throws PipelineException, StageException {
        PipelineState persisted = getState();
        boolean startDirectly = persisted.getRetryAttempt() == 0 || persisted.getStatus() == PipelineStatus.DISCONNECTED;
        if (!startDirectly) {
            validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.RETRY, "Changing the state to RETRY on startup", null);
            this.isRetrying = true;
            this.metricsForRetry = getState().getMetrics();
            return;
        }
        prepareForStart(startPipelineContext);
        start(startPipelineContext);
    }

    /**
     * Disconnects the pipeline because the data collector is shutting down.
     *
     * <p>A pipeline in RETRY has its pending retry cancelled (when one is scheduled) and is moved through
     * DISCONNECTING to DISCONNECTED. An inactive or already DISCONNECTED pipeline is left alone. Any other
     * active pipeline is moved to DISCONNECTING on a best-effort basis and then stopped; failures while
     * stopping are logged, not propagated.
     *
     * @param str user recorded in the logging context and on the state transitions
     * @throws PipelineException if the state cannot be read or the RETRY path cannot save its transitions
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void onDataCollectorStop(String str) throws PipelineException {
        try {
            MDC.put(LogConstants.USER, str);
            LogUtil.injectPipelineInMDC(getPipelineTitle(), getName());
            // Read the status once so every decision below is made on the same value.
            PipelineStatus status = getState().getStatus();
            if (status == PipelineStatus.RETRY) {
                LOG.info("Pipeline '{}'::'{}' is in retry", getName(), getRev());
                // The future may be absent if RETRY was persisted but no retry was scheduled by this instance.
                if (this.retryFuture != null) {
                    this.retryFuture.cancel(true);
                }
                validateAndSetStateTransition(str, PipelineStatus.DISCONNECTING, null, null);
                validateAndSetStateTransition(str, PipelineStatus.DISCONNECTED, "Disconnected as SDC is shutting down", null);
                return;
            }
            if (!status.isActive() || status == PipelineStatus.DISCONNECTED) {
                LOG.info("Pipeline '{}'::'{}' is no longer active", getName(), getRev());
                return;
            }
            LOG.info("Stopping pipeline {}::{}", getName(), getRev());
            try {
                try {
                    validateAndSetStateTransition(str, PipelineStatus.DISCONNECTING, "Stopping the pipeline as SDC is shutting down", null);
                } catch (PipelineRunnerException e) {
                    // Best effort: an invalid transition must not prevent the stop below.
                    LOG.warn("Cannot transition to PipelineStatus.DISCONNECTING: {}", e.toString(), e);
                }
                stopPipeline(true);
            } catch (Exception e2) {
                LOG.warn("Error while stopping the pipeline: {} ", e2.toString(), e2);
            }
        } finally {
            MDC.clear();
        }
    }

    /**
     * Returns the pipeline title, loading it from the pipeline store on first use and caching it afterwards.
     *
     * @return the cached or freshly loaded title
     * @throws PipelineException if the pipeline info cannot be read from the store
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String getPipelineTitle() throws PipelineException {
        if (this.pipelineTitle != null) {
            return this.pipelineTitle;
        }
        this.pipelineTitle = getPipelineStore().getInfo(getName()).getTitle();
        return this.pipelineTitle;
    }

    /**
     * Resets the source offset of this pipeline unless its current status forbids it.
     *
     * @param str user requesting the reset (not used by this implementation)
     * @throws PipelineRunnerException with CONTAINER_0104 if the status is in the disallowed list
     * @throws PipelineStoreException if the state cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public synchronized void resetOffset(String str) throws PipelineStoreException, PipelineRunnerException {
        PipelineStatus current = getState().getStatus();
        LOG.debug("Resetting offset for pipeline {}, {}", getName(), getRev());
        boolean resetAllowed = !RESET_OFFSET_DISALLOWED_STATUSES.contains(current);
        if (resetAllowed) {
            ProductionSourceOffsetTracker tracker = new ProductionSourceOffsetTracker(getName(), getRev(), getRuntimeInfo());
            tracker.resetOffset(getName(), getRev());
            return;
        }
        throw new PipelineRunnerException(ContainerError.CONTAINER_0104, getName());
    }

    /**
     * Reads the committed source offset of this pipeline revision via {@link OffsetFileUtil}.
     *
     * @return the offset returned by {@code OffsetFileUtil.getOffset}
     * @throws PipelineException if reading the offset fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public SourceOffset getCommittedOffsets() throws PipelineException {
        SourceOffset committed = OffsetFileUtil.getOffset(getRuntimeInfo(), getName(), getRev());
        return committed;
    }

    /**
     * Overwrites the committed source offset; only permitted while the pipeline is not active.
     *
     * @param sourceOffset offset to persist
     * @throws PipelineException CONTAINER_0118 (as a {@link PipelineRunnerException}) if the pipeline is
     *         active, or any failure while reading the state or saving the offset
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void updateCommittedOffsets(SourceOffset sourceOffset) throws PipelineException {
        boolean active = getState().getStatus().isActive();
        if (!active) {
            OffsetFileUtil.saveSourceOffset(getRuntimeInfo(), getName(), getRev(), sourceOffset);
            return;
        }
        throw new PipelineRunnerException(ContainerError.CONTAINER_0118, getName());
    }

    /**
     * Stops the pipeline by delegating to {@code stopPipeline(false)}.
     *
     * @param str user requesting the stop (not used by this implementation)
     * @throws PipelineException if stopping fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void stop(String str) throws PipelineException {
        final boolean sdcShuttingDown = false; // NOTE(review): flag meaning inferred from onDataCollectorStop passing true — confirm
        stopPipeline(sdcShuttingDown);
    }

    /**
     * Force-quits the running pipeline when a runnable exists and the status allows it; otherwise the
     * request is logged and ignored.
     *
     * @param str user requesting the force quit (not used by this implementation)
     * @throws PipelineException if the state cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void forceQuit(String str) throws PipelineException {
        if (this.pipelineRunnable != null && FORCE_QUIT_ALLOWED_STATES.contains(getState().getStatus())) {
            LOG.debug("Force Quit the pipeline '{}'::'{}'", getName(), getRev());
            this.pipelineRunnable.forceQuit();
            return;
        }
        LOG.info("Ignoring force quit request because pipeline is in {} state", getState().getStatus());
    }

    /**
     * Returns the metrics of the current pipeline.
     *
     * @return {@code null} when no pipeline has been built; the stored retry metrics string when the
     *         pipeline's runner has no registry and the status is active; otherwise the runner's registry
     *         (which may itself be {@code null})
     * @throws PipelineStoreException if the state cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Object getMetrics() throws PipelineStoreException {
        if (this.prodPipeline == null) {
            return null;
        }
        MetricRegistry registry = this.prodPipeline.getPipeline().getRunner().getMetrics();
        if (registry == null && getState().getStatus().isActive()) {
            return this.metricsForRetry;
        }
        return registry;
    }

    /**
     * Captures a snapshot after validating the batch size; the pipeline must be RUNNING.
     *
     * @param str forwarded unchanged to the six-argument overload
     * @param str2 forwarded unchanged to the six-argument overload
     * @param str3 forwarded unchanged to the six-argument overload
     * @param i forwarded unchanged to the six-argument overload
     * @param i2 batch size; must be greater than zero
     * @return id of the created snapshot
     * @throws PipelineException CONTAINER_0107 (as a {@link PipelineRunnerException}) if {@code i2 <= 0},
     *         or any failure of the delegated capture
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String captureSnapshot(String str, String str2, String str3, int i, int i2) throws PipelineException {
        if (i2 > 0) {
            return captureSnapshot(str, str2, str3, i, i2, true);
        }
        throw new PipelineRunnerException(ContainerError.CONTAINER_0107, Integer.valueOf(i2));
    }

    /**
     * Registers a snapshot in the snapshot store and asks the pipeline to capture it.
     *
     * <p>The batch size is capped at the configured snapshot maximum (default 10).
     *
     * @param str first argument passed to {@code snapshotStore.create}
     * @param str2 snapshot identifier passed to both the store and the pipeline
     * @param str3 fifth argument passed to {@code snapshotStore.create}
     * @param i third argument passed to {@code prodPipeline.captureSnapshot}
     * @param i2 requested batch size, capped at the configured maximum
     * @param z when {@code true}, the pipeline must be in RUNNING status
     * @return id of the created snapshot info
     * @throws PipelineException CONTAINER_0105 if {@code z} is set and the pipeline is not RUNNING, or
     *         any store failure
     */
    public String captureSnapshot(String str, String str2, String str3, int i, int i2, boolean z) throws PipelineException {
        int configuredMax = getConfiguration().get(Constants.SNAPSHOT_MAX_BATCH_SIZE_KEY, 10);
        int batchSize = Math.min(i2, configuredMax);
        LOG.debug("Capturing snapshot with batch size {}", Integer.valueOf(batchSize));
        if (z) {
            boolean running = getState().getStatus().equals(PipelineStatus.RUNNING);
            checkState(running, ContainerError.CONTAINER_0105);
        }
        SnapshotInfo created = this.snapshotStore.create(str, getName(), getRev(), str2, str3, false);
        this.prodPipeline.captureSnapshot(str2, batchSize, i);
        return created.getId();
    }

    /**
     * Updates the label of a snapshot in the snapshot store.
     *
     * @param str third argument passed to {@code snapshotStore.updateLabel} (presumably the snapshot id — confirm)
     * @param str2 fourth argument passed to {@code snapshotStore.updateLabel} (presumably the new label — confirm)
     * @return id of the updated snapshot info, or {@code null} when the store returns no info
     * @throws PipelineException if the store update fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String updateSnapshotLabel(String str, String str2) throws PipelineException {
        SnapshotInfo updated = this.snapshotStore.updateLabel(getName(), getRev(), str, str2);
        return updated == null ? null : updated.getId();
    }

    /**
     * Looks up a snapshot of this pipeline revision in the snapshot store.
     *
     * @param str snapshot key passed to {@code snapshotStore.get}
     * @return whatever the store returns for that key
     * @throws PipelineException if the lookup fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Snapshot getSnapshot(String str) throws PipelineException {
        Snapshot found = this.snapshotStore.get(getName(), getRev(), str);
        return found;
    }

    /**
     * Lists the snapshot summaries the snapshot store holds for this pipeline revision.
     *
     * @return the list returned by {@code snapshotStore.getSummaryForPipeline}
     * @throws PipelineException if the store lookup fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<SnapshotInfo> getSnapshotsInfo() throws PipelineException {
        List<SnapshotInfo> summaries = this.snapshotStore.getSummaryForPipeline(getName(), getRev());
        return summaries;
    }

    /**
     * Deletes a snapshot, cancelling its capture first when it is still in progress.
     *
     * <p>The cancel step is skipped when this runner holds no pipeline, so a snapshot that is still
     * flagged as in progress can be deleted from the store instead of failing with a
     * {@link NullPointerException}.
     *
     * @param str snapshot key used for the lookup and the deletion
     * @throws PipelineException if the lookup or the deletion fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void deleteSnapshot(String str) throws PipelineException {
        Snapshot snapshot = getSnapshot(str);
        boolean inProgress = snapshot != null && snapshot.getInfo() != null && snapshot.getInfo().isInProgress();
        if (inProgress && this.prodPipeline != null) {
            this.prodPipeline.cancelSnapshot(snapshot.getInfo().getId());
        }
        this.snapshotStore.deleteSnapshot(getName(), getRev(), str);
    }

    /**
     * Returns error records from the pipeline; only allowed while the pipeline status is active.
     *
     * @param str first argument passed to {@code prodPipeline.getErrorRecords}
     * @param i second argument passed to {@code prodPipeline.getErrorRecords}
     * @return the records reported by the pipeline
     * @throws PipelineRunnerException with CONTAINER_0106 if the pipeline is not active
     * @throws PipelineStoreException if the state cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<Record> getErrorRecords(String str, int i) throws PipelineRunnerException, PipelineStoreException {
        boolean active = getState().getStatus().isActive();
        checkState(active, ContainerError.CONTAINER_0106);
        return this.prodPipeline.getErrorRecords(str, i);
    }

    /**
     * Returns error messages from the pipeline; only allowed while the pipeline status is active.
     *
     * @param str first argument passed to {@code prodPipeline.getErrorMessages}
     * @param i second argument passed to {@code prodPipeline.getErrorMessages}
     * @return the messages reported by the pipeline
     * @throws PipelineRunnerException with CONTAINER_0106 if the pipeline is not active
     * @throws PipelineStoreException if the state cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<ErrorMessage> getErrorMessages(String str, int i) throws PipelineRunnerException, PipelineStoreException {
        boolean active = getState().getStatus().isActive();
        checkState(active, ContainerError.CONTAINER_0106);
        return this.prodPipeline.getErrorMessages(str, i);
    }

    /**
     * Returns sampled records from the data observer; only allowed while the pipeline status is active.
     *
     * @param str first argument passed to {@code observerRunnable.getSampledRecords}
     * @param i second argument passed to {@code observerRunnable.getSampledRecords}
     * @return the sampled records reported by the observer
     * @throws PipelineRunnerException with CONTAINER_0106 if the pipeline is not active
     * @throws PipelineStoreException if the state cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<SampledRecord> getSampledRecords(String str, int i) throws PipelineRunnerException, PipelineStoreException {
        boolean active = getState().getStatus().isActive();
        checkState(active, ContainerError.CONTAINER_0106);
        return this.observerRunnable.getSampledRecords(str, i);
    }

    /**
     * Collects the alerts currently raised for this pipeline.
     *
     * <p>For every metric rule and data rule of the pipeline, an {@link AlertInfo} is produced when the
     * metric registry holds the rule's alert gauge. {@link #getMetrics()} may return something other than
     * a live {@link MetricRegistry} ({@code null}, or the stored retry metrics string); in that case no
     * gauges can be inspected and an empty list is returned instead of failing on the cast.
     *
     * @return alerts for metric rules followed by alerts for data rules; never {@code null}
     * @throws PipelineException if the metrics or the rule definitions cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<AlertInfo> getAlerts() throws PipelineException {
        List<AlertInfo> alerts = new ArrayList<>();
        Object metrics = getMetrics();
        if (!(metrics instanceof MetricRegistry)) {
            return alerts;
        }
        MetricRegistry metricRegistry = (MetricRegistry) metrics;
        RuleDefinitions rules = getPipelineStore().retrieveRules(getName(), getRev());
        for (MetricsRuleDefinition metricsRule : rules.getMetricsRuleDefinitions()) {
            Gauge gauge = MetricsConfigurator.getGauge(metricRegistry, AlertsUtil.getAlertGaugeName(metricsRule.getId()));
            if (gauge != null) {
                alerts.add(new AlertInfo(getName(), metricsRule, gauge));
            }
        }
        for (DataRuleDefinition dataRule : rules.getAllDataRuleDefinitions()) {
            Gauge gauge = MetricsConfigurator.getGauge(metricRegistry, AlertsUtil.getAlertGaugeName(dataRule.getId()));
            if (gauge != null) {
                alerts.add(new AlertInfo(getName(), dataRule, gauge));
            }
        }
        return alerts;
    }

    /**
     * Clears an alert: resets the rule's user metric counter and removes its alert gauge.
     *
     * @param str rule id from which the counter and gauge names are derived
     * @return the result of {@code MetricsConfigurator.removeGauge}
     * @throws PipelineRunnerException with CONTAINER_0402 if the pipeline is not active
     * @throws PipelineStoreException if the state cannot be read
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public boolean deleteAlert(String str) throws PipelineRunnerException, PipelineStoreException {
        boolean active = getState().getStatus().isActive();
        checkState(active, ContainerError.CONTAINER_0402);
        MetricRegistry registryForCounter = (MetricRegistry) getMetrics();
        MetricsConfigurator.resetCounter(registryForCounter, AlertsUtil.getUserMetricName(str));
        MetricRegistry registryForGauge = (MetricRegistry) getMetrics();
        boolean removed = MetricsConfigurator.removeGauge(registryForGauge, AlertsUtil.getAlertGaugeName(str), getName(), getRev());
        return removed;
    }

    /**
     * State-listener callback: records a status change reported for this pipeline.
     *
     * <p>When new attributes are supplied they are layered on top of the attributes of the persisted
     * state; with {@code null} attributes, {@code null} is passed on to the transition. Every store or
     * runner failure — whether raised while merging attributes or while saving the transition — is
     * rethrown as a {@link PipelineRuntimeException} carrying the original error code and cause.
     *
     * @param pipelineStatus status to move to
     * @param str message stored with the new state
     * @param map attributes to merge over the persisted ones, or {@code null}
     * @throws PipelineRuntimeException if the state cannot be read or the transition is rejected or fails
     */
    @Override // _ss_com.streamsets.datacollector.execution.StateListener
    public void stateChanged(PipelineStatus pipelineStatus, String str, Map<String, Object> map) throws PipelineRuntimeException {
        try {
            Map<String, Object> mergedAttributes = null;
            if (map != null) {
                mergedAttributes = new HashMap<>();
                Map<String, Object> persistedAttributes = getState().getAttributes();
                // The persisted state may carry no attributes at all; putAll(null) would throw.
                if (persistedAttributes != null) {
                    mergedAttributes.putAll(persistedAttributes);
                }
                mergedAttributes.putAll(map);
            }
            validateAndSetStateTransition(getState().getUser(), pipelineStatus, str, mergedAttributes);
        } catch (PipelineRunnerException | PipelineStoreException e) {
            throw new PipelineRuntimeException(e.getErrorCode(), e);
        }
    }

    /**
     * Validates a status transition against the transition table, persists the new state and broadcasts
     * the change.
     *
     * <p>Validation, retry bookkeeping, metrics capture, saving and retry scheduling all run under this
     * instance's monitor; the state-change broadcast happens after the monitor is released.
     *
     * <ul>
     *   <li>A move to RETRY from any status other than CONNECTING increments the retry attempt. Within
     *       the retry limit (or with a limit of -1) the next retry time is computed; past the limit the
     *       target status is replaced by RUN_ERROR and the retry bookkeeping is reset.</li>
     *   <li>A move to an inactive status resets the retry bookkeeping.</li>
     *   <li>For inactive targets, DISCONNECTED, and counted RETRY moves, the current metrics are
     *       serialized, broadcast and stored with the state; if none are available the previously
     *       persisted metrics are kept.</li>
     *   <li>Whenever the saved status is RETRY a retry is scheduled on the runner executor.</li>
     * </ul>
     *
     * @param str user stored with the new state
     * @param pipelineStatus requested target status
     * @param str2 message stored with the new state
     * @param map attributes stored with the new state; may be {@code null}
     * @throws PipelineRunnerException with CONTAINER_0102 if the current status has no registered
     *         transition to the target (including statuses absent from the transition table)
     * @throws PipelineStoreException if the state cannot be read or saved, or metrics cannot be serialized
     */
    private void validateAndSetStateTransition(String str, PipelineStatus pipelineStatus, String str2, Map<String, Object> map) throws PipelineStoreException, PipelineRunnerException {
        PipelineState state;
        PipelineState saveState;
        synchronized (this) {
            state = getState();
            // A status without an entry in the table has no valid targets; report that as an invalid
            // transition rather than failing with a NullPointerException.
            Set<PipelineStatus> allowedTargets = VALID_TRANSITIONS.get(state.getStatus());
            checkState(allowedTargets != null && allowedTargets.contains(pipelineStatus), ContainerError.CONTAINER_0102, state.getStatus(), pipelineStatus);
            long nextRetryTimeStamp = state.getNextRetryTimeStamp();
            int retryAttempt = state.getRetryAttempt();
            String metricsJson = null;
            if (pipelineStatus == PipelineStatus.RETRY && state.getStatus() != PipelineStatus.CONNECTING) {
                retryAttempt = state.getRetryAttempt() + 1;
                if (retryAttempt <= this.maxRetries || this.maxRetries == -1) {
                    nextRetryTimeStamp = RetryUtils.getNextRetryTimeStamp(retryAttempt, System.currentTimeMillis());
                    this.isRetrying = true;
                    this.metricsForRetry = getState().getMetrics();
                } else {
                    LOG.info("Retry attempt '{}' is greater than max no of retries '{}'", Integer.valueOf(retryAttempt), Integer.valueOf(this.maxRetries));
                    // Retries exhausted: end in RUN_ERROR instead of scheduling another attempt.
                    pipelineStatus = PipelineStatus.RUN_ERROR;
                    retryAttempt = 0;
                    nextRetryTimeStamp = 0;
                }
            } else if (!pipelineStatus.isActive()) {
                retryAttempt = 0;
                nextRetryTimeStamp = 0;
            }
            if (!pipelineStatus.isActive() || pipelineStatus == PipelineStatus.DISCONNECTED || (pipelineStatus == PipelineStatus.RETRY && state.getStatus() != PipelineStatus.CONNECTING)) {
                Object metrics = getMetrics();
                if (metrics != null) {
                    try {
                        metricsJson = ObjectMapperFactory.get().writeValueAsString(metrics);
                        getEventListenerManager().broadcastMetrics(getName(), metricsJson);
                    } catch (JsonProcessingException e) {
                        throw new PipelineStoreException(ContainerError.CONTAINER_0210, e.toString(), e);
                    }
                }
                if (metricsJson == null) {
                    // No live metrics: carry the previously persisted metrics forward.
                    metricsJson = getState().getMetrics();
                }
            }
            saveState = getPipelineStateStore().saveState(str, getName(), getRev(), pipelineStatus, str2, map, ExecutionMode.STANDALONE, metricsJson, retryAttempt, nextRetryTimeStamp);
            if (pipelineStatus == PipelineStatus.RETRY) {
                this.retryFuture = scheduleForRetries(this.runnerExecutor);
            }
        }
        getEventListenerManager().broadcastStateChange(state, saveState, ThreadUsage.STANDALONE, OffsetFileUtil.getOffsets(getRuntimeInfo(), getName(), getRev()));
    }

    /**
     * Throws a {@link PipelineRunnerException} built from the given error and arguments unless the
     * condition holds.
     *
     * @param z condition that must be {@code true}
     * @param containerError error code used for the exception
     * @param objArr arguments passed to the exception
     * @throws PipelineRunnerException if {@code z} is {@code false}
     */
    private void checkState(boolean z, ContainerError containerError, Object... objArr) throws PipelineRunnerException {
        if (z) {
            return;
        }
        throw new PipelineRunnerException(containerError, objArr);
    }

    /**
     * Reserves runner resources and moves the pipeline to STARTING.
     *
     * <p>The transition is pre-checked before resources are requested, so an invalid start never
     * consumes resources. On success the start context is remembered and a fresh random token is
     * generated.
     *
     * @param startPipelineContext context supplying the user for the state transition
     * @throws PipelineRunnerException with CONTAINER_0102 if the current status cannot move to STARTING
     *         (including statuses absent from the transition table), or CONTAINER_0166 if no runner
     *         resources are available
     * @throws PipelineStoreException if the state cannot be read or saved
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void prepareForStart(Runner.StartPipelineContext startPipelineContext) throws PipelineStoreException, PipelineRunnerException {
        PipelineState state = getState();
        // A status without an entry in the table has no valid targets; report that as an invalid
        // transition rather than failing with a NullPointerException.
        Set<PipelineStatus> allowedTargets = VALID_TRANSITIONS.get(state.getStatus());
        checkState(allowedTargets != null && allowedTargets.contains(PipelineStatus.STARTING), ContainerError.CONTAINER_0102, state.getStatus(), PipelineStatus.STARTING);
        if (!this.resourceManager.requestRunnerResources(ThreadUsage.STANDALONE)) {
            throw new PipelineRunnerException(ContainerError.CONTAINER_0166, getName());
        }
        LOG.info("Preparing to start pipeline '{}::{}'", getName(), getRev());
        setStartPipelineContext(startPipelineContext);
        validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.STARTING, null, createStateAttributes());
        this.token = UUID.randomUUID().toString();
    }

    /**
     * Moves the pipeline to STOPPING in preparation for a stop.
     *
     * <p>When the pipeline is waiting in RETRY there is nothing running to stop: the pending retry is
     * cancelled (when one is scheduled) and the state is taken straight through STOPPING to STOPPED.
     *
     * @param str user stored with the state transitions
     * @throws PipelineRunnerException if a transition is not allowed from the current status
     * @throws PipelineStoreException if the state cannot be read or saved
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void prepareForStop(String str) throws PipelineStoreException, PipelineRunnerException {
        LOG.info("Preparing to stop pipeline");
        boolean wasRetrying = getState().getStatus() == PipelineStatus.RETRY;
        // The future may be absent if RETRY was persisted but no retry was scheduled by this instance.
        if (wasRetrying && this.retryFuture != null) {
            this.retryFuture.cancel(true);
        }
        validateAndSetStateTransition(str, PipelineStatus.STOPPING, null, null);
        if (wasRetrying) {
            validateAndSetStateTransition(str, PipelineStatus.STOPPED, "Stopped while the pipeline was in RETRY state", null);
        }
    }

    /**
     * Builds the pipeline and runs its runnable on the calling thread, unless the runnable has already
     * been stopped by the time the build finishes.
     *
     * @param startPipelineContext context forwarded to {@code startPipeline}
     * @throws PipelineException if building the pipeline fails
     * @throws StageException if building the pipeline fails in a stage
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void start(Runner.StartPipelineContext startPipelineContext) throws PipelineException, StageException {
        startPipeline(startPipelineContext);
        LOG.debug("Starting the runnable for pipeline {} {}", getName(), getRev());
        if (!this.pipelineRunnable.isStopped()) {
            this.pipelineRunnable.run();
        }
    }

    /**
     * Builds everything needed to run the pipeline and leaves the result in
     * {@code this.pipelineRunnable}; the runnable itself is NOT run here.
     *
     * <p>The closed-runner check happens before the monitor is taken; all remaining
     * work runs inside {@code synchronized (this)}. In order, the method: loads the
     * pipeline configuration and creates its config bean, creates the observer and
     * (optionally) stats-aggregator queues, pulls the per-pipeline collaborators out
     * of a child object graph, restores metrics when this start is a retry, wires
     * the queues into the runner/observer, builds the production pipeline, schedules
     * the background runnables on {@code runnerExecutor}, and finally creates the
     * {@code ProductionPipelineRunnable}.
     *
     * <p>Any exception thrown inside the synchronized block moves the pipeline to
     * {@code START_ERROR} (with {@code e.toString()} as the message) and is rethrown.
     *
     * @param startPipelineContext supplies the user, runtime parameters and interceptor configurations
     * @throws PipelineException on pipeline-level failures (including {@code CONTAINER_0116} when the
     *         config bean cannot be created and {@code CONTAINER_0403} on interruption)
     * @throws StageException declared for failures propagated from the called code
     */
    private void startPipeline(Runner.StartPipelineContext startPipelineContext) throws PipelineException, StageException {
        // Guard evaluated outside the monitor; presumably throws when the runner is already closed.
        Utils.checkState(!this.isClosed, Utils.formatL("Cannot start the pipeline '{}::{}' as the runner is already closed", new Object[]{getName(), getRev()}));
        synchronized (this) {
            try {
                LOG.info("Starting pipeline {} {}", getName(), getRev());
                setStartPipelineContext(startPipelineContext);
                UserContext userContext = new UserContext(startPipelineContext.getUser(), getRuntimeInfo().isDPMEnabled(), getConfiguration().get(RemoteSSOService.DPM_USER_ALIAS_NAME_ENABLED, false));
                PipelineConfiguration pipelineConf = getPipelineConf(getName(), getRev());
                // Passed to the bean creator and attached to CONTAINER_0116 below;
                // presumably collects the creation issues - confirm against PipelineBeanCreator.
                ArrayList arrayList = new ArrayList();
                PipelineEL.setConstantsInContext(pipelineConf, userContext, getState().getTimeStamp());
                PipelineConfigBean create = PipelineBeanCreator.get().create(pipelineConf, arrayList, getStartPipelineContext().getRuntimeParameters());
                if (create == null) {
                    throw new PipelineRuntimeException(ContainerError.CONTAINER_0116, arrayList);
                }
                JobEL.setConstantsInContext(create.constants);
                this.maxRetries = create.retryAttempts;
                // Observer request queue: capacity from configuration (default 100), fair ordering.
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(getConfiguration().get(Constants.OBSERVER_QUEUE_SIZE_KEY, 100), true);
                // Stats queue stays null unless stats aggregation is enabled; the null is
                // handed to every setStatsQueue/setStatsAggregatorRequests call below.
                ArrayBlockingQueue arrayBlockingQueue2 = null;
                boolean isStatsAggregationEnabled = isStatsAggregationEnabled(pipelineConf);
                if (isStatsAggregationEnabled) {
                    arrayBlockingQueue2 = new ArrayBlockingQueue(getConfiguration().get(Constants.STATS_AGGREGATOR_QUEUE_SIZE_KEY, 1000), true);
                }
                // Child object graph scoped to this pipeline; all collaborators below come from it.
                ObjectGraph plus = this.objectGraph.plus(new PipelineProviderModule(getName(), pipelineConf.getTitle(), getRev(), isStatsAggregationEnabled, create.constants));
                this.threadHealthReporter = (ThreadHealthReporter) plus.get(ThreadHealthReporter.class);
                this.observerRunnable = (DataObserverRunnable) plus.get(DataObserverRunnable.class);
                this.metricsEventRunnable = (MetricsEventRunnable) plus.get(MetricsEventRunnable.class);
                // Collects the futures of every task scheduled below; handed to the pipeline runnable at the end.
                ImmutableList.Builder builder = ImmutableList.builder();
                ProductionObserver productionObserver = (ProductionObserver) plus.get(Observer.class);
                productionObserver.setPipelineStartTime(getState().getTimeStamp());
                RulesConfigLoader rulesConfigLoader = (RulesConfigLoader) plus.get(RulesConfigLoader.class);
                RulesConfigLoaderRunnable rulesConfigLoaderRunnable = (RulesConfigLoaderRunnable) plus.get(RulesConfigLoaderRunnable.class);
                MetricObserverRunnable metricObserverRunnable = (MetricObserverRunnable) plus.get(MetricObserverRunnable.class);
                ProductionPipelineRunner productionPipelineRunner = (ProductionPipelineRunner) plus.get(PipelineRunner.class);
                productionPipelineRunner.addErrorListeners(this.errorListeners);
                if (this.isRetrying) {
                    // On a retry, re-seed the runner and observer with the metrics JSON kept
                    // in metricsForRetry (when present). A parse failure is logged and ignored.
                    ObjectMapper objectMapper = ObjectMapperFactory.get();
                    try {
                        if (this.metricsForRetry != null) {
                            MetricRegistryJson metricRegistryJson = (MetricRegistryJson) objectMapper.readValue(this.metricsForRetry, MetricRegistryJson.class);
                            productionPipelineRunner.updateMetrics(metricRegistryJson);
                            this.observerRunnable.setMetricRegistryJson(metricRegistryJson);
                        }
                    } catch (IOException e) {
                        // NOTE(review): the message says "serializing slave metrics" but this path
                        // deserializes the retry metrics - wording looks copied from elsewhere.
                        LOG.warn("Error while serializing slave metrics: , {}", e.toString(), e);
                    }
                    // The retry flag is cleared whether or not metrics were restored.
                    this.isRetrying = false;
                }
                // A non-positive rate limit leaves the runner's rate limit untouched.
                if (create.rateLimit > 0) {
                    productionPipelineRunner.setRateLimit(Long.valueOf(create.rateLimit));
                }
                ProductionPipelineBuilder productionPipelineBuilder = (ProductionPipelineBuilder) plus.get(ProductionPipelineBuilder.class);
                registerEmailNotifierIfRequired(create, getName(), pipelineConf.getTitle(), getRev());
                registerWebhookNotifierIfRequired(create, getName(), pipelineConf.getTitle(), getRev());
                // Observer and runner share the same observe-request queue.
                productionObserver.setObserveRequests(arrayBlockingQueue);
                productionPipelineRunner.setObserveRequests(arrayBlockingQueue);
                productionPipelineRunner.setStatsAggregatorRequests(arrayBlockingQueue2);
                productionPipelineRunner.setDeliveryGuarantee(create.deliveryGuarantee);
                this.prodPipeline = productionPipelineBuilder.build(userContext, pipelineConf, getState().getTimeStamp(), startPipelineContext.getInterceptorConfigurations(), startPipelineContext.getRuntimeParameters());
                this.prodPipeline.registerStatusListener(this);
                this.metricsEventRunnable.setStatsQueue(arrayBlockingQueue2);
                this.metricsEventRunnable.setPipelineConfiguration(pipelineConf);
                // Metrics events are only scheduled when the UI refresh interval (default 2000) is positive.
                if (getConfiguration().get("ui.refresh.interval.ms", 2000) > 0) {
                    builder.add((ImmutableList.Builder) this.runnerExecutor.scheduleAtFixedRate(this.metricsEventRunnable, 0L, this.metricsEventRunnable.getScheduledDelay(), TimeUnit.MILLISECONDS));
                }
                rulesConfigLoader.setStatsQueue(arrayBlockingQueue2);
                try {
                    // Initial rules load, then rules reload and metric observation every 2 s after a 1 s delay.
                    rulesConfigLoader.load(productionObserver);
                    builder.add((ImmutableList.Builder) this.runnerExecutor.scheduleWithFixedDelay(rulesConfigLoaderRunnable, 1L, 2L, TimeUnit.SECONDS));
                    builder.add((ImmutableList.Builder) this.runnerExecutor.scheduleWithFixedDelay(metricObserverRunnable, 1L, 2L, TimeUnit.SECONDS));
                    if (create.runnerIdleTIme > 0) {
                        // runnerIdleTIme is multiplied by 1000 before use (presumably seconds -> ms, confirm);
                        // the idle check is scheduled at one tenth of that value, in milliseconds.
                        ProduceEmptyBatchesForIdleRunnersRunnable produceEmptyBatchesForIdleRunnersRunnable = new ProduceEmptyBatchesForIdleRunnersRunnable(productionPipelineRunner, create.runnerIdleTIme * 1000);
                        long j = (create.runnerIdleTIme * 1000) / 10;
                        builder.add((ImmutableList.Builder) this.runnerExecutor.scheduleWithFixedDelay(produceEmptyBatchesForIdleRunnersRunnable, j, j, TimeUnit.MILLISECONDS));
                    }
                    // Update check: first run after 1 minute, then every 1440 minutes (24 hours).
                    this.updateChecker = new UpdateChecker(getRuntimeInfo(), getConfiguration(), pipelineConf, this);
                    builder.add((ImmutableList.Builder) this.runnerExecutor.scheduleAtFixedRate(this.updateChecker, 1L, 1440L, TimeUnit.MINUTES));
                    this.observerRunnable.setRequestQueue(arrayBlockingQueue);
                    this.observerRunnable.setStatsQueue(arrayBlockingQueue2);
                    builder.add((ImmutableList.Builder) this.runnerExecutor.submit(this.observerRunnable));
                    // The runnable receives the list of scheduled futures collected above.
                    this.pipelineRunnable = new ProductionPipelineRunnable(this.threadHealthReporter, this, this.prodPipeline, getName(), getRev(), builder.build());
                } catch (InterruptedException e2) {
                    // NOTE(review): the interrupt is wrapped and rethrown without restoring the
                    // thread's interrupt flag - confirm that is intended.
                    throw new PipelineRuntimeException(ContainerError.CONTAINER_0403, getName(), e2.toString(), e2);
                }
            } catch (Exception e3) {
                // Any failure during preparation is recorded as START_ERROR before being rethrown.
                validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.START_ERROR, e3.toString(), null);
                throw e3;
            }
        }
    }

    /**
     * Prepares the pipeline, registers a snapshot capture request and then runs the
     * pipeline runnable unless it is already stopped.
     *
     * <p>Any exception raised along the way transitions the pipeline to
     * {@code START_ERROR} (using the exception's {@code toString()} as message)
     * before it is rethrown to the caller.
     *
     * @param startPipelineContext context describing who starts the pipeline and with which parameters
     * @param str first snapshot argument, forwarded unchanged to {@code captureSnapshot}
     * @param str2 second snapshot argument, forwarded unchanged to {@code captureSnapshot}
     * @param i first numeric snapshot argument, forwarded unchanged to {@code captureSnapshot}
     * @param i2 second numeric snapshot argument, forwarded unchanged to {@code captureSnapshot}
     * @throws PipelineException propagated from preparation, snapshot capture or the run
     * @throws StageException propagated from preparation, snapshot capture or the run
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void startAndCaptureSnapshot(Runner.StartPipelineContext startPipelineContext, String str, String str2, int i, int i2) throws PipelineException, StageException {
        try {
            startPipeline(startPipelineContext);
            captureSnapshot(startPipelineContext.getUser(), str, str2, i, i2, false);
            LOG.debug("Starting the runnable for pipeline {} {}", getName(), getRev());
            // Nothing to run when the runnable was stopped in the meantime.
            if (this.pipelineRunnable.isStopped()) {
                return;
            }
            this.pipelineRunnable.run();
        } catch (Exception startFailure) {
            validateAndSetStateTransition(startPipelineContext.getUser(), PipelineStatus.START_ERROR, startFailure.toString(), null);
            throw startFailure;
        }
    }

    /**
     * Decides whether stats aggregation applies to the given pipeline.
     *
     * <p>Returns {@code true} only when the pipeline has a stats aggregator stage
     * whose name is neither {@code STATS_NULL_TARGET} nor the
     * "StatsDpmDirectlyDTarget" stage, the configuration carries metadata, and
     * {@code isRemotePipeline()} reports {@code true}. The checks run in that
     * order and stop at the first one that fails.
     *
     * @param pipelineConfiguration configuration of the pipeline being started
     * @return whether stats aggregation is enabled for this pipeline
     * @throws PipelineStoreException declared for failures propagated from the called code
     */
    private boolean isStatsAggregationEnabled(PipelineConfiguration pipelineConfiguration) throws PipelineStoreException {
        StageConfiguration aggregatorStage = pipelineConfiguration.getStatsAggregatorStage();
        if (aggregatorStage == null) {
            return false;
        }
        if (aggregatorStage.getStageName().equals(STATS_NULL_TARGET)) {
            return false;
        }
        if (aggregatorStage.getStageName().equals("com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget")) {
            return false;
        }
        if (pipelineConfiguration.getMetadata() == null) {
            return false;
        }
        // Remote-pipeline status is only consulted once every configuration check has passed.
        return isRemotePipeline();
    }

    /**
     * Stops the running pipeline (if any) and releases the per-run helpers.
     *
     * <p>Each of the three fields is handled independently: the pipeline runnable is
     * stopped and cleared only when it exists and is not already stopped; the
     * metrics event runnable is notified and cleared when present; the thread
     * health reporter is destroyed and cleared when present.
     *
     * @param z flag forwarded unchanged to {@code pipelineRunnable.stop(...)}
     * @throws PipelineException propagated from stopping the pipeline runnable
     */
    private void stopPipeline(boolean z) throws PipelineException {
        if (pipelineRunnable != null) {
            // An already stopped runnable is left in place untouched.
            if (!pipelineRunnable.isStopped()) {
                LOG.info("Stopping pipeline {} {}", pipelineRunnable.getName(), pipelineRunnable.getRev());
                pipelineRunnable.stop(z);
                pipelineRunnable = null;
            }
        }

        if (metricsEventRunnable != null) {
            metricsEventRunnable.onStopPipeline();
            metricsEventRunnable = null;
        }

        if (threadHealthReporter != null) {
            threadHealthReporter.destroy();
            threadHealthReporter = null;
        }
    }

    /**
     * Marks this runner as closed. {@code startPipeline} checks this flag and
     * refuses to start a pipeline once it is set.
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void close() {
        isClosed = true;
    }

    /**
     * Returns the update information held by the current update checker.
     *
     * <p>NOTE(review): the checker is assigned while a pipeline is being started;
     * whether it can still be {@code null} when this is called is not visible
     * here - confirm.
     *
     * @return whatever {@code updateChecker.getUpdateInfo()} reports
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Map getUpdateInfo() {
        Map updateInfo = this.updateChecker.getUpdateInfo();
        return updateInfo;
    }

    /**
     * Returns the pipeline wrapped by the current production pipeline.
     *
     * @return the underlying pipeline, or {@code null} when no production pipeline has been built
     */
    public Pipeline getPipeline() {
        return (this.prodPipeline == null) ? null : this.prodPipeline.getPipeline();
    }

    /**
     * Not available on this runner.
     *
     * @param callbackObjectType ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Collection<CallbackInfo> getSlaveCallbackList(CallbackObjectType callbackObjectType) {
        final String reason = "This method is only supported in Cluster Runner";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Not available on this runner.
     *
     * @param callbackInfo ignored
     * @throws UnsupportedOperationException always
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void updateSlaveCallbackInfo(CallbackInfo callbackInfo) {
        final String reason = "This method is only supported in Cluster Runner";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Returns the token stored on this runner.
     *
     * @return the value of the {@code token} field
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String getToken() {
        return token;
    }

    /**
     * Reports how many runners the current pipeline uses.
     *
     * @return the pipeline's number of runners, or {@code 0} when no production pipeline has been built
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public int getRunnerCount() {
        if (this.prodPipeline == null) {
            return 0;
        }
        return this.prodPipeline.getPipeline().getNumOfRunners();
    }

    /**
     * This runner does not expose a delegate.
     *
     * @return always {@code null}
     */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Runner getDelegatingRunner() {
        final Runner noDelegate = null;
        return noDelegate;
    }
}
