package com.streamsets.pipeline.sdk;

import _ss_com.com.google.common.collect.ImmutableList;
import _ss_com.com.google.common.collect.Sets;
import _ss_com.streamsets.datacollector.email.EmailSender;
import _ss_com.streamsets.datacollector.json.JsonMapperImpl;
import _ss_com.streamsets.datacollector.lineage.LineagePublisherDelegator;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.restapi.configuration.ConfigurationInjector;
import _ss_com.streamsets.datacollector.runner.SourceResponseSink;
import _ss_com.streamsets.datacollector.runner.StageContext;
import _ss_com.streamsets.datacollector.util.AggregatorUtil;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_org.apache.commons.configuration2.tree.DefaultExpressionEngineSymbols;
import com.streamsets.pipeline.api.BatchMaker;
import com.streamsets.pipeline.api.DeliveryGuarantee;
import com.streamsets.pipeline.api.EventRecord;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.OnRecordError;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.Stage;
import com.streamsets.pipeline.api.StageDef;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.StageType;
import com.streamsets.pipeline.api.ext.DataCollectorServices;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import com.streamsets.pipeline.api.impl.Utils;
import com.streamsets.pipeline.api.lineage.LineageEvent;
import com.streamsets.pipeline.api.service.ServiceDependency;
import com.streamsets.pipeline.sdk.ProtoRunner;
import com.streamsets.pipeline.sdk.ServiceRunner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/streamsets/pipeline/sdk/StageRunner.class */
public abstract class StageRunner<S extends Stage> extends ProtoRunner {
    private static final Logger LOG = LoggerFactory.getLogger(StageRunner.class);
    private final Class<S> stageClass;
    private final S stage;
    private final Stage.Info info;
    private final StageContext context;
    private final List<ServiceRunner> services;
    private final List<LineageEvent> lineageEvents;

    /* loaded from: input_file:com/streamsets/pipeline/sdk/StageRunner$Builder.class */
    public static abstract class Builder<S extends Stage, R extends StageRunner, B extends Builder> {
        final S stage;
        final Class<S> stageClass;
        final List<String> outputLanes;
        final Map<String, String> stageSdcConf;
        final Map<String, Object> configs;
        final Map<String, Object> constants;
        boolean isPreview;
        ExecutionMode executionMode;
        DeliveryGuarantee deliveryGuarantee;
        OnRecordError onRecordError;
        String resourcesDir;
        RuntimeInfo runtimeInfo;
        List<ServiceRunner> services;

        /* JADX INFO: Access modifiers changed from: protected */
        public Builder(Class<S> cls, S s) {
            this.executionMode = ExecutionMode.STANDALONE;
            this.deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
            this.stageClass = cls;
            this.stage = s;
            this.outputLanes = new ArrayList();
            this.configs = new HashMap();
            this.onRecordError = OnRecordError.STOP_PIPELINE;
            this.constants = new HashMap();
            this.stageSdcConf = new HashMap();
            this.runtimeInfo = new SdkRuntimeInfo("", null, null);
            this.services = new ArrayList();
        }

        public B setPreview(boolean z) {
            this.isPreview = z;
            return this;
        }

        public B setExecutionMode(ExecutionMode executionMode) {
            this.executionMode = executionMode;
            return this;
        }

        public B setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
            this.deliveryGuarantee = deliveryGuarantee;
            return this;
        }

        public B setResourcesDir(String str) {
            this.resourcesDir = str;
            return this;
        }

        public B setOnRecordError(OnRecordError onRecordError) {
            this.onRecordError = onRecordError;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public Builder(Class<S> cls) {
            this((Class) Utils.checkNotNull(cls, "stageClass"), null);
        }

        public B addOutputLane(String str) {
            this.outputLanes.add((String) Utils.checkNotNull(str, AggregatorUtil.LANE));
            return this;
        }

        public B addConfiguration(String str, Object obj) {
            this.configs.put((String) Utils.checkNotNull(str, "name"), obj);
            return this;
        }

        public B addStageSdcConfiguration(String str, String str2) {
            this.stageSdcConf.put((String) Utils.checkNotNull(str, "name"), str2);
            return this;
        }

        public B addConstants(Map<String, Object> map) {
            this.constants.putAll(map);
            return this;
        }

        public B setRuntimeInfo(RuntimeInfo runtimeInfo) {
            this.runtimeInfo = runtimeInfo;
            return this;
        }

        public B addService(Class cls, Object obj) {
            this.services.add(new ServiceRunner.Builder(cls, obj).build());
            return this;
        }

        public B addService(ServiceRunner serviceRunner) {
            this.services.add(serviceRunner);
            return this;
        }

        public abstract R build();
    }

    /* loaded from: input_file:com/streamsets/pipeline/sdk/StageRunner$Output.class */
    public static class Output {
        private final String offsetEntity;
        private final String newOffset;
        private final Map<String, List<Record>> records;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Output(String str, String str2, Map<String, List<Record>> map) {
            this.offsetEntity = str;
            this.newOffset = str2;
            for (Map.Entry<String, List<Record>> entry : map.entrySet()) {
                entry.setValue(Collections.unmodifiableList(entry.getValue()));
            }
            this.records = Collections.unmodifiableMap(map);
        }

        public String getOffsetEntity() {
            return this.offsetEntity;
        }

        public String getNewOffset() {
            return this.newOffset;
        }

        public Map<String, List<Record>> getRecords() {
            return this.records;
        }
    }

    private static Stage getStage(Class<? extends Stage> cls) {
        try {
            return cls.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private StageDef getStageDefinition(Class<? extends Stage> cls) {
        return cls.getAnnotation(StageDef.class);
    }

    private String getName(Class<? extends Stage> cls) {
        return cls.getName().replace(DefaultExpressionEngineSymbols.DEFAULT_PROPERTY_DELIMITER, "_");
    }

    private int getVersion(Class<? extends Stage> cls) {
        StageDef stageDefinition = getStageDefinition(cls);
        if (stageDefinition != null) {
            return stageDefinition.version();
        }
        return -1;
    }

    private Set<Class> getDeclaredServices(Class<? extends Stage> cls) {
        StageDef stageDefinition = getStageDefinition(cls);
        HashSet hashSet = new HashSet();
        for (ServiceDependency serviceDependency : stageDefinition.services()) {
            hashSet.add(serviceDependency.service());
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StageRunner(Class<S> cls, StageType stageType, Map<String, Object> map, List<String> list, boolean z, OnRecordError onRecordError, Map<String, Object> map2, Map<String, String> map3, ExecutionMode executionMode, DeliveryGuarantee deliveryGuarantee, String str, RuntimeInfo runtimeInfo, List<ServiceRunner> list2) {
        this(cls, getStage((Class) Utils.checkNotNull(cls, "stageClass")), stageType, map, list, z, onRecordError, map2, map3, executionMode, deliveryGuarantee, str, runtimeInfo, list2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    public StageRunner(Class<S> cls, S s, StageType stageType, Map<String, Object> map, List<String> list, boolean z, OnRecordError onRecordError, Map<String, Object> map2, Map<String, String> map3, ExecutionMode executionMode, DeliveryGuarantee deliveryGuarantee, String str, RuntimeInfo runtimeInfo, List<ServiceRunner> list2) {
        if (DataCollectorServices.instance().get("com.streamsets.pipeline.api.ext.json.JsonMapper") == null) {
            DataCollectorServices.instance().put("com.streamsets.pipeline.api.ext.json.JsonMapper", new JsonMapperImpl());
        }
        Utils.checkNotNull(s, "stage");
        Utils.checkNotNull(map, ConfigurationInjector.CONFIGURATION);
        Utils.checkNotNull(list, "outputLanes");
        Utils.checkState(getStageDefinition(cls) != null, Utils.format("@StageDef annotation not found on class {} (provided the right DStage as stageClass argument?)", new Object[]{cls.toString()}));
        this.stageClass = cls;
        this.stage = s;
        try {
            configureObject(s, map);
            String name = getName(s.getClass());
            String str2 = name + "_1";
            this.info = ContextInfoCreator.createInfo(name, getVersion(cls), str2);
            try {
                Map<String, Class<?>[]> configToElDefMap = ElUtil.getConfigToElDefMap(cls);
                Configuration configuration = new Configuration();
                map3.forEach((str3, str4) -> {
                    configuration.set("stage.conf_" + str3, str4);
                });
                this.lineageEvents = new ArrayList();
                this.services = list2;
                HashMap hashMap = new HashMap();
                for (ServiceRunner serviceRunner : list2) {
                    hashMap.put(serviceRunner.getServiceClass(), serviceRunner.getService());
                }
                Set<Class> declaredServices = getDeclaredServices(cls);
                Set keySet = hashMap.keySet();
                Sets.SetView difference = Sets.difference(declaredServices, keySet);
                Sets.SetView difference2 = Sets.difference(keySet, declaredServices);
                if (!difference.isEmpty() || !difference2.isEmpty()) {
                    throw new RuntimeException(Utils.format("Services mismatch - missing ({}), extra({})", new Object[]{difference, difference2}));
                }
                this.context = new StageContext(str2, stageType, 0, z, onRecordError, list, configToElDefMap, map2, executionMode, deliveryGuarantee, str, new EmailSender(new Configuration()), configuration, new LineagePublisherDelegator.ListDelegator(this.lineageEvents), runtimeInfo, hashMap);
                this.context.getErrorSink().registerInterceptorsForStage(getInfo().getInstanceName(), Collections.emptyList());
                this.context.getEventSink().registerInterceptorsForStage(getInfo().getInstanceName(), Collections.emptyList());
                this.status = ProtoRunner.Status.CREATED;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    public S getStage() {
        return this.stage;
    }

    public Stage.Info getInfo() {
        return this.info;
    }

    public Stage.Context getContext() {
        return this.context;
    }

    public List<Stage.ConfigIssue> runValidateConfigs() throws StageException {
        try {
            LOG.debug("Stage '{}' validateConfigs starts", getInfo().getInstanceName());
            ensureStatus(ProtoRunner.Status.CREATED);
            try {
                ArrayList arrayList = new ArrayList();
                Iterator<ServiceRunner> it = this.services.iterator();
                while (it.hasNext()) {
                    arrayList.addAll(it.next().runValidateConfigs());
                }
                arrayList.addAll(this.stage.init(getInfo(), getContext()));
                this.stage.destroy();
                LOG.debug("Stage '{}' validateConfigs ends", getInfo().getInstanceName());
                return arrayList;
            } catch (Throwable th) {
                this.stage.destroy();
                throw th;
            }
        } catch (Throwable th2) {
            LOG.debug("Stage '{}' validateConfigs ends", getInfo().getInstanceName());
            throw th2;
        }
    }

    public void runInit() throws StageException {
        LOG.debug("Stage '{}' init starts", getInfo().getInstanceName());
        ensureStatus(ProtoRunner.Status.CREATED);
        Iterator<ServiceRunner> it = this.services.iterator();
        while (it.hasNext()) {
            it.next().runInit();
        }
        List init = this.stage.init(getInfo(), getContext());
        if (init.isEmpty()) {
            this.status = ProtoRunner.Status.INITIALIZED;
            LOG.debug("Stage '{}' init ends", getInfo().getInstanceName());
        } else {
            ArrayList arrayList = new ArrayList(init.size());
            Iterator it2 = init.iterator();
            while (it2.hasNext()) {
                arrayList.add(((Stage.ConfigIssue) it2.next()).toString());
            }
            throw new StageException(ContainerError.CONTAINER_0010, new Object[]{arrayList});
        }
    }

    public void runDestroy() throws StageException {
        LOG.debug("Stage '{}' destroy starts", getInfo().getInstanceName());
        ensureStatus(ProtoRunner.Status.INITIALIZED, ProtoRunner.Status.CREATED);
        Iterator<ServiceRunner> it = this.services.iterator();
        while (it.hasNext()) {
            it.next().runDestroy();
        }
        this.stage.destroy();
        this.status = ProtoRunner.Status.DESTROYED;
        LOG.debug("Stage '{}' destroy ends", getInfo().getInstanceName());
    }

    public List<Record> getErrorRecords() throws StageException {
        return this.context.getErrorSink().getErrorRecords(this.info.getInstanceName());
    }

    public List<String> getErrors() {
        List<ErrorMessage> stageErrors = this.context.getErrorSink().getStageErrors(this.info.getInstanceName());
        ArrayList arrayList = new ArrayList();
        Iterator<ErrorMessage> it = stageErrors.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getNonLocalized());
        }
        return arrayList;
    }

    public void clearErrors() {
        this.context.getErrorSink().clear();
    }

    public List<EventRecord> getEventRecords() throws StageException {
        return this.context.getEventSink().getStageEventsAsEventRecords(this.info.getInstanceName());
    }

    public void clearEvents() {
        this.context.getEventSink().clear();
    }

    public List<LineageEvent> getLineageEvents() {
        return ImmutableList.copyOf((Collection) this.lineageEvents);
    }

    public void clearLineageEvents() {
        this.lineageEvents.clear();
    }

    public SourceResponseSink getSourceResponseSink() {
        return this.context.getSourceResponseSink();
    }

    public void setSourceResponseSink(SourceResponseSink sourceResponseSink) {
        this.context.setSourceResponseSink(sourceResponseSink);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static BatchMaker createTestBatchMaker(String... strArr) {
        return new BatchMakerImpl(ImmutableList.copyOf(strArr));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Output getOutput(BatchMaker batchMaker) {
        return getOutput("$com.streamsets.datacollector.pollsource.offset$", "sdk:offset", batchMaker);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Output getOutput(String str, String str2, BatchMaker batchMaker) {
        return new Output(str, str2, ((BatchMakerImpl) batchMaker).getOutput());
    }
}
