package _ss_com.streamsets.datacollector.execution.runner.common;

import _ss_com.com.google.common.base.Preconditions;
import _ss_com.com.google.common.base.Throwables;
import _ss_com.com.google.common.collect.EvictingQueue;
import _ss_com.com.google.common.collect.ImmutableList;
import _ss_com.com.google.common.collect.ImmutableMap;
import _ss_com.com.google.common.collect.Lists;
import _ss_com.com.google.common.collect.Sets;
import _ss_com.com.google.common.util.concurrent.RateLimiter;
import _ss_com.streamsets.datacollector.bundles.SupportBundleManager;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.creation.PipelineConfigBean;
import _ss_com.streamsets.datacollector.el.JobEL;
import _ss_com.streamsets.datacollector.el.PipelineEL;
import _ss_com.streamsets.datacollector.execution.SnapshotInfo;
import _ss_com.streamsets.datacollector.execution.SnapshotStore;
import _ss_com.streamsets.datacollector.execution.metrics.MetricsEventRunnable;
import _ss_com.streamsets.datacollector.json.ObjectMapperFactory;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.restapi.bean.MeterJson;
import _ss_com.streamsets.datacollector.restapi.bean.MetricRegistryJson;
import _ss_com.streamsets.datacollector.runner.BatchContextImpl;
import _ss_com.streamsets.datacollector.runner.BatchImpl;
import _ss_com.streamsets.datacollector.runner.BatchListener;
import _ss_com.streamsets.datacollector.runner.ErrorSink;
import _ss_com.streamsets.datacollector.runner.EventSink;
import _ss_com.streamsets.datacollector.runner.FullPipeBatch;
import _ss_com.streamsets.datacollector.runner.Observer;
import _ss_com.streamsets.datacollector.runner.Pipe;
import _ss_com.streamsets.datacollector.runner.PipeContext;
import _ss_com.streamsets.datacollector.runner.PipeRunner;
import _ss_com.streamsets.datacollector.runner.PipelineRunner;
import _ss_com.streamsets.datacollector.runner.PipelineRuntimeException;
import _ss_com.streamsets.datacollector.runner.ProcessedSink;
import _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate;
import _ss_com.streamsets.datacollector.runner.RunnerPool;
import _ss_com.streamsets.datacollector.runner.SourceOffsetTracker;
import _ss_com.streamsets.datacollector.runner.SourcePipe;
import _ss_com.streamsets.datacollector.runner.SourceResponseSink;
import _ss_com.streamsets.datacollector.runner.StageOutput;
import _ss_com.streamsets.datacollector.runner.StagePipe;
import _ss_com.streamsets.datacollector.runner.StageRuntime;
import _ss_com.streamsets.datacollector.runner.production.BadRecordsHandler;
import _ss_com.streamsets.datacollector.runner.production.PipelineErrorNotificationRequest;
import _ss_com.streamsets.datacollector.runner.production.ReportErrorDelegate;
import _ss_com.streamsets.datacollector.runner.production.StatsAggregationHandler;
import _ss_com.streamsets.datacollector.util.AggregatorUtil;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.datacollector.util.ValidationUtil;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.streamsets.pipeline.api.BatchContext;
import com.streamsets.pipeline.api.DeliveryGuarantee;
import com.streamsets.pipeline.api.ErrorListener;
import com.streamsets.pipeline.api.OffsetCommitTrigger;
import com.streamsets.pipeline.api.PushSource;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.StageType;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import com.streamsets.pipeline.api.impl.Utils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/common/ProductionPipelineRunner.class */
public class ProductionPipelineRunner implements PipelineRunner, PushSourceContextDelegate, ReportErrorDelegate {
    private static final Logger LOG = LoggerFactory.getLogger(ProductionPipelineRunner.class);

    // Collaborators and identity; the final ones are assigned in the constructor.
    private final RuntimeInfo runtimeInfo;
    private final Configuration configuration;
    private final MetricRegistry metrics;
    // Assigned through setOffsetTracker(); used for offsets, last batch time and commits.
    private SourceOffsetTracker offsetTracker;
    private final SnapshotStore snapshotStore;
    // Assigned through setDeliveryGuarantee(); decides when offsets are committed.
    private DeliveryGuarantee deliveryGuarantee;
    private final String pipelineName;
    private final String revision;
    private final SupportBundleManager supportBundleManager;

    // Captured from the arguments of run(...).
    private SourcePipe originPipe;
    private List<PipeRunner> pipes;
    private RunnerPool<PipeRunner> runnerPool;
    private BadRecordsHandler badRecordsHandler;
    private StatsAggregationHandler statsAggregationHandler;

    // Pipeline-level metrics, all created in the constructor through MetricsConfigurator.
    private final Timer batchProcessingTimer;
    private final Meter batchCountMeter;
    private final Counter batchCountCounter;
    private final Histogram batchInputRecordsHistogram;
    private final Histogram batchOutputRecordsHistogram;
    private final Histogram batchErrorRecordsHistogram;
    private final Histogram batchErrorsHistogram;
    private final Meter batchInputRecordsMeter;
    private final Meter batchOutputRecordsMeter;
    private final Meter batchErrorRecordsMeter;
    private final Meter batchErrorMessagesMeter;
    private final Counter batchInputRecordsCounter;
    private final Counter batchOutputRecordsCounter;
    private final Counter batchErrorRecordsCounter;
    private final Counter batchErrorMessagesCounter;
    private final Histogram runnersHistogram;
    // Last snapshot handed to updateMetrics(); returned by getMetricRegistryJson().
    private MetricRegistryJson metricRegistryJson;

    // Both assigned by setRateLimit(); the limiter is attached to every FullPipeBatch created here.
    private Long rateLimit;
    private RateLimiter rateLimiter;

    // Snapshot capture state, written by capture() and reset by cancelSnapshot().
    private volatile String snapshotName;
    private volatile int snapshotBatchSize;

    private BlockingQueue<Object> observeRequests;
    private Observer observer;
    private BlockingQueue<Record> statsAggregatorRequests;
    private ThreadHealthReporter threadHealthReporter;
    private long pipelineStartTime;
    private Map<String, Object> parameters;

    // Lifecycle flags; stop and finished end the runPollSource() loop.
    private volatile boolean stop = false;
    private volatile boolean finished = false;
    private volatile boolean running = false;
    // Values greater than zero switch batch creation to the snapshot batch size.
    private volatile int batchesToCapture = 0;
    // First failure recorded by processBatch(); rethrown by runPushSource().
    private volatile Throwable exceptionFromExecution = null;

    private final List<BatchListener> batchListenerList = new CopyOnWriteArrayList<>();
    private final List<List<StageOutput>> capturedBatches = new ArrayList<>();
    private PipeContext pipeContext = null;
    private PipelineConfigBean pipelineConfigBean = null;
    private PipelineConfiguration pipelineConfiguration = null;
    // Held for the whole duration of destroy(...).
    private Lock destroyLock = new ReentrantLock();
    private final Map<String, EvictingQueue<Record>> stageToErrorRecordsMap = new HashMap<>();
    private final Map<String, EvictingQueue<ErrorMessage>> stageToErrorMessagesMap = new HashMap<>();
    private final List<ErrorListener> errorListeners = new ArrayList<>();

    /**
     * Stores the injected collaborators, registers the pipeline with
     * {@link MetricsConfigurator} and creates the pipeline-level timer, meters,
     * counters and five-minute histograms for the given name and revision.
     *
     * @param str pipeline name (injected as {@code "name"})
     * @param str2 pipeline revision (injected as {@code "rev"})
     * @param supportBundleManager used by {@code run(...)} to upload a bundle on failure; may be null
     * @param configuration source of the maximum batch size
     * @param runtimeInfo returned by {@link #getRuntimeInfo()}
     * @param metricRegistry registry in which all metrics below are created
     * @param snapshotStore store from which {@code stop()} deletes a snapshot in progress
     * @param threadHealthReporter health reporter used by the poll-source loop; may be null
     */
    @Inject
    public ProductionPipelineRunner(@Named("name") String str, @Named("rev") String str2, SupportBundleManager supportBundleManager, Configuration configuration, RuntimeInfo runtimeInfo, MetricRegistry metricRegistry, SnapshotStore snapshotStore, ThreadHealthReporter threadHealthReporter) {
        this.runtimeInfo = runtimeInfo;
        this.configuration = configuration;
        this.metrics = metricRegistry;
        this.threadHealthReporter = threadHealthReporter;
        this.snapshotStore = snapshotStore;
        this.pipelineName = str;
        this.revision = str2;
        this.supportBundleManager = supportBundleManager;
        MetricsConfigurator.registerPipeline(str, str2);
        // Batch-level timing and count.
        this.batchProcessingTimer = MetricsConfigurator.createTimer(metricRegistry, "pipeline.batchProcessing", str, str2);
        this.batchCountMeter = MetricsConfigurator.createMeter(metricRegistry, "pipeline.batchCount", str, str2);
        this.batchCountCounter = MetricsConfigurator.createCounter(metricRegistry, "pipeline.batchCount", str, str2);
        // Per-batch distributions.
        this.batchInputRecordsHistogram = MetricsConfigurator.createHistogram5Min(metricRegistry, "pipeline.inputRecordsPerBatch", str, str2);
        this.batchOutputRecordsHistogram = MetricsConfigurator.createHistogram5Min(metricRegistry, "pipeline.outputRecordsPerBatch", str, str2);
        this.batchErrorRecordsHistogram = MetricsConfigurator.createHistogram5Min(metricRegistry, "pipeline.errorRecordsPerBatch", str, str2);
        this.batchErrorsHistogram = MetricsConfigurator.createHistogram5Min(metricRegistry, "pipeline.errorsPerBatch", str, str2);
        // Record and error-message throughput (meters) and totals (counters).
        this.batchInputRecordsMeter = MetricsConfigurator.createMeter(metricRegistry, "pipeline.batchInputRecords", str, str2);
        this.batchOutputRecordsMeter = MetricsConfigurator.createMeter(metricRegistry, "pipeline.batchOutputRecords", str, str2);
        this.batchErrorRecordsMeter = MetricsConfigurator.createMeter(metricRegistry, "pipeline.batchErrorRecords", str, str2);
        this.batchErrorMessagesMeter = MetricsConfigurator.createMeter(metricRegistry, "pipeline.batchErrorMessages", str, str2);
        this.batchInputRecordsCounter = MetricsConfigurator.createCounter(metricRegistry, "pipeline.batchInputRecords", str, str2);
        this.batchOutputRecordsCounter = MetricsConfigurator.createCounter(metricRegistry, "pipeline.batchOutputRecords", str, str2);
        this.batchErrorRecordsCounter = MetricsConfigurator.createCounter(metricRegistry, "pipeline.batchErrorRecords", str, str2);
        this.batchErrorMessagesCounter = MetricsConfigurator.createCounter(metricRegistry, "pipeline.batchErrorMessages", str, str2);
        // Passed to the RunnerPool created in run(...).
        this.runnersHistogram = MetricsConfigurator.createHistogram5Min(metricRegistry, "pipeline.runners", str, str2);
    }

    /**
     * Sets the queue kept in {@code observeRequests}.
     *
     * @param blockingQueue queue to store; not validated
     */
    public void setObserveRequests(BlockingQueue<Object> blockingQueue) {
        this.observeRequests = blockingQueue;
    }

    /**
     * Sets the queue of stats records; {@code destroy(...)} drains it when
     * stats aggregation is enabled.
     *
     * @param blockingQueue queue to store; not validated
     */
    public void setStatsAggregatorRequests(BlockingQueue<Record> blockingQueue) {
        this.statsAggregatorRequests = blockingQueue;
    }

    /**
     * Appends the given listeners to the ones notified by
     * {@code errorNotification(...)}.
     *
     * @param list listeners to add; must not be null
     */
    public void addErrorListeners(List<ErrorListener> list) {
        // Parameterized logging: the count used to be glued to the text without a separator.
        LOG.info("Adding {} error listeners", list.size());
        this.errorListeners.addAll(list);
    }

    /**
     * Remembers the given metrics snapshot and folds its counts into the
     * pipeline-level histograms, meters and counters of this runner.
     *
     * @param metricRegistryJson snapshot to merge; it must contain every
     *     histogram and meter looked up below
     */
    public void updateMetrics(MetricRegistryJson metricRegistryJson) {
        this.metricRegistryJson = metricRegistryJson;

        // Per-batch histograms are updated with the snapshot's sample counts.
        batchInputRecordsHistogram.update(
            metricRegistryJson.getHistograms().get("pipeline.inputRecordsPerBatch.histogramM5").getCount());
        batchOutputRecordsHistogram.update(
            metricRegistryJson.getHistograms().get("pipeline.outputRecordsPerBatch.histogramM5").getCount());
        batchErrorRecordsHistogram.update(
            metricRegistryJson.getHistograms().get("pipeline.errorRecordsPerBatch.histogramM5").getCount());
        batchErrorsHistogram.update(
            metricRegistryJson.getHistograms().get("pipeline.errorsPerBatch.histogramM5").getCount());

        // Each meter count feeds both the matching meter and the matching counter.
        MeterJson inputRecords = metricRegistryJson.getMeters().get("pipeline.batchInputRecords.meter");
        batchInputRecordsMeter.mark(inputRecords.getCount());
        batchInputRecordsCounter.inc(inputRecords.getCount());

        MeterJson outputRecords = metricRegistryJson.getMeters().get("pipeline.batchOutputRecords.meter");
        batchOutputRecordsMeter.mark(outputRecords.getCount());
        batchOutputRecordsCounter.inc(outputRecords.getCount());

        MeterJson errorRecords = metricRegistryJson.getMeters().get("pipeline.batchErrorRecords.meter");
        batchErrorRecordsMeter.mark(errorRecords.getCount());
        batchErrorRecordsCounter.inc(errorRecords.getCount());

        MeterJson errorMessages = metricRegistryJson.getMeters().get("pipeline.batchErrorMessages.meter");
        batchErrorMessagesMeter.mark(errorMessages.getCount());
        batchErrorMessagesCounter.inc(errorMessages.getCount());

        runnersHistogram.update(
            metricRegistryJson.getHistograms().get("pipeline.runners.histogramM5").getCount());
    }

    /**
     * @return the snapshot most recently passed to {@code updateMetrics}, or
     *     null when none has been set
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public MetricRegistryJson getMetricRegistryJson() {
        return this.metricRegistryJson;
    }

    /**
     * Sets the delivery guarantee that decides whether offsets are committed
     * before the first target runs (AT_MOST_ONCE) or after the batch
     * (AT_LEAST_ONCE).
     *
     * @param deliveryGuarantee guarantee to apply
     */
    public void setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
        this.deliveryGuarantee = deliveryGuarantee;
    }

    /**
     * Stores the rate limit and builds the {@link RateLimiter} that is attached
     * to every batch created by this runner.
     *
     * @param l permitted rate; a null value fails with a NullPointerException
     *     on {@code doubleValue()}
     */
    public void setRateLimit(Long l) {
        this.rateLimit = l;
        // NOTE(review): Guava's RateLimiter.create rejects zero or negative rates — confirm callers only pass positive values.
        this.rateLimiter = RateLimiter.create(l.doubleValue());
    }

    /**
     * Sets the tracker used to read offsets, read the last batch time and
     * commit offsets.
     *
     * @param sourceOffsetTracker tracker to use
     */
    public void setOffsetTracker(SourceOffsetTracker sourceOffsetTracker) {
        this.offsetTracker = sourceOffsetTracker;
    }

    /**
     * Sets the start time that {@code startBatch()} passes to
     * {@code PipelineEL.setConstantsInContext}.
     *
     * @param j pipeline start time (presumably epoch milliseconds — confirm against the caller)
     */
    public void setPipelineStartTime(long j) {
        this.pipelineStartTime = j;
    }

    /**
     * Sets the parameters that {@code startBatch()} passes to
     * {@code JobEL.setConstantsInContext}.
     *
     * @param map parameter map to store
     */
    public void setParameters(Map<String, Object> map) {
        this.parameters = map;
    }

    /**
     * Replaces the health reporter given to the constructor.
     *
     * @param threadHealthReporter reporter to use; null disables health reporting in the poll loop
     */
    public void setThreadHealthReporter(ThreadHealthReporter threadHealthReporter) {
        this.threadHealthReporter = threadHealthReporter;
    }

    /**
     * Sets the observer that is reconfigured before every batch.
     *
     * @param observer observer to use; may be null
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void setObserver(Observer observer) {
        this.observer = observer;
    }

    /** @return the runtime info supplied to the constructor */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public RuntimeInfo getRuntimeInfo() {
        return this.runtimeInfo;
    }

    /** @return always {@code false} for this runner */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public boolean isPreview() {
        return false;
    }

    /** @return the metric registry supplied to the constructor */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public MetricRegistry getMetrics() {
        return this.metrics;
    }

    /**
     * Runs a single lifecycle event record through the given stage, which must
     * be an executor or a target.
     *
     * @param record the event record, wrapped in a one-record batch
     * @param stageRuntime stage that handles the event
     * @throws IllegalArgumentException if the stage is neither an executor nor a target
     * @throws PipelineRuntimeException with {@code CONTAINER_0792} and the joined
     *     error messages if the stage produced any error record
     * @throws StageException if stage execution fails
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void runLifecycleEvent(Record record, StageRuntime stageRuntime) throws StageException, PipelineRuntimeException {
        BatchImpl eventBatch = new BatchImpl(stageRuntime.getConfiguration().getInstanceName(), "", "", ImmutableList.of(record));
        ErrorSink errors = new ErrorSink();
        Preconditions.checkArgument(
            stageRuntime.getDefinition().getType().isOneOf(new StageType[]{StageType.EXECUTOR, StageType.TARGET}),
            "Invalid lifecycle event stage type: " + stageRuntime.getDefinition().getType());
        stageRuntime.execute(null, 1000, eventBatch, null, errors, new EventSink(), new ProcessedSink(), new SourceResponseSink());

        if (!errors.getErrorRecords().isEmpty()) {
            // Flatten the per-stage error record lists into one comma-separated message.
            String joinedMessages = errors.getErrorRecords().values().stream()
                .flatMap(stageErrors -> stageErrors.stream())
                .map(errorRecord -> errorRecord.getHeader().getErrorMessage())
                .collect(Collectors.joining(", "));
            throw new PipelineRuntimeException(ContainerError.CONTAINER_0792, joinedMessages);
        }
    }

    /**
     * Runs the pipeline until the origin finishes or fails.
     *
     * <p>Stores the pipes and handlers, builds the runner pool and then runs
     * the push-source or poll-source loop depending on the origin stage type.
     * On any failure it logs the error, sends the pipeline error notification,
     * notifies error listeners, uploads a support bundle when a manager is
     * present, and rethrows.
     *
     * @param sourcePipe origin pipe
     * @param list pipe runners placed in the runner pool
     * @param badRecordsHandler handler for error records
     * @param statsAggregationHandler handler for stats records
     * @throws StageException rethrown unchanged
     * @throws PipelineRuntimeException rethrown unchanged
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void run(SourcePipe sourcePipe, List<PipeRunner> list, BadRecordsHandler badRecordsHandler, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        this.originPipe = sourcePipe;
        this.pipes = list;
        this.badRecordsHandler = badRecordsHandler;
        this.statsAggregationHandler = statsAggregationHandler;
        // Dereferences pipeContext, so it must have been set before run(...) is called.
        this.runnerPool = new RunnerPool<>(list, this.pipeContext.getRuntimeStats(), this.runnersHistogram);
        this.running = true;
        try {
            if (sourcePipe.getStage().getStage() instanceof PushSource) {
                runPushSource();
            } else {
                runPollSource();
            }
        } catch (Throwable th) {
            LOG.error("Pipeline execution failed", th);
            sendPipelineErrorNotificationRequest(th);
            errorNotification(sourcePipe, list, th);
            if (this.supportBundleManager != null) {
                this.supportBundleManager.uploadNewBundleOnError();
            }
            // Rethrow declared checked types as-is; anything else goes through Throwables.propagate.
            Throwables.propagateIfInstanceOf(th, StageException.class);
            Throwables.propagateIfInstanceOf(th, PipelineRuntimeException.class);
            Throwables.propagate(th);
        }
    }

    /**
     * Drives a push origin: registers this runner as its context delegate, lets
     * the origin run, and afterwards rethrows the first failure recorded by
     * {@code processBatch}, if any.
     *
     * @throws StageException if the origin or a recorded batch failure raised one
     * @throws PipelineRuntimeException if the origin or a recorded batch failure raised one
     */
    private void runPushSource() throws StageException, PipelineRuntimeException {
        originPipe.getStage().setPushSourceContextDelegate(this);

        // The snapshot batch size replaces the configured maximum while a capture is pending.
        final int configuredBatchSize = configuration.get(Constants.MAX_BATCH_SIZE_KEY, 1000);
        final int batchSize = batchesToCapture > 0 ? snapshotBatchSize : configuredBatchSize;

        originPipe.process(offsetTracker.getOffsets(), batchSize, this);

        final Throwable failure = exceptionFromExecution;
        if (failure == null) {
            return;
        }
        Throwables.propagateIfInstanceOf(failure, StageException.class);
        Throwables.propagateIfInstanceOf(failure, PipelineRuntimeException.class);
        Throwables.propagate(failure);
    }

    /**
     * Creates the batch container for one pipeline batch and attaches the
     * current rate limiter to it.
     *
     * @param str first argument forwarded to the {@code FullPipeBatch} constructor
     * @param str2 second argument forwarded to the {@code FullPipeBatch} constructor
     * @return a batch sized for snapshot capture when a capture is pending,
     *     otherwise one sized by the configured maximum batch size
     */
    private FullPipeBatch createFullPipeBatch(String str, String str2) {
        final FullPipeBatch batch;
        if (batchesToCapture > 0) {
            batch = new FullPipeBatch(str, str2, snapshotBatchSize, true);
        } else {
            batch = new FullPipeBatch(str, str2, configuration.get(Constants.MAX_BATCH_SIZE_KEY, 1000), false);
        }
        batch.setRateLimiter(rateLimiter);
        return batch;
    }

    /**
     * Opens a new batch for a push origin.
     *
     * <p>Reconfigures the observer when one is set, creates and prepares the
     * batch context, publishes the pipeline and job EL constants, and runs
     * every batch listener's {@code preBatch()}.
     *
     * @return the prepared batch context
     */
    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public BatchContext startBatch() {
        if (observer != null) {
            observer.reconfigure();
        }

        BatchContextImpl context = new BatchContextImpl(createFullPipeBatch(null, null));
        originPipe.prepareBatchContext(context);

        PipelineEL.setConstantsInContext(pipelineConfiguration, originPipe.getStage().getContext().getUserContext(), pipelineStartTime);
        JobEL.setConstantsInContext(parameters);

        for (BatchListener listener : batchListenerList) {
            listener.preBatch();
        }
        return context;
    }

    /**
     * Finishes a batch started with {@link #startBatch()} and runs it through
     * the rest of the pipeline.
     *
     * <p>On failure the error is logged, a failure batch is created, the origin
     * is asked to stop and the first such error is remembered for
     * {@code runPushSource()} to rethrow. The EL constants are cleared exactly
     * once on every exit path.
     *
     * @param batchContext context returned by {@code startBatch()}; must be a {@code BatchContextImpl}
     * @param str offset entity passed through to the batch run
     * @param str2 offset value passed through to the batch run
     * @return {@code true} if the batch was processed, {@code false} if it failed
     */
    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public boolean processBatch(BatchContext batchContext, String str, String str2) {
        BatchContextImpl batchContextImpl = (BatchContextImpl) batchContext;
        Map<String, Long> perStageLongs = new HashMap<>();
        Map<String, Object> batchMetricsByStage = new HashMap<>();
        try {
            Map<String, Object> originMetrics = this.originPipe.finishBatchContext(batchContextImpl);
            if (isStatsAggregationEnabled()) {
                batchMetricsByStage.put(this.originPipe.getStage().getInfo().getInstanceName(), originMetrics);
            }
            runSourceLessBatch(batchContextImpl.getStartTime(), batchContextImpl.getPipeBatch(), str, str2, perStageLongs, batchMetricsByStage);
            for (BatchListener listener : this.batchListenerList) {
                listener.postBatch();
            }
            return true;
        } catch (Throwable th) {
            LOG.error("Can't process batch", th);
            createFailureBatch(batchContextImpl.getPipeBatch());
            this.originPipe.getStage().getContext().setStop(true);
            // Only the first failure is kept; later ones are logged above but not stored.
            synchronized (this) {
                if (this.exceptionFromExecution == null) {
                    this.exceptionFromExecution = th;
                }
            }
            return false;
        } finally {
            // Single cleanup point, so a cleanup failure is never mistaken for a batch failure.
            PipelineEL.unsetConstantsInContext();
            JobEL.unsetConstantsInContext();
        }
    }

    /**
     * Delegates to the offset tracker's {@code commitOffset}.
     *
     * @param str offset entity
     * @param str2 offset value
     */
    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public void commitOffset(String str, String str2) {
        this.offsetTracker.commitOffset(str, str2);
    }

    /**
     * Hands a single error message to {@code retainErrorMessagesInMemory},
     * wrapped in a one-entry map keyed by {@code str}.
     *
     * @param str map key (presumably the stage instance name — confirm against callers)
     * @param errorMessage message to retain; must not be null because ImmutableList rejects nulls
     */
    @Override // _ss_com.streamsets.datacollector.runner.production.ReportErrorDelegate
    public void reportError(String str, ErrorMessage errorMessage) {
        retainErrorMessagesInMemory(ImmutableMap.of(str, ImmutableList.of(errorMessage)));
    }

    /**
     * Marks the pipeline as finished, which ends the poll-source loop, and asks
     * the origin's context to stop.
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineFinisherDelegate
    public void setFinished() {
        this.finished = true;
        // originPipe is only assigned in run(...); calling this earlier would dereference null.
        this.originPipe.getStage().getContext().setStop(true);
    }

    /**
     * Runs the batch loop for a poll origin until the offset tracker reports
     * completion or the runner is stopped or finished.
     *
     * <p>Each iteration reports thread health, runs the {@code preBatch()}
     * listeners, reconfigures the observer, lets the origin produce a batch,
     * pushes that batch through the remaining pipes and finally runs the
     * {@code postBatch()} listeners. If the downstream run fails, a failure
     * batch is created before the error is rethrown.
     *
     * @throws StageException if a stage fails
     * @throws PipelineException if the pipeline run fails
     */
    public void runPollSource() throws StageException, PipelineException {
        final String offsetKey = "$com.streamsets.datacollector.pollsource.offset$";
        while (!this.offsetTracker.isFinished() && !this.stop && !this.finished) {
            if (this.threadHealthReporter != null) {
                this.threadHealthReporter.reportHealth(ProductionPipelineRunnable.RUNNABLE_NAME, -1, System.currentTimeMillis());
            }
            for (BatchListener listener : this.batchListenerList) {
                listener.preBatch();
            }
            if (this.observer != null) {
                this.observer.reconfigure();
            }
            long batchStart = System.currentTimeMillis();
            FullPipeBatch pipeBatch = createFullPipeBatch(offsetKey, this.offsetTracker.getOffsets().get(offsetKey));
            Map<String, Long> perStageLongs = new HashMap<>();
            Map<String, Object> batchMetricsByStage = new HashMap<>();
            // The origin runs outside the try below, so an origin failure creates no failure batch.
            processPipe(this.originPipe, pipeBatch, false, null, null, perStageLongs, batchMetricsByStage);
            try {
                runSourceLessBatch(batchStart, pipeBatch, offsetKey, pipeBatch.getNewOffset(), perStageLongs, batchMetricsByStage);
            } catch (Throwable th) {
                createFailureBatch(pipeBatch);
                Throwables.propagateIfInstanceOf(th, StageException.class);
                Throwables.propagateIfInstanceOf(th, PipelineRuntimeException.class);
                Throwables.propagate(th);
            }
            for (BatchListener listener : this.batchListenerList) {
                listener.postBatch();
            }
        }
    }

    /**
     * Notifies every interested party that pipeline execution failed.
     *
     * <p>First runs {@code postBatch()} on all batch listeners. Then collects,
     * by identity so each instance is notified once, the registered error
     * listeners plus every stage (origin and all pipes of all runners) that
     * implements {@link ErrorListener}, and calls {@code errorNotification} on
     * each. An exception thrown by one listener is logged and does not stop the
     * others.
     *
     * @param sourcePipe origin pipe
     * @param list pipe runners whose stages are inspected
     * @param th the failure to report
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void errorNotification(SourcePipe sourcePipe, List<PipeRunner> list, Throwable th) {
        Set<ErrorListener> listeners = Sets.newIdentityHashSet();
        for (BatchListener batchListener : this.batchListenerList) {
            batchListener.postBatch();
        }
        listeners.addAll(new ArrayList<ErrorListener>(this.errorListeners));

        // Stages are held as Object and cast only after the instanceof check succeeds.
        Object originStage = sourcePipe.getStage().getStage();
        if (originStage instanceof ErrorListener) {
            listeners.add((ErrorListener) originStage);
        }
        for (PipeRunner pipeRunner : list) {
            pipeRunner.forEach(pipe -> {
                Object stage = pipe.getStage().getStage();
                if (stage instanceof ErrorListener) {
                    listeners.add((ErrorListener) stage);
                }
            });
        }

        for (ErrorListener errorListener : listeners) {
            try {
                errorListener.errorNotification(th);
            } catch (Exception e) {
                LOG.error(Utils.format("Error in calling ErrorListenerStage {}: {}", new Object[]{errorListener.getClass().getName(), e}), e);
            }
        }
    }

    /**
     * Not supported by this runner.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void run(SourcePipe sourcePipe, List<PipeRunner> list, BadRecordsHandler badRecordsHandler, List<StageOutput> list2, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        throw new UnsupportedOperationException();
    }

    /**
     * Tears the pipeline down while holding {@code destroyLock}.
     *
     * <p>Destroys the runner pool and the origin pipe, then walks the pipe
     * runners in reverse order. For each pipe, non-stage pipes and stage pipes
     * in the event path get one last {@code process} call, data-path stage
     * pipes are skipped, and every pipe is destroyed. Error records of each
     * runner's final batch go to the bad-records handler. When stats
     * aggregation is enabled, the queued stats records plus a final metrics
     * record are sent to the stats aggregation handler.
     *
     * @param sourcePipe origin pipe to destroy
     * @param list pipe runners to destroy, processed in reverse order
     * @param badRecordsHandler receives error records produced during teardown
     * @param statsAggregationHandler receives the final stats records
     * @throws StageException if a stage fails during its last processing
     * @throws PipelineRuntimeException if the pipeline fails during teardown
     * @throws RuntimeException if the final metrics record cannot be built or handed over
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void destroy(SourcePipe sourcePipe, List<PipeRunner> list, BadRecordsHandler badRecordsHandler, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        this.running = false;
        // Acquire before the try: if lock() failed inside it, finally would unlock a lock this thread does not hold.
        this.destroyLock.lock();
        try {
            if (this.runnerPool != null) {
                this.runnerPool.destroy();
            }
            int i = this.configuration.get(Constants.MAX_BATCH_SIZE_KEY, 1000);
            long lastBatchTime = this.offsetTracker.getLastBatchTime();
            long currentTimeMillis = System.currentTimeMillis();
            FullPipeBatch fullPipeBatch = new FullPipeBatch(null, null, i, false);
            try {
                LOG.trace("Destroying origin pipe");
                fullPipeBatch.skipStage(sourcePipe);
                sourcePipe.destroy(fullPipeBatch);
            } catch (RuntimeException e) {
                // Best effort: a failing origin must not prevent the remaining pipes from being destroyed.
                LOG.warn("Exception throw while destroying pipe", e);
            }
            for (PipeRunner pipeRunner : Lists.reverse(list)) {
                // Effectively-final copy for the lambda; fullPipeBatch is reassigned per runner.
                FullPipeBatch fullPipeBatch2 = fullPipeBatch;
                pipeRunner.executeBatch(null, null, currentTimeMillis, pipe -> {
                    pipe.getStage().getContext().setLastBatchTime(lastBatchTime);
                    String instanceName = pipe.getStage().getConfiguration().getInstanceName();
                    if (!(pipe instanceof StagePipe)) {
                        LOG.trace("Non stage pipe {}, running last process", instanceName);
                        pipe.process(fullPipeBatch2);
                    } else if (pipe.getStage().getConfiguration().isInEventPath()) {
                        LOG.trace("Stage pipe {} is in event path, running last process", instanceName);
                        pipe.process(fullPipeBatch2);
                    } else {
                        LOG.trace("Stage pipe {} is in data path, skipping it's processing.", instanceName);
                        fullPipeBatch2.skipStage(pipe);
                    }
                    try {
                        LOG.trace("Running destroy for {}", instanceName);
                        pipe.destroy(fullPipeBatch2);
                    } catch (RuntimeException e2) {
                        LOG.warn("Exception throw while destroying pipe", e2);
                    }
                });
                badRecordsHandler.handle(null, null, fullPipeBatch.getErrorSink(), fullPipeBatch.getSourceResponseSink());
                // Fresh batch for the next runner, again with the origin marked as skipped.
                fullPipeBatch = new FullPipeBatch(null, null, i, false);
                fullPipeBatch.skipStage(sourcePipe);
            }
            if (isStatsAggregationEnabled()) {
                List<Record> statsRecords = new ArrayList<>();
                this.statsAggregatorRequests.drainTo(statsRecords);
                Object obj = this.pipelineConfigBean.constants.get(MetricsEventRunnable.TIME_SERIES_ANALYSIS);
                // Time-series analysis defaults to enabled when the constant is absent.
                boolean booleanValue = obj != null ? ((Boolean) obj).booleanValue() : true;
                try {
                    String writeValueAsString = ObjectMapperFactory.get().writer().writeValueAsString(this.metrics);
                    LOG.info("Queueing last batch of record to be sent to stats aggregator");
                    statsRecords.add(AggregatorUtil.createMetricJsonRecord(this.runtimeInfo.getId(), this.runtimeInfo.getMasterSDCId(), this.pipelineConfiguration.getMetadata(), false, booleanValue, true, writeValueAsString));
                    statsAggregationHandler.handle(null, null, statsRecords);
                } catch (Exception e2) {
                    throw new RuntimeException(Utils.format("Error converting metric json to string: {}", new Object[]{e2}), e2);
                }
            }
        } finally {
            this.destroyLock.unlock();
        }
    }

    /**
     * Not supported by this runner.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public List<List<StageOutput>> getBatchesOutput() {
        throw new UnsupportedOperationException();
    }

    /** @return the offsets currently reported by the offset tracker */
    public Map<String, String> getCommittedOffsets() {
        return this.offsetTracker.getOffsets();
    }

    /**
     * Requests the runner to stop. When a snapshot capture is still pending it
     * is cancelled and the snapshot is deleted from the snapshot store.
     *
     * @throws PipelineException if cancelling or deleting the snapshot fails
     */
    public void stop() throws PipelineException {
        this.stop = true;
        if (batchesToCapture <= 0) {
            // No capture in progress, nothing to clean up.
            return;
        }
        cancelSnapshot(snapshotName);
        snapshotStore.deleteSnapshot(pipelineName, revision, snapshotName);
    }

    /** @return {@code true} once {@code stop()} has been called */
    public boolean wasStopped() {
        return this.stop;
    }

    /**
     * Arms snapshot capture.
     *
     * @param str snapshot name
     * @param i snapshot batch size; must be greater than zero
     * @param i2 number of batches to capture; not validated here
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void capture(String str, int i, int i2) {
        Preconditions.checkArgument(i > 0);
        this.snapshotName = str;
        this.snapshotBatchSize = i;
        // Written last: a positive value is what switches batch creation to snapshot mode.
        this.batchesToCapture = i2;
    }

    /**
     * Cancels the pending snapshot capture and discards any batches captured
     * so far.
     *
     * @param str name of the snapshot to cancel; must equal the current snapshot name
     * @throws IllegalArgumentException if no snapshot name is set or it differs from {@code str}
     * @throws PipelineException declared by the signature; not raised by the code in this method
     */
    public void cancelSnapshot(String str) throws PipelineException {
        final String activeSnapshot = this.snapshotName;
        Preconditions.checkArgument(activeSnapshot != null && activeSnapshot.equals(str));
        synchronized (this) {
            snapshotBatchSize = 0;
            batchesToCapture = 0;
            capturedBatches.clear();
        }
    }

    /**
     * Runs one pipe against the batch.
     *
     * <p>Under AT_MOST_ONCE delivery, the offset is committed just before the
     * first target of a non-idle batch is processed. When stats aggregation is
     * enabled, the batch metrics of stage pipes are recorded in {@code map2}
     * under the stage's instance name.
     *
     * @param pipe pipe to process
     * @param fullPipeBatch batch being processed
     * @param z whether the offset was already committed for this batch
     * @param str offset entity to commit
     * @param str2 offset value to commit
     * @param map per-stage map passed along by callers; not touched here
     * @param map2 receives per-stage batch metrics
     * @return {@code true} if the offset has been committed, by this call or earlier
     * @throws PipelineRuntimeException if pipe processing fails
     * @throws StageException if the stage fails
     */
    private boolean processPipe(Pipe pipe, FullPipeBatch fullPipeBatch, boolean z, String str, String str2, Map<String, Long> map, Map<String, Object> map2) throws PipelineRuntimeException, StageException {
        pipe.getStage().getContext().setLastBatchTime(offsetTracker.getLastBatchTime());

        boolean committed = z;
        final boolean commitBeforeTarget = !fullPipeBatch.isIdleBatch()
            && deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE
            && pipe.getStage().getDefinition().getType() == StageType.TARGET
            && !committed;
        if (commitBeforeTarget) {
            offsetTracker.commitOffset(str, str2);
            committed = true;
        }

        pipe.process(fullPipeBatch);

        if (pipe instanceof StagePipe && isStatsAggregationEnabled()) {
            map2.put(pipe.getStage().getInfo().getInstanceName(), ((StagePipe) pipe).getBatchMetrics());
        }
        return committed;
    }

    /**
     * Borrows a runner from the pool, executes the batch on it and always
     * returns the runner to the pool, exactly once.
     *
     * @param j batch start time in milliseconds
     * @param fullPipeBatch batch to push through the pipes
     * @param str offset entity
     * @param str2 offset value
     * @param map per-stage map forwarded to {@code executeRunner}
     * @param map2 per-stage batch metrics forwarded to {@code executeRunner}
     * @throws PipelineException if execution fails
     * @throws StageException if a stage fails
     */
    private void runSourceLessBatch(long j, FullPipeBatch fullPipeBatch, String str, String str2, Map<String, Long> map, Map<String, Object> map2) throws PipelineException, StageException {
        PipeRunner pipeRunner = null;
        try {
            pipeRunner = this.runnerPool.getRunner();
            executeRunner(pipeRunner, j, fullPipeBatch, str, str2, map, map2);
        } finally {
            // Stays null if getRunner() itself failed; then there is nothing to give back.
            if (pipeRunner != null) {
                this.runnerPool.returnRunner(pipeRunner);
            }
        }
    }

    /**
     * Executes a single batch on the given runner and performs all per-batch bookkeeping.
     *
     * <p>In order, this method: runs every pipe of the runner through {@code processPipe}; hands
     * the batch's error sink to the bad records handler; commits the offset when the delivery
     * guarantee is AT_LEAST_ONCE; updates timers, counters, meters and histograms; enqueues a
     * stats-aggregation record when aggregation is enabled; captures snapshot output while a
     * capture is pending; retains error messages and records in memory; and finally drains the
     * queued stats-aggregation requests to the stats aggregation handler.
     *
     * @param pipeRunner runner whose pipes process the batch
     * @param j batch start time in epoch milliseconds, used to compute the batch duration
     * @param fullPipeBatch the batch being processed
     * @param str first argument handed to the runner, the offset tracker, the bad records handler and the stats handler
     * @param str2 second argument handed to the runner, the offset tracker and the bad records handler
     * @param map forwarded unchanged to {@code processPipe}
     * @param map2 per-stage batch metrics; filled by {@code processPipe} and attached to the aggregated stats record
     * @throws PipelineException declared for failures of the invoked pipeline components
     * @throws StageException declared for failures of the invoked stages
     */
    private void executeRunner(PipeRunner pipeRunner, long j, FullPipeBatch fullPipeBatch, String str, String str2, Map<String, Long> map, Map<String, Object> map2) throws PipelineException, StageException {
        // Carries processPipe's return value from one pipe to the next (the lambda needs a mutable holder).
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        // Read before the batch runs; used only for the stats aggregation hand-off at the end.
        String previousOffset = fullPipeBatch.getPreviousOffset();
        OffsetCommitTrigger offsetCommitTrigger = pipeRunner.getOffsetCommitTrigger();
        pipeRunner.executeBatch(str, str2, j, pipe -> {
            atomicBoolean.set(processPipe(pipe, fullPipeBatch, atomicBoolean.get(), str, str2, map, map2));
        });
        this.badRecordsHandler.handle(str, str2, fullPipeBatch.getErrorSink(), fullPipeBatch.getSourceResponseSink());
        // AT_LEAST_ONCE: commit only after all pipes ran; skipped for idle batches and when a
        // commit trigger exists but does not ask for a commit.
        if (!fullPipeBatch.isIdleBatch() && this.deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE && (offsetCommitTrigger == null || offsetCommitTrigger.commit())) {
            this.offsetTracker.commitOffset(str, str2);
        }
        // Batch duration in milliseconds, measured from the caller-supplied start time.
        long currentTimeMillis = System.currentTimeMillis() - j;
        this.batchProcessingTimer.update(currentTimeMillis, TimeUnit.MILLISECONDS);
        this.batchCountCounter.inc();
        this.batchCountMeter.mark();
        this.batchInputRecordsHistogram.update(fullPipeBatch.getInputRecords());
        this.batchOutputRecordsHistogram.update(fullPipeBatch.getOutputRecords());
        this.batchErrorRecordsHistogram.update(fullPipeBatch.getErrorRecords());
        this.batchErrorsHistogram.update(fullPipeBatch.getErrorMessages());
        this.batchInputRecordsMeter.mark(fullPipeBatch.getInputRecords());
        this.batchOutputRecordsMeter.mark(fullPipeBatch.getOutputRecords());
        this.batchErrorRecordsMeter.mark(fullPipeBatch.getErrorRecords());
        this.batchErrorMessagesMeter.mark(fullPipeBatch.getErrorMessages());
        this.batchInputRecordsCounter.inc(fullPipeBatch.getInputRecords());
        this.batchOutputRecordsCounter.inc(fullPipeBatch.getOutputRecords());
        this.batchErrorRecordsCounter.inc(fullPipeBatch.getErrorRecords());
        this.batchErrorMessagesCounter.inc(fullPipeBatch.getErrorMessages());
        // pipeContext is only set through setRuntimeConfiguration, so it may still be null here.
        if (this.pipeContext != null) {
            this.pipeContext.getRuntimeStats().setLastBatchInputRecordsCount(fullPipeBatch.getInputRecords());
            this.pipeContext.getRuntimeStats().setLastBatchOutputRecordsCount(fullPipeBatch.getOutputRecords());
            this.pipeContext.getRuntimeStats().setLastBatchErrorRecordsCount(fullPipeBatch.getErrorRecords());
            this.pipeContext.getRuntimeStats().setLastBatchErrorMessagesCount(fullPipeBatch.getErrorMessages());
        }
        // Build one metric record describing this batch and queue it for the stats aggregator.
        if (isStatsAggregationEnabled()) {
            HashMap hashMap = new HashMap();
            hashMap.put(AggregatorUtil.PIPELINE_BATCH_DURATION, Long.valueOf(currentTimeMillis));
            hashMap.put("batchCount", 1);
            hashMap.put(AggregatorUtil.BATCH_INPUT_RECORDS, Integer.valueOf(fullPipeBatch.getInputRecords()));
            hashMap.put(AggregatorUtil.BATCH_OUTPUT_RECORDS, Integer.valueOf(fullPipeBatch.getOutputRecords()));
            hashMap.put(AggregatorUtil.BATCH_ERROR_RECORDS, Integer.valueOf(fullPipeBatch.getErrorRecords()));
            hashMap.put(AggregatorUtil.BATCH_ERRORS, Integer.valueOf(fullPipeBatch.getErrorMessages()));
            hashMap.put(AggregatorUtil.STAGE_BATCH_METRICS, map2);
            AggregatorUtil.enqueStatsRecord(AggregatorUtil.createMetricRecord(hashMap), this.statsAggregatorRequests, this.configuration);
        }
        // Snapshot capture state (batchesToCapture, capturedBatches, snapshotBatchSize) is guarded by this object's monitor.
        synchronized (this) {
            List<StageOutput> snapshotsOfAllStagesOutput = fullPipeBatch.getSnapshotsOfAllStagesOutput();
            if (this.batchesToCapture > 0 && ValidationUtil.isSnapshotOutputUsable(fullPipeBatch.getSnapshotsOfAllStagesOutput())) {
                if (!snapshotsOfAllStagesOutput.isEmpty()) {
                    this.capturedBatches.add(snapshotsOfAllStagesOutput);
                }
                // A usable batch counts towards the capture quota even when its output list is empty.
                this.batchesToCapture--;
                if (this.batchesToCapture == 0) {
                    // Last requested batch seen: reset capture state and persist whatever was collected.
                    this.snapshotBatchSize = 0;
                    this.batchesToCapture = 0;
                    if (!this.capturedBatches.isEmpty()) {
                        this.snapshotStore.save(this.pipelineName, this.revision, this.snapshotName, this.batchCountMeter.getCount(), this.capturedBatches);
                        this.capturedBatches.clear();
                    }
                }
            }
        }
        // Keep this batch's stage errors and error records in the in-memory maps.
        Map<String, List<Record>> errorRecords = fullPipeBatch.getErrorSink().getErrorRecords();
        retainErrorMessagesInMemory(fullPipeBatch.getErrorSink().getStageErrors());
        retainErrorRecordsInMemory(errorRecords);
        // Hand every queued stats request to the aggregation handler, together with the offset read at the start.
        if (isStatsAggregationEnabled()) {
            ArrayList arrayList = new ArrayList();
            this.statsAggregatorRequests.drainTo(arrayList);
            this.statsAggregationHandler.handle(str, previousOffset, arrayList);
        }
    }

    /**
     * Pushes an empty "idle" batch through every runner that the runner pool reports as idle.
     *
     * <p>The whole scan runs while holding {@code destroyLock}. At most {@code pipes.size()}
     * runners are examined, and the scan stops early when the pipeline is no longer running or
     * the pool has no further idle runner. Each borrowed runner is returned to the pool exactly
     * once, and the lock is released exactly once, on every exit path.
     *
     * @param j value passed to {@code runnerPool.getIdleRunner}; presumably the idle-time threshold in milliseconds (confirm against the pool)
     * @return the number of loop iterations performed, including the iteration in which no idle runner was found
     * @throws PipelineException if obtaining a runner or executing an idle batch fails
     * @throws StageException if a stage fails while processing an idle batch
     */
    public int produceEmptyBatchesForIdleRunners(long j) throws PipelineException, StageException {
        LOG.debug("Checking if any active runner is idle");
        int i = 0;
        // Acquire outside the try so that the finally block only ever unlocks a lock we really hold.
        this.destroyLock.lock();
        try {
            while (this.running && i < this.pipes.size()) {
                i++;
                PipeRunner pipeRunner = null;
                try {
                    pipeRunner = this.runnerPool.getIdleRunner(j);
                    if (pipeRunner == null) {
                        // No idle runner left: nothing to return to the pool, stop scanning.
                        return i;
                    }
                    LOG.debug("Generating empty batch for runner: {}", Integer.valueOf(pipeRunner.getRunnerId()));
                    this.pipeContext.getRuntimeStats().incIdleBatchCount();
                    FullPipeBatch fullPipeBatch = new FullPipeBatch(null, null, 0, false);
                    fullPipeBatch.setIdleBatch(true);
                    // The origin produces nothing for an idle batch, so its pipe is skipped.
                    fullPipeBatch.skipStage(this.originPipe);
                    executeRunner(pipeRunner, System.currentTimeMillis(), fullPipeBatch, null, null, new HashMap<>(), new HashMap<>());
                } finally {
                    if (pipeRunner != null) {
                        this.runnerPool.returnRunner(pipeRunner);
                    }
                }
            }
            return i;
        } finally {
            this.destroyLock.unlock();
        }
    }

    /**
     * Queues a notification request for the error that ended the pipeline.
     *
     * <p>If the calling thread is interrupted while waiting to enqueue, the interrupt flag is
     * restored and the failure to submit the request is logged together with the error.
     *
     * @param th the error to report
     */
    private void sendPipelineErrorNotificationRequest(Throwable th) {
        try {
            this.observeRequests.put(new PipelineErrorNotificationRequest(th));
            return;
        } catch (InterruptedException interrupted) {
            LOG.warn("Interrupted while sending pipeline error notification request.");
            Thread.currentThread().interrupt();
        }
        // Only reached when the put above was interrupted.
        LOG.error("Could not submit alert request for pipeline ending error: {}", th, th);
    }

    /**
     * Returns the offset tracker held by this runner.
     *
     * @return the current {@code offsetTracker} field value
     */
    public SourceOffsetTracker getOffSetTracker() {
        return offsetTracker;
    }

    /**
     * Appends the given error messages to the per-key in-memory queues.
     *
     * <p>A bounded {@link EvictingQueue} is created on first use for each key; its capacity comes
     * from the {@code Constants.MAX_PIPELINE_ERRORS_KEY} configuration value (default 100).
     *
     * @param map error messages grouped by key (presumably the stage instance name); an empty map is a no-op
     */
    private void retainErrorMessagesInMemory(Map<String, List<ErrorMessage>> map) {
        if (!map.isEmpty()) {
            synchronized (this.stageToErrorMessagesMap) {
                for (Map.Entry<String, List<ErrorMessage>> stageErrors : map.entrySet()) {
                    this.stageToErrorMessagesMap
                        .computeIfAbsent(stageErrors.getKey(), stage -> EvictingQueue.create(this.configuration.get(Constants.MAX_PIPELINE_ERRORS_KEY, 100)))
                        .addAll(stageErrors.getValue());
                }
            }
        }
    }

    /**
     * Appends the given error records to the per-key in-memory queues.
     *
     * <p>A bounded {@link EvictingQueue} is created on first use for each key; its capacity comes
     * from the {@code Constants.MAX_ERROR_RECORDS_PER_STAGE_KEY} configuration value (default 100).
     *
     * @param map error records grouped by key (presumably the stage instance name); an empty map is a no-op
     */
    private void retainErrorRecordsInMemory(Map<String, List<Record>> map) {
        if (!map.isEmpty()) {
            synchronized (this.stageToErrorRecordsMap) {
                for (Map.Entry<String, List<Record>> stageRecords : map.entrySet()) {
                    this.stageToErrorRecordsMap
                        .computeIfAbsent(stageRecords.getKey(), stage -> EvictingQueue.create(this.configuration.get(Constants.MAX_ERROR_RECORDS_PER_STAGE_KEY, 100)))
                        .addAll(stageRecords.getValue());
                }
            }
        }
    }

    /**
     * Returns a copy of the error records retained in memory for the given key.
     *
     * @param str key to look up in the retained error records (presumably the stage instance name)
     * @param i maximum number of records to return; when more are retained, only the first {@code i} of the copy are returned
     * @return an empty list when nothing is retained for the key, otherwise a copy of the retained records, cut to the first {@code i} entries if it is longer
     */
    public List<Record> getErrorRecords(String str, int i) {
        synchronized (this.stageToErrorRecordsMap) {
            java.util.Collection<Record> retained = this.stageToErrorRecordsMap.get(str);
            if (retained == null || retained.isEmpty()) {
                return Collections.emptyList();
            }
            // Copy while holding the lock so the caller never sees later additions or evictions.
            List<Record> copy = new CopyOnWriteArrayList<>(retained);
            return copy.size() > i ? copy.subList(0, i) : copy;
        }
    }

    /**
     * Returns a copy of the error messages retained in memory for the given key.
     *
     * @param str key to look up in the retained error messages (presumably the stage instance name)
     * @param i maximum number of messages to return; when more are retained, only the first {@code i} of the copy are returned
     * @return an empty list when nothing is retained for the key, otherwise a copy of the retained messages, cut to the first {@code i} entries if it is longer
     */
    public List<ErrorMessage> getErrorMessages(String str, int i) {
        synchronized (this.stageToErrorMessagesMap) {
            java.util.Collection<ErrorMessage> retained = this.stageToErrorMessagesMap.get(str);
            if (retained == null || retained.isEmpty()) {
                return Collections.emptyList();
            }
            // Copy while holding the lock so the caller never sees later additions or evictions.
            List<ErrorMessage> copy = new CopyOnWriteArrayList<>(retained);
            return copy.size() > i ? copy.subList(0, i) : copy;
        }
    }

    /**
     * Stores a failure snapshot of the given batch when failure snapshots are enabled.
     *
     * <p>Nothing is written when {@code pipelineConfigBean.shouldCreateFailureSnapshot} is false or
     * when the snapshot store already lists a failure snapshot for this pipeline and revision.
     * A {@link PipelineException} raised by the snapshot store is logged and not propagated.
     *
     * @param fullPipeBatch the batch from which the failure snapshot is created
     */
    private void createFailureBatch(FullPipeBatch fullPipeBatch) {
        if (!this.pipelineConfigBean.shouldCreateFailureSnapshot) {
            return;
        }
        try {
            // Keep at most one failure snapshot per pipeline revision.
            for (SnapshotInfo existing : this.snapshotStore.getSummaryForPipeline(this.pipelineName, this.revision)) {
                if (existing.isFailureSnapshot()) {
                    LOG.trace("Skipping creation of failure snapshot as {} already exists.", existing.getId());
                    return;
                }
            }
            String failureSnapshotName = "Failure_" + UUID.randomUUID();
            String failureSnapshotLabel = "Failure at " + LocalDateTime.now();
            this.snapshotStore.create("", this.pipelineName, this.revision, failureSnapshotName, failureSnapshotLabel, true);
            this.snapshotStore.save(this.pipelineName, this.revision, failureSnapshotName, -1L, ImmutableList.of(fullPipeBatch.createFailureSnapshot()));
        } catch (PipelineException e) {
            LOG.error("Can't serialize failure snapshot", e);
        }
    }

    /**
     * Adds the given listener to this runner's list of batch listeners.
     *
     * @param batchListener the listener to register
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void registerListener(BatchListener batchListener) {
        batchListenerList.add(batchListener);
    }

    /**
     * Tells whether stats aggregation is active for this runner.
     *
     * @return {@code true} when a stats aggregator request queue has been set
     */
    private boolean isStatsAggregationEnabled() {
        return this.statsAggregatorRequests != null;
    }

    /**
     * Stores the runtime objects this runner works with.
     *
     * @param pipeContext the pipe context to use (its runtime stats are updated per batch)
     * @param pipelineConfiguration the pipeline configuration to keep
     * @param pipelineConfigBean the pipeline config bean to keep (consulted for the failure snapshot setting)
     */
    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void setRuntimeConfiguration(PipeContext pipeContext, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) {
        this.pipelineConfigBean = pipelineConfigBean;
        this.pipelineConfiguration = pipelineConfiguration;
        this.pipeContext = pipeContext;
    }
}
