package _ss_com.streamsets.datacollector.lineage;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.streamsets.datacollector.config.LineagePublisherDefinition;
import _ss_com.streamsets.datacollector.stagelibrary.StageLibraryTask;
import _ss_com.streamsets.datacollector.task.AbstractTask;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_org.apache.commons.lang3.StringUtils;
import com.streamsets.pipeline.api.impl.Utils;
import com.streamsets.pipeline.api.lineage.LineageEvent;
import com.streamsets.pipeline.api.lineage.LineagePublisher;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/lineage/LineagePublisherTaskImpl.class */
public class LineagePublisherTaskImpl extends AbstractTask implements LineagePublisherTask {
    private static final Logger LOG = LoggerFactory.getLogger(LineagePublisherTaskImpl.class);
    private final Configuration configuration;
    private final StageLibraryTask stageLibraryTask;

    @VisibleForTesting
    LineagePublisherRuntime publisherRuntime;
    private ArrayBlockingQueue<LineageEvent> eventQueue;
    private ExecutorService executorService;
    private EventQueueConsumer consumerRunnable;
    private Future consumerFuture;

    /* loaded from: input_file:_ss_com/streamsets/datacollector/lineage/LineagePublisherTaskImpl$EventQueueConsumer.class */
    private class EventQueueConsumer implements Runnable {
        boolean continueRunning;

        private EventQueueConsumer() {
            this.continueRunning = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            LineagePublisherTaskImpl.LOG.info("Starting lineage event consumer");
            Thread.currentThread().setName("Lineage Publisher Consumer");
            while (this.continueRunning) {
                ArrayList arrayList = new ArrayList();
                try {
                    LineageEvent lineageEvent = (LineageEvent) LineagePublisherTaskImpl.this.eventQueue.poll(1L, TimeUnit.SECONDS);
                    if (lineageEvent != null) {
                        arrayList.add(lineageEvent);
                        LineagePublisherTaskImpl.this.eventQueue.drainTo(arrayList);
                        LineagePublisherTaskImpl.LOG.debug("Consuming {} lineage events", Integer.valueOf(arrayList.size()));
                        try {
                            LineagePublisherTaskImpl.this.publisherRuntime.publishEvents(arrayList);
                        } catch (Throwable th) {
                            LineagePublisherTaskImpl.LOG.error("Failed to publish events", th);
                        }
                    }
                } catch (InterruptedException e) {
                    LineagePublisherTaskImpl.LOG.info("Consumer thread interrupted while waiting for events to show up");
                }
            }
            LineagePublisherTaskImpl.LOG.info("Lineage event consumer finished");
        }
    }

    @Inject
    public LineagePublisherTaskImpl(Configuration configuration, StageLibraryTask stageLibraryTask) {
        super("Lineage Publisher Task");
        this.configuration = configuration;
        this.stageLibraryTask = stageLibraryTask;
    }

    @Override // _ss_com.streamsets.datacollector.lineage.LineagePublisherTask
    public void publishEvent(LineageEvent lineageEvent) {
        if (this.eventQueue != null) {
            this.eventQueue.add(lineageEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // _ss_com.streamsets.datacollector.task.AbstractTask
    public void initTask() {
        String str = this.configuration.get(LineagePublisherConstants.CONFIG_LINEAGE_PUBLISHERS, (String) null);
        if (StringUtils.isEmpty(str)) {
            LOG.info("No publishers configured");
            return;
        }
        String[] split = str.split(com.amazonaws.util.StringUtils.COMMA_SEPARATOR);
        if (split.length != 1) {
            throw new IllegalStateException("Only one lineage publisher is supported at the moment");
        }
        String str2 = split[0];
        LineagePublisherDefinition definition = getDefinition(str2);
        LOG.info("Using lineage publisher named {} (backed by {}::{})", new Object[]{str2, definition.getLibraryDefinition().getName(), definition.getName()});
        createAndInitializeRuntime(definition, str2);
        this.eventQueue = new ArrayBlockingQueue<>(this.configuration.get(LineagePublisherConstants.CONFIG_LINEAGE_QUEUE_SIZE, 100));
        this.executorService = Executors.newSingleThreadExecutor();
        this.consumerRunnable = new EventQueueConsumer();
    }

    @Override // _ss_com.streamsets.datacollector.task.AbstractTask
    protected void runTask() {
        if (this.executorService != null) {
            this.consumerFuture = this.executorService.submit(this.consumerRunnable);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // _ss_com.streamsets.datacollector.task.AbstractTask
    public void stopTask() {
        if (this.consumerFuture != null) {
            this.consumerRunnable.continueRunning = false;
            try {
                this.consumerFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Exception while stopping consumer thread", e);
            }
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.eventQueue != null) {
            ArrayList arrayList = new ArrayList();
            this.eventQueue.drainTo(arrayList);
            this.publisherRuntime.publishEvents(arrayList);
        }
        if (this.publisherRuntime != null) {
            this.publisherRuntime.destroy();
        }
    }

    private LineagePublisherDefinition getDefinition(String str) {
        String configDef = LineagePublisherConstants.configDef(str);
        String str2 = this.configuration.get(configDef, (String) null);
        if (StringUtils.isEmpty(str2)) {
            throw new IllegalArgumentException(Utils.format("Missing definition '{}'", new Object[]{configDef}));
        }
        String[] split = str2.split("::");
        if (split.length != 2) {
            throw new IllegalStateException(Utils.format("Invalid definition '{}', expected $libraryName::$publisherName", new Object[]{str2}));
        }
        LineagePublisherDefinition lineagePublisherDefinition = this.stageLibraryTask.getLineagePublisherDefinition(split[0], split[1]);
        if (lineagePublisherDefinition == null) {
            throw new IllegalStateException(Utils.format("Can't find publisher '{}'", new Object[]{str2}));
        }
        return lineagePublisherDefinition;
    }

    private void createAndInitializeRuntime(LineagePublisherDefinition lineagePublisherDefinition, String str) {
        this.publisherRuntime = new LineagePublisherRuntime(lineagePublisherDefinition, instantiateLineagePublisher(lineagePublisherDefinition));
        try {
            List<LineagePublisher.ConfigIssue> init = this.publisherRuntime.init(new LineagePublisherContext(str, this.configuration));
            if (init.isEmpty()) {
                return;
            }
            Iterator<LineagePublisher.ConfigIssue> it = init.iterator();
            while (it.hasNext()) {
                LOG.error("Lineage init issue: {}", it.next());
            }
            throw new RuntimeException(Utils.format("Can't initialize lineage publisher ({} issues)", new Object[]{Integer.valueOf(init.size())}));
        } catch (Throwable th) {
            LOG.error("Failed initializing lineage publisher: {}", th.toString(), th);
            throw new RuntimeException("Failed initializing lineage publisher", th);
        }
    }

    private LineagePublisher instantiateLineagePublisher(LineagePublisherDefinition lineagePublisherDefinition) {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            try {
                Thread.currentThread().setContextClassLoader(lineagePublisherDefinition.getClassLoader());
                LineagePublisher newInstance = lineagePublisherDefinition.getKlass().newInstance();
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                return newInstance;
            } catch (IllegalAccessException | InstantiationException e) {
                LOG.error("Can't instantiate publisher", e);
                throw new RuntimeException("Can't instantiate publisher", e);
            }
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }
}
