/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.embedded.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.VariableLatch;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class EmbeddedEngine
implements Runnable {
    public static final Field ENGINE_NAME = Field.create((String)"name").withDescription("Unique name for this connector instance.").withValidation(new Field.Validator[]{Field::isRequired});
    public static final Field CONNECTOR_CLASS = Field.create((String)"connector.class").withDescription("The Java class for the connector").withValidation(new Field.Validator[]{Field::isRequired});
    public static final Field OFFSET_STORAGE = Field.create((String)"offset.storage").withDescription("The Java class that implements the `OffsetBackingStore` interface, used to periodically store offsets so that, upon restart, the connector can resume where it last left off.").withDefault(FileOffsetBackingStore.class.getName());
    public static final Field OFFSET_STORAGE_FILE_FILENAME = Field.create((String)"offset.storage.file.filename").withDescription("The file where offsets are to be stored. Required when 'offset.storage' is set to the " + FileOffsetBackingStore.class.getName() + " class.").withDefault("");
    public static final Field OFFSET_STORAGE_KAFKA_TOPIC = Field.create((String)"offset.storage.topic").withDescription("The name of the Kafka topic where offsets are to be stored. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.").withDefault("");
    public static final Field OFFSET_FLUSH_INTERVAL_MS = Field.create((String)"offset.flush.interval.ms").withDescription("Interval at which to try committing offsets. The default is 1 minute.").withDefault(60000L).withValidation(new Field.Validator[]{Field::isNonNegativeInteger});
    public static final Field OFFSET_COMMIT_TIMEOUT_MS = Field.create((String)"offset.flush.timeout.ms").withDescription("Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.").withDefault(5000L).withValidation(new Field.Validator[]{Field::isPositiveInteger});
    protected static final Field INTERNAL_KEY_CONVERTER_CLASS = Field.create((String)"internal.key.converter").withDescription("The Converter class that should be used to serialize and deserialize key data for offsets.").withDefault(JsonConverter.class.getName());
    protected static final Field INTERNAL_VALUE_CONVERTER_CLASS = Field.create((String)"internal.value.converter").withDescription("The Converter class that should be used to serialize and deserialize value data for offsets.").withDefault(JsonConverter.class.getName());
    public static final Field.Set CONNECTOR_FIELDS = Field.setOf((Field[])new Field[]{ENGINE_NAME, CONNECTOR_CLASS});
    protected static final Field.Set ALL_FIELDS = CONNECTOR_FIELDS.with(new Field[]{OFFSET_STORAGE, OFFSET_STORAGE_FILE_FILENAME, OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS, INTERNAL_KEY_CONVERTER_CLASS, INTERNAL_VALUE_CONVERTER_CLASS});
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Configuration config;
    private final Clock clock;
    private final ClassLoader classLoader;
    private final Consumer<SourceRecord> consumer;
    private final CompletionCallback completionCallback;
    private final AtomicReference<Thread> runningThread = new AtomicReference();
    private final VariableLatch latch = new VariableLatch(0);
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final WorkerConfig workerConfig;
    private long recordsSinceLastCommit = 0L;
    private long timeSinceLastCommitMillis = 0L;

    public static Builder create() {
        return new Builder(){
            private Configuration config;
            private Consumer<SourceRecord> consumer;
            private ClassLoader classLoader;
            private Clock clock;
            private CompletionCallback completionCallback;

            @Override
            public Builder using(Configuration config) {
                this.config = config;
                return this;
            }

            @Override
            public Builder using(ClassLoader classLoader) {
                this.classLoader = classLoader;
                return this;
            }

            @Override
            public Builder using(Clock clock) {
                this.clock = clock;
                return this;
            }

            @Override
            public Builder using(CompletionCallback completionCallback) {
                this.completionCallback = completionCallback;
                return this;
            }

            @Override
            public Builder notifying(Consumer<SourceRecord> consumer) {
                this.consumer = consumer;
                return this;
            }

            @Override
            public EmbeddedEngine build() {
                if (this.classLoader == null) {
                    this.classLoader = this.getClass().getClassLoader();
                }
                if (this.clock == null) {
                    this.clock = Clock.system();
                }
                Objects.requireNonNull(this.config, "A connector configuration must be specified.");
                Objects.requireNonNull(this.consumer, "A connector consumer must be specified.");
                return new EmbeddedEngine(this.config, this.classLoader, this.clock, this.consumer, this.completionCallback);
            }
        };
    }

    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer, CompletionCallback completionCallback) {
        this.config = config;
        this.consumer = consumer;
        this.classLoader = classLoader;
        this.clock = clock;
        CompletionCallback completionCallback2 = this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {
            if (success) {
                this.logger.error(msg, error);
            }
        };
        assert (this.config != null);
        assert (this.consumer != null);
        assert (this.classLoader != null);
        assert (this.clock != null);
        this.keyConverter = (Converter)config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        this.keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
        this.valueConverter = (Converter)config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        Configuration valueConverterConfig = config;
        if (this.valueConverter instanceof JsonConverter) {
            valueConverterConfig = ((Configuration.Builder)config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false)).build();
        }
        this.valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);
        Map embeddedConfig = config.asMap(ALL_FIELDS);
        embeddedConfig.put("key.converter", JsonConverter.class.getName());
        embeddedConfig.put("value.converter", JsonConverter.class.getName());
        this.workerConfig = new EmbeddedConfig(embeddedConfig);
    }

    protected boolean isRunning() {
        return this.runningThread.get() != null;
    }

    private void fail(String msg) {
        this.fail(msg, null);
    }

    private void fail(String msg, Throwable error) {
        this.completionCallback.handle(false, msg, error);
    }

    private void succeed(String msg) {
        this.completionCallback.handle(true, msg, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void run() {
        block58: {
            if (this.runningThread.compareAndSet(null, Thread.currentThread())) {
                String engineName = this.config.getString(ENGINE_NAME);
                String connectorClassName = this.config.getString(CONNECTOR_CLASS);
                this.latch.countUp();
                try {
                    if (!this.config.validateAndRecord((Iterable)CONNECTOR_FIELDS, arg_0 -> ((Logger)this.logger).error(arg_0))) {
                        this.fail("Failed to start connector with invalid configuration (see logs for actual errors)");
                        return;
                    }
                    SourceConnector connector = null;
                    try {
                        Class<?> connectorClass = this.classLoader.loadClass(connectorClassName);
                        connector = (SourceConnector)connectorClass.newInstance();
                    }
                    catch (Throwable t) {
                        this.fail("Unable to instantiate connector class '" + connectorClassName + "'", t);
                        this.latch.countDown();
                        this.runningThread.set(null);
                        return;
                    }
                    String offsetStoreClassName = this.config.getString(OFFSET_STORAGE);
                    OffsetBackingStore offsetStore = null;
                    try {
                        Class<?> offsetStoreClass = this.classLoader.loadClass(offsetStoreClassName);
                        offsetStore = (OffsetBackingStore)offsetStoreClass.newInstance();
                    }
                    catch (Throwable t) {
                        this.fail("Unable to instantiate OffsetBackingStore class '" + offsetStoreClassName + "'", t);
                        this.latch.countDown();
                        this.runningThread.set(null);
                        return;
                    }
                    try {
                        offsetStore.configure(this.workerConfig);
                        offsetStore.start();
                    }
                    catch (Throwable t) {
                        this.fail("Unable to configure and start the '" + offsetStoreClassName + "' offset backing store", t);
                        this.latch.countDown();
                        this.runningThread.set(null);
                        return;
                    }
                    long offsetPeriodMs = this.config.getLong(OFFSET_FLUSH_INTERVAL_MS);
                    OffsetCommitPolicy offsetCommitPolicy = OffsetCommitPolicy.periodic(offsetPeriodMs, TimeUnit.MILLISECONDS);
                    ConnectorContext context = new ConnectorContext(){

                        public void requestTaskReconfiguration() {
                        }

                        public void raiseError(Exception e) {
                            EmbeddedEngine.this.fail(e.getMessage(), e);
                        }
                    };
                    connector.initialize(context);
                    OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, this.keyConverter, this.valueConverter);
                    OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, this.keyConverter, this.valueConverter);
                    long commitTimeoutMs = this.config.getLong(OFFSET_COMMIT_TIMEOUT_MS);
                    connector.start(this.config.asMap());
                    List taskConfigs = connector.taskConfigs(1);
                    Class taskClass = connector.taskClass();
                    SourceTask task = null;
                    try {
                        task = (SourceTask)taskClass.newInstance();
                    }
                    catch (IllegalAccessException | InstantiationException t) {
                        this.fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", t);
                        try {
                            offsetStore.stop();
                        }
                        finally {
                            connector.stop();
                        }
                        this.latch.countDown();
                        this.runningThread.set(null);
                        return;
                    }
                    try {
                        SourceTaskContext taskContext = () -> EmbeddedEngine.lambda$run$3((OffsetStorageReader)offsetReader);
                        task.initialize(taskContext);
                        task.start((Map)taskConfigs.get(0));
                    }
                    catch (Throwable t) {
                        String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + taskConfigs.get(0);
                        this.fail(msg, t);
                        try {
                            offsetStore.stop();
                        }
                        finally {
                            connector.stop();
                        }
                        this.latch.countDown();
                        this.runningThread.set(null);
                        return;
                    }
                    this.recordsSinceLastCommit = 0L;
                    Throwable handlerError = null;
                    this.timeSinceLastCommitMillis = this.clock.currentTimeInMillis();
                    while (this.runningThread.get() != null && handlerError == null) {
                        try {
                            this.logger.debug("Embedded engine is polling task for records");
                            List changeRecords = task.poll();
                            if (changeRecords != null && !changeRecords.isEmpty()) {
                                this.logger.debug("Received {} records from the task", (Object)changeRecords.size());
                                for (SourceRecord record : changeRecords) {
                                    try {
                                        this.consumer.accept(record);
                                    }
                                    catch (Throwable t) {
                                        handlerError = t;
                                        break;
                                    }
                                    offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
                                    ++this.recordsSinceLastCommit;
                                }
                                this.maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
                                continue;
                            }
                            this.logger.debug("Received no records from the task");
                        }
                        catch (InterruptedException e) {
                            this.maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeoutMs);
                            Thread.interrupted();
                            break;
                        }
                    }
                    try {
                        this.logger.debug("Stopping the task and engine");
                        task.stop();
                    }
                    finally {
                        this.commitOffsets(offsetWriter, commitTimeoutMs);
                        if (handlerError != null) {
                            this.fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(), handlerError);
                        } else {
                            this.succeed("Connector '" + connectorClassName + "' completed normally.");
                        }
                    }
                    try {
                        offsetStore.stop();
                        break block58;
                    }
                    finally {
                        connector.stop();
                    }
                    catch (Throwable t) {
                        try {
                            this.fail("Error while trying to run connector class '" + connectorClassName + "'", t);
                        }
                        catch (Throwable throwable) {
                            try {
                                offsetStore.stop();
                            }
                            finally {
                                connector.stop();
                            }
                            throw throwable;
                        }
                        try {
                            offsetStore.stop();
                        }
                        finally {
                            connector.stop();
                        }
                        this.latch.countDown();
                        this.runningThread.set(null);
                        return;
                    }
                }
                finally {
                    this.latch.countDown();
                    this.runningThread.set(null);
                }
            }
        }
    }

    protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, long commitTimeoutMs) {
        if (policy.performCommit(this.recordsSinceLastCommit, this.timeSinceLastCommitMillis, TimeUnit.MILLISECONDS)) {
            this.commitOffsets(offsetWriter, commitTimeoutMs);
        }
    }

    protected void commitOffsets(OffsetStorageWriter offsetWriter, long commitTimeoutMs) {
        long started = this.clock.currentTimeInMillis();
        long timeout = started + commitTimeoutMs;
        if (!offsetWriter.beginFlush()) {
            return;
        }
        Future flush = offsetWriter.doFlush(this::completedFlush);
        if (flush == null) {
            return;
        }
        try {
            flush.get(Math.max(timeout - this.clock.currentTimeInMillis(), 0L), TimeUnit.MILLISECONDS);
            this.recordsSinceLastCommit = 0L;
            this.timeSinceLastCommitMillis = this.clock.currentTimeInMillis();
        }
        catch (InterruptedException e) {
            this.logger.warn("Flush of {} offsets interrupted, cancelling", (Object)this);
            offsetWriter.cancelFlush();
        }
        catch (ExecutionException e) {
            this.logger.error("Flush of {} offsets threw an unexpected exception: ", (Object)this, (Object)e);
            offsetWriter.cancelFlush();
        }
        catch (TimeoutException e) {
            this.logger.error("Timed out waiting to flush {} offsets to storage", (Object)this);
            offsetWriter.cancelFlush();
        }
    }

    protected void completedFlush(Throwable error, Void result) {
        if (error != null) {
            this.logger.error("Failed to flush {} offsets to storage: ", (Object)this, (Object)error);
        } else {
            this.logger.trace("Finished flushing {} offsets to storage", (Object)this);
        }
    }

    public boolean stop() {
        this.logger.debug("Stopping the embedded engine");
        Thread thread = this.runningThread.getAndSet(null);
        if (thread != null) {
            thread.interrupt();
            return true;
        }
        return false;
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return this.latch.await(timeout, unit);
    }

    public String toString() {
        return "EmbeddedConnector{id=" + this.config.getString(ENGINE_NAME) + '}';
    }

    private static /* synthetic */ OffsetStorageReader lambda$run$3(OffsetStorageReader offsetReader) {
        return offsetReader;
    }

    protected static class EmbeddedConfig
    extends WorkerConfig {
        private static final ConfigDef CONFIG;

        protected EmbeddedConfig(Map<String, String> props) {
            super(CONFIG, props);
        }

        static {
            ConfigDef config = EmbeddedConfig.baseConfigDef();
            Field.group((ConfigDef)config, (String)"file", (Field[])new Field[]{OFFSET_STORAGE_FILE_FILENAME});
            Field.group((ConfigDef)config, (String)"kafka", (Field[])new Field[]{OFFSET_STORAGE_KAFKA_TOPIC});
            CONFIG = config;
        }
    }

    public static interface Builder {
        public Builder notifying(Consumer<SourceRecord> var1);

        public Builder using(Configuration var1);

        public Builder using(ClassLoader var1);

        public Builder using(Clock var1);

        public Builder using(CompletionCallback var1);

        public EmbeddedEngine build();
    }

    public static class CompletionResult
    implements CompletionCallback {
        private final CountDownLatch completed = new CountDownLatch(1);
        private boolean success;
        private String message;
        private Throwable error;

        @Override
        public void handle(boolean success, String message, Throwable error) {
            this.success = success;
            this.message = message;
            this.error = error;
            this.completed.countDown();
        }

        public void await() throws InterruptedException {
            this.completed.await();
        }

        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return this.completed.await(timeout, unit);
        }

        public boolean hasCompleted() {
            return this.completed.getCount() == 0L;
        }

        public boolean success() {
            return this.success;
        }

        public String message() {
            return this.message;
        }

        public Throwable error() {
            return this.error;
        }

        public boolean hasError() {
            return this.error != null;
        }
    }

    public static interface CompletionCallback {
        public void handle(boolean var1, String var2, Throwable var3);
    }
}

