package com.streamsets.pipeline.sdk;

import _ss_com.com.google.common.base.Preconditions;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate;
import com.streamsets.pipeline.api.BatchContext;
import com.streamsets.pipeline.api.DeliveryGuarantee;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.OnRecordError;
import com.streamsets.pipeline.api.PushSource;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.StageType;
import com.streamsets.pipeline.api.impl.Utils;
import com.streamsets.pipeline.sdk.ProtoRunner;
import com.streamsets.pipeline.sdk.StageRunner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/streamsets/pipeline/sdk/PushSourceRunner.class */
public class PushSourceRunner extends StageRunner<PushSource> implements PushSourceContextDelegate {
    private static final Logger LOG = LoggerFactory.getLogger(PushSourceRunner.class);
    private Map<String, String> offsets;
    private Callback callback;
    private ExecutorService executor;
    private Future<Void> stageRunner;

    /* loaded from: input_file:com/streamsets/pipeline/sdk/PushSourceRunner$Builder.class */
    public static class Builder extends StageRunner.Builder<PushSource, PushSourceRunner, Builder> {
        public Builder(Class<? extends PushSource> cls, PushSource pushSource) {
            super(cls, pushSource);
        }

        public Builder(Class<? extends PushSource> cls) {
            super(cls);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.streamsets.pipeline.sdk.StageRunner.Builder
        public PushSourceRunner build() {
            Utils.checkState(!this.outputLanes.isEmpty(), "A Source must have at least one output stream");
            return this.stage != 0 ? new PushSourceRunner(this.stageClass, this.stage, this.configs, this.outputLanes, this.isPreview, this.onRecordError, this.constants, this.stageSdcConf, this.executionMode, this.deliveryGuarantee, this.resourcesDir, this.runtimeInfo, this.services) : new PushSourceRunner(this.stageClass, this.configs, this.outputLanes, this.isPreview, this.onRecordError, this.constants, this.stageSdcConf, this.executionMode, this.deliveryGuarantee, this.resourcesDir, this.runtimeInfo, this.services);
        }
    }

    /* loaded from: input_file:com/streamsets/pipeline/sdk/PushSourceRunner$Callback.class */
    public interface Callback {
        void processBatch(StageRunner.Output output) throws StageException;
    }

    /* loaded from: input_file:com/streamsets/pipeline/sdk/PushSourceRunner$StageRunnable.class */
    private class StageRunnable implements Runnable {
        Map<String, String> lastOffsets;
        int maxBatchSize;

        StageRunnable(Map<String, String> map, int i) {
            this.lastOffsets = map;
            this.maxBatchSize = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                PushSourceRunner.this.getStage().produce(this.lastOffsets, this.maxBatchSize);
            } catch (StageException e) {
                throw new RuntimeException((Throwable) e);
            }
        }
    }

    public PushSourceRunner(Class<PushSource> cls, Map<String, Object> map, List<String> list, boolean z, OnRecordError onRecordError, Map<String, Object> map2, Map<String, String> map3, ExecutionMode executionMode, DeliveryGuarantee deliveryGuarantee, String str, RuntimeInfo runtimeInfo, List<ServiceRunner> list2) {
        super(cls, StageType.SOURCE, map, list, z, onRecordError, map2, map3, executionMode, deliveryGuarantee, str, runtimeInfo, list2);
    }

    public PushSourceRunner(Class<PushSource> cls, PushSource pushSource, Map<String, Object> map, List<String> list, boolean z, OnRecordError onRecordError, Map<String, Object> map2, Map<String, String> map3, ExecutionMode executionMode, DeliveryGuarantee deliveryGuarantee, String str, RuntimeInfo runtimeInfo, List<ServiceRunner> list2) {
        super(cls, pushSource, StageType.SOURCE, map, list, z, onRecordError, map2, map3, executionMode, deliveryGuarantee, str, runtimeInfo, list2);
    }

    public void runProduce(Map<String, String> map, int i, Callback callback) throws StageException {
        Preconditions.checkNotNull(map, "Last offsets can't be null");
        Preconditions.checkNotNull(callback, "Callback object can't be null");
        this.offsets = new HashMap(map);
        this.callback = callback;
        this.executor = Executors.newSingleThreadExecutor();
        try {
            LOG.debug("Stage '{}' produce starts", getInfo().getInstanceName());
            ensureStatus(ProtoRunner.Status.INITIALIZED);
            getContext().setPushSourceContextDelegate(this);
            this.stageRunner = this.executor.submit(new StageRunnable(map, i));
            LOG.debug("Stage '{}' produce ends", getInfo().getInstanceName());
        } catch (Throwable th) {
            LOG.debug("Stage '{}' produce ends", getInfo().getInstanceName());
            throw th;
        }
    }

    public void setStop() {
        getContext().setStop(true);
    }

    public void waitOnProduce() throws ExecutionException, InterruptedException {
        try {
            this.stageRunner.get();
        } finally {
            this.executor.shutdownNow();
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public BatchContext startBatch() {
        return new BatchContextSdkImpl(new BatchMakerImpl(getContext().getOutputLanes()), getContext());
    }

    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public boolean processBatch(BatchContext batchContext, String str, String str2) {
        try {
            this.callback.processBatch(StageRunner.getOutput(str, str2, batchContext.getBatchMaker()));
            if (str == null) {
                return true;
            }
            commitOffset(str, str2);
            return true;
        } catch (StageException e) {
            LOG.error("StageException while processing batch: {}", e.toString(), e);
            return false;
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PushSourceContextDelegate
    public void commitOffset(String str, String str2) {
        Preconditions.checkNotNull(str);
        if (str2 == null) {
            this.offsets.remove(str);
        } else {
            this.offsets.put(str, str2);
        }
    }

    public Map<String, String> getOffsets() {
        return this.offsets;
    }
}
