/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Instantiator;
import io.debezium.embedded.StopConnectorException;
import io.debezium.embedded.Transformations;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.VariableLatch;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class EmbeddedEngine
implements DebeziumEngine<SourceRecord> {
    public static final Field ENGINE_NAME = Field.create((String)"name").withDescription("Unique name for this connector instance.").required();
    public static final Field CONNECTOR_CLASS = Field.create((String)"connector.class").withDescription("The Java class for the connector").required();
    public static final Field OFFSET_STORAGE = Field.create((String)"offset.storage").withDescription("The Java class that implements the `OffsetBackingStore` interface, used to periodically store offsets so that, upon restart, the connector can resume where it last left off.").withDefault(FileOffsetBackingStore.class.getName());
    public static final Field OFFSET_STORAGE_FILE_FILENAME = Field.create((String)"offset.storage.file.filename").withDescription("The file where offsets are to be stored. Required when 'offset.storage' is set to the " + FileOffsetBackingStore.class.getName() + " class.").withDefault("");
    public static final Field OFFSET_STORAGE_KAFKA_TOPIC = Field.create((String)"offset.storage.topic").withDescription("The name of the Kafka topic where offsets are to be stored. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.").withDefault("");
    public static final Field OFFSET_STORAGE_KAFKA_PARTITIONS = Field.create((String)"offset.storage.partitions").withType(ConfigDef.Type.INT).withDescription("The number of partitions used when creating the offset storage topic. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.");
    public static final Field OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR = Field.create((String)"offset.storage.replication.factor").withType(ConfigDef.Type.SHORT).withDescription("Replication factor used when creating the offset storage topic. Required with other properties when 'offset.storage' is set to the " + KafkaOffsetBackingStore.class.getName() + " class.");
    public static final Field OFFSET_FLUSH_INTERVAL_MS = Field.create((String)"offset.flush.interval.ms").withDescription("Interval at which to try committing offsets, given in milliseconds. Defaults to 1 minute (60,000 ms).").withDefault(60000L).withValidation(new Field.Validator[]{Field::isNonNegativeInteger});
    public static final Field OFFSET_COMMIT_TIMEOUT_MS = Field.create((String)"offset.flush.timeout.ms").withDescription("Time to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt, given in milliseconds. Defaults to 5 seconds (5000 ms).").withDefault(5000L).withValidation(new Field.Validator[]{Field::isPositiveInteger});
    public static final Field OFFSET_COMMIT_POLICY = Field.create((String)"offset.commit.policy").withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface " + OffsetCommitPolicy.class.getName() + ". The default is a periodic commit policy based upon time intervals.").withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()).withValidation(new Field.Validator[]{Field::isClassName});
    public static final Field PREDICATES = Field.create((String)"predicates").withDisplayName("List of prefixes defining predicates.").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.LOW).withDescription("Optional list of predicates that can be assigned to transformations. The predicates are defined using '<predicate.prefix>.type' config option and configured using options '<predicate.prefix>.<option>'");
    public static final Field TRANSFORMS = Field.create((String)"transforms").withDisplayName("List of prefixes defining transformations.").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.LOW).withDescription("Optional list of single message transformations applied on the messages. The transforms are defined using '<transform.prefix>.type' config option and configured using options '<transform.prefix>.<option>'");
    private static final Field ERRORS_MAX_RETRIES = Field.create((String)"errors.max.retries").withDisplayName("The maximum number of retries").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(-1).withValidation(new Field.Validator[]{Field::isInteger}).withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
    private static final Field ERRORS_RETRY_DELAY_INITIAL_MS = Field.create((String)"errors.retry.delay.initial.ms").withDisplayName("Initial delay for retries").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(300).withValidation(new Field.Validator[]{Field::isPositiveInteger}).withDescription("Initial delay (in ms) for retries when encountering connection errors. This value will be doubled upon every retry but won't exceed 'errors.retry.delay.max.ms'.");
    private static final Field ERRORS_RETRY_DELAY_MAX_MS = Field.create((String)"errors.retry.delay.max.ms").withDisplayName("Max delay between retries").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.MEDIUM).withDefault(10000).withValidation(new Field.Validator[]{Field::isPositiveInteger}).withDescription("Max delay (in ms) between retries when encountering connection errors.");
    public static final Field.Set CONNECTOR_FIELDS = Field.setOf((Field[])new Field[]{ENGINE_NAME, CONNECTOR_CLASS});
    protected static final Field.Set ALL_FIELDS = CONNECTOR_FIELDS.with(new Field[]{OFFSET_STORAGE, OFFSET_STORAGE_FILE_FILENAME, OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS, ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS});
    private static final Duration WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT = Duration.ofMinutes(5L);
    private static final String WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP = "debezium.embedded.shutdown.pause.before.interrupt.ms";
    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedEngine.class);
    private final Configuration config;
    private final Clock clock;
    private final ClassLoader classLoader;
    private final DebeziumEngine.ChangeConsumer<SourceRecord> handler;
    private final DebeziumEngine.CompletionCallback completionCallback;
    private final DebeziumEngine.ConnectorCallback connectorCallback;
    private final AtomicReference<Thread> runningThread = new AtomicReference();
    private final VariableLatch latch = new VariableLatch(0);
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final WorkerConfig workerConfig;
    private final CompletionResult completionResult;
    private long recordsSinceLastCommit = 0L;
    private long timeOfLastCommitMillis = 0L;
    private OffsetCommitPolicy offsetCommitPolicy;
    private SourceTask task;
    private final Transformations transformations;

    private static ChangeConsumer buildDefaultChangeConsumer(final Consumer<SourceRecord> consumer) {
        return new ChangeConsumer(){

            public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
                for (SourceRecord record : records) {
                    try {
                        consumer.accept(record);
                        committer.markProcessed((Object)record);
                    }
                    catch (StopConnectorException | StopEngineException ex) {
                        committer.markProcessed((Object)record);
                        throw ex;
                    }
                }
                committer.markBatchFinished();
            }
        };
    }

    @Deprecated
    public static Builder create() {
        return new BuilderImpl();
    }

    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> handler, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy) {
        this.config = config;
        this.handler = handler;
        this.classLoader = classLoader;
        this.clock = clock;
        this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {
            if (!success) {
                LOGGER.error(msg, error);
            }
        };
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        this.offsetCommitPolicy = offsetCommitPolicy;
        assert (this.config != null);
        assert (this.handler != null);
        assert (this.classLoader != null);
        assert (this.clock != null);
        Map<String, String> internalConverterConfig = Collections.singletonMap("schemas.enable", "false");
        this.keyConverter = (Converter)Instantiator.getInstance((String)JsonConverter.class.getName());
        this.keyConverter.configure(internalConverterConfig, true);
        this.valueConverter = (Converter)Instantiator.getInstance((String)JsonConverter.class.getName());
        this.valueConverter.configure(internalConverterConfig, false);
        this.transformations = new Transformations(config);
        Map embeddedConfig = config.asMap(ALL_FIELDS);
        embeddedConfig.put("key.converter", JsonConverter.class.getName());
        embeddedConfig.put("value.converter", JsonConverter.class.getName());
        this.workerConfig = new EmbeddedConfig(embeddedConfig);
    }

    public boolean isRunning() {
        return this.runningThread.get() != null;
    }

    private void fail(String msg) {
        this.fail(msg, null);
    }

    private void fail(String msg, Throwable error) {
        if (this.completionResult.hasError()) {
            LOGGER.error(msg, error);
            return;
        }
        this.completionResult.handle(false, msg, error);
    }

    private void succeed(String msg) {
        this.completionResult.handle(true, msg, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        if (this.runningThread.compareAndSet(null, Thread.currentThread())) {
            String engineName = this.config.getString(ENGINE_NAME);
            String connectorClassName = this.config.getString(CONNECTOR_CLASS);
            Optional<DebeziumEngine.ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
            this.latch.countUp();
            try {
                if (!this.config.validateAndRecord((Iterable)CONNECTOR_FIELDS, arg_0 -> ((Logger)LOGGER).error(arg_0))) {
                    this.fail("Failed to start connector with invalid configuration (see logs for actual errors)");
                    return;
                }
                SourceConnector connector = null;
                try {
                    Class<?> connectorClass = this.classLoader.loadClass(connectorClassName);
                    connector = (SourceConnector)connectorClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                }
                catch (Throwable t) {
                    this.fail("Unable to instantiate connector class '" + connectorClassName + "'", t);
                    this.latch.countDown();
                    this.runningThread.set(null);
                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                    return;
                }
                String offsetStoreClassName = this.config.getString(OFFSET_STORAGE);
                OffsetBackingStore offsetStore = null;
                try {
                    Class<?> offsetStoreClass = this.classLoader.loadClass(offsetStoreClassName);
                    offsetStore = (OffsetBackingStore)offsetStoreClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                }
                catch (Throwable t) {
                    this.fail("Unable to instantiate OffsetBackingStore class '" + offsetStoreClassName + "'", t);
                    this.latch.countDown();
                    this.runningThread.set(null);
                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                    return;
                }
                try {
                    offsetStore.configure(this.workerConfig);
                    offsetStore.start();
                }
                catch (Throwable t) {
                    this.fail("Unable to configure and start the '" + offsetStoreClassName + "' offset backing store", t);
                    offsetStore.stop();
                    this.latch.countDown();
                    this.runningThread.set(null);
                    this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                    return;
                }
                if (this.offsetCommitPolicy == null) {
                    try {
                        this.offsetCommitPolicy = (OffsetCommitPolicy)Instantiator.getInstanceWithProperties((String)this.config.getString(OFFSET_COMMIT_POLICY), (Properties)this.config.asProperties());
                    }
                    catch (Throwable t) {
                        this.fail("Unable to instantiate OffsetCommitPolicy class '" + offsetStoreClassName + "'", t);
                        this.latch.countDown();
                        this.runningThread.set(null);
                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                        return;
                    }
                }
                OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, this.keyConverter, this.valueConverter);
                SourceConnectorContext context = new SourceConnectorContext(){
                    final /* synthetic */ OffsetStorageReader val$offsetReader;
                    {
                        this.val$offsetReader = offsetStorageReader;
                    }

                    public void requestTaskReconfiguration() {
                    }

                    public void raiseError(Exception e) {
                        EmbeddedEngine.this.fail(e.getMessage(), e);
                    }

                    public OffsetStorageReader offsetStorageReader() {
                        return this.val$offsetReader;
                    }
                };
                connector.initialize((ConnectorContext)context);
                OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, this.keyConverter, this.valueConverter);
                Duration commitTimeout = Duration.ofMillis(this.config.getLong(OFFSET_COMMIT_TIMEOUT_MS));
                try {
                    connector.start(this.workerConfig.originalsStrings());
                    connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
                    List taskConfigs = connector.taskConfigs(1);
                    Class taskClass = connector.taskClass();
                    if (taskConfigs.isEmpty()) {
                        String msg = "Unable to start connector's task class '" + taskClass.getName() + "' with no task configuration";
                        this.fail(msg);
                        return;
                    }
                    this.task = null;
                    try {
                        this.task = (SourceTask)taskClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                    }
                    catch (IllegalAccessException | InstantiationException t) {
                        this.fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", t);
                        try {
                            offsetStore.stop();
                        }
                        catch (Throwable t2) {
                            this.fail("Error while trying to stop the offset store", t2);
                        }
                        finally {
                            try {
                                connector.stop();
                                connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
                            }
                            catch (Throwable t3) {
                                this.fail("Error while trying to stop connector class '" + connectorClassName + "'", t3);
                            }
                        }
                        this.latch.countDown();
                        this.runningThread.set(null);
                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                        return;
                    }
                    try {
                        SourceTaskContext taskContext = new SourceTaskContext(){
                            final /* synthetic */ OffsetStorageReader val$offsetReader;
                            {
                                this.val$offsetReader = offsetStorageReader;
                            }

                            public OffsetStorageReader offsetStorageReader() {
                                return this.val$offsetReader;
                            }

                            public Map<String, String> configs() {
                                return null;
                            }
                        };
                        this.task.initialize(taskContext);
                        this.task.start((Map)taskConfigs.get(0));
                        connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
                    }
                    catch (Throwable t) {
                        try {
                            LOGGER.debug("Stopping the task");
                            this.task.stop();
                        }
                        catch (Throwable tstop) {
                            LOGGER.info("Error while trying to stop the task");
                        }
                        Configuration config = Configuration.from((Map)((Map)taskConfigs.get(0))).withMaskedPasswords();
                        String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + config;
                        this.fail(msg, t);
                        try {
                            offsetStore.stop();
                        }
                        catch (Throwable t4) {
                            this.fail("Error while trying to stop the offset store", t4);
                        }
                        finally {
                            try {
                                connector.stop();
                                connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
                            }
                            catch (Throwable t5) {
                                this.fail("Error while trying to stop connector class '" + connectorClassName + "'", t5);
                            }
                        }
                        this.latch.countDown();
                        this.runningThread.set(null);
                        this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
                        return;
                    }
                    this.recordsSinceLastCommit = 0L;
                    Throwable handlerError = null;
                    try {
                        this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
                        RecordCommitter committer = this.buildRecordCommitter(offsetWriter, this.task, commitTimeout);
                        while (this.runningThread.get() != null) {
                            List changeRecords;
                            block129: {
                                changeRecords = null;
                                try {
                                    LOGGER.debug("Embedded engine is polling task for records on thread {}", (Object)this.runningThread.get());
                                    changeRecords = this.task.poll();
                                    LOGGER.debug("Embedded engine returned from polling task for records");
                                }
                                catch (InterruptedException e) {
                                    LOGGER.debug("Embedded engine interrupted on thread {} while polling the task for records", (Object)this.runningThread.get());
                                    if (this.runningThread.get() == Thread.currentThread()) {
                                        Thread.currentThread().interrupt();
                                    }
                                    break;
                                }
                                catch (RetriableException e) {
                                    int maxRetries = this.getErrorsMaxRetries();
                                    LOGGER.info("Retriable exception thrown, connector will be restarted; errors.max.retries={}", (Object)maxRetries, (Object)e);
                                    if (maxRetries == 0) break block129;
                                    DelayStrategy delayStrategy = this.delayStrategy(this.config);
                                    int totalRetries = 0;
                                    boolean startedSuccessfully = false;
                                    while (!startedSuccessfully) {
                                        try {
                                            LOGGER.info("Starting connector, attempt {}", (Object)(++totalRetries));
                                            this.task.stop();
                                            this.task.start((Map)taskConfigs.get(0));
                                            startedSuccessfully = true;
                                        }
                                        catch (Exception ex) {
                                            if (totalRetries == maxRetries) {
                                                LOGGER.error("Can't start the connector, max retries to connect exceeded; stopping connector...", (Throwable)ex);
                                                throw ex;
                                            }
                                            LOGGER.error("Can't start the connector, will retry later...", (Throwable)ex);
                                        }
                                        delayStrategy.sleepWhen(!startedSuccessfully);
                                    }
                                }
                            }
                            try {
                                if (changeRecords != null && !changeRecords.isEmpty()) {
                                    LOGGER.debug("Received {} records from the task", (Object)changeRecords.size());
                                    changeRecords = changeRecords.stream().map(this.transformations::transform).filter(x -> x != null).collect(Collectors.toList());
                                }
                                if (changeRecords != null && !changeRecords.isEmpty()) {
                                    LOGGER.debug("Received {} transformed records from the task", (Object)changeRecords.size());
                                    try {
                                        this.handler.handleBatch(changeRecords, (DebeziumEngine.RecordCommitter)committer);
                                        continue;
                                    }
                                    catch (StopConnectorException e) {
                                        break;
                                    }
                                }
                                LOGGER.debug("Received no records from the task");
                            }
                            catch (Throwable t) {
                                handlerError = t;
                                break;
                            }
                        }
                    }
                    finally {
                        if (handlerError != null) {
                            this.fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(), handlerError);
                        }
                        try {
                            LOGGER.info("Stopping the task and engine");
                            this.task.stop();
                            connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped);
                            this.commitOffsets(offsetWriter, commitTimeout, this.task);
                            if (handlerError == null) {
                                this.succeed("Connector '" + connectorClassName + "' completed normally.");
                            }
                        }
                        catch (InterruptedException e) {
                            LOGGER.debug("Interrupted while committing offsets");
                            Thread.currentThread().interrupt();
                        }
                        catch (Throwable t) {
                            this.fail("Error while trying to stop the task and commit the offsets", t);
                        }
                    }
                }
                catch (Throwable t) {
                    this.fail("Error while trying to run connector class '" + connectorClassName + "'", t);
                }
                finally {
                    try {
                        offsetStore.stop();
                    }
                    catch (Throwable t) {
                        this.fail("Error while trying to stop the offset store", t);
                    }
                    finally {
                        try {
                            connector.stop();
                            connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
                        }
                        catch (Throwable t) {
                            this.fail("Error while trying to stop connector class '" + connectorClassName + "'", t);
                        }
                    }
                }
            }
            finally {
                this.latch.countDown();
                this.runningThread.set(null);
                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
            }
        }
    }

    private int getErrorsMaxRetries() {
        int maxRetries = this.config.getInteger(ERRORS_MAX_RETRIES);
        return maxRetries;
    }

    protected RecordCommitter buildRecordCommitter(final OffsetStorageWriter offsetWriter, final SourceTask task, final Duration commitTimeout) {
        return new RecordCommitter(){

            public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
                task.commitRecord(record);
                ++EmbeddedEngine.this.recordsSinceLastCommit;
                offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
            }

            public synchronized void markBatchFinished() throws InterruptedException {
                EmbeddedEngine.this.maybeFlush(offsetWriter, EmbeddedEngine.this.offsetCommitPolicy, commitTimeout, task);
            }

            public synchronized void markProcessed(SourceRecord record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
                SourceRecordOffsets offsets = (SourceRecordOffsets)sourceOffsets;
                SourceRecord recordWithUpdatedOffsets = new SourceRecord(record.sourcePartition(), offsets.getOffsets(), record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), (Iterable)record.headers());
                this.markProcessed(recordWithUpdatedOffsets);
            }

            public DebeziumEngine.Offsets buildOffsets() {
                return new SourceRecordOffsets();
            }
        };
    }

    protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout, SourceTask task) throws InterruptedException {
        long timeSinceLastCommitMillis = this.clock.currentTimeInMillis() - this.timeOfLastCommitMillis;
        if (policy.performCommit(this.recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) {
            this.commitOffsets(offsetWriter, commitTimeout, task);
        }
    }

    protected void commitOffsets(OffsetStorageWriter offsetWriter, Duration commitTimeout, SourceTask task) throws InterruptedException {
        long started = this.clock.currentTimeInMillis();
        long timeout = started + commitTimeout.toMillis();
        if (!offsetWriter.beginFlush()) {
            return;
        }
        Future flush = offsetWriter.doFlush(this::completedFlush);
        if (flush == null) {
            return;
        }
        try {
            flush.get(Math.max(timeout - this.clock.currentTimeInMillis(), 0L), TimeUnit.MILLISECONDS);
            task.commit();
            this.recordsSinceLastCommit = 0L;
            this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
        }
        catch (InterruptedException e) {
            LOGGER.warn("Flush of {} offsets interrupted, cancelling", (Object)this);
            offsetWriter.cancelFlush();
            if (this.runningThread.get() == Thread.currentThread()) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
        catch (ExecutionException e) {
            LOGGER.error("Flush of {} offsets threw an unexpected exception: ", (Object)this, (Object)e);
            offsetWriter.cancelFlush();
        }
        catch (TimeoutException e) {
            LOGGER.error("Timed out waiting to flush {} offsets to storage", (Object)this);
            offsetWriter.cancelFlush();
        }
    }

    protected void completedFlush(Throwable error, Void result) {
        if (error != null) {
            LOGGER.error("Failed to flush {} offsets to storage: ", (Object)this, (Object)error);
        } else {
            LOGGER.trace("Finished flushing {} offsets to storage", (Object)this);
        }
    }

    public boolean stop() {
        LOGGER.info("Stopping the embedded engine");
        Thread thread = this.runningThread.getAndSet(null);
        if (thread != null) {
            try {
                Duration timeout = Duration.ofMillis(Long.valueOf(System.getProperty(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.toString(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT.toMillis()))));
                LOGGER.info("Waiting for {} for connector to stop", (Object)timeout);
                this.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            LOGGER.debug("Interrupting the embedded engine's thread {} (already interrupted: {})", (Object)thread, (Object)thread.isInterrupted());
            thread.interrupt();
            return true;
        }
        return false;
    }

    public void close() throws IOException {
        this.stop();
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return this.latch.await(timeout, unit);
    }

    public String toString() {
        return "EmbeddedEngine{id=" + this.config.getString(ENGINE_NAME) + "}";
    }

    public void runWithTask(Consumer<SourceTask> consumer) {
        consumer.accept(this.task);
    }

    private DelayStrategy delayStrategy(Configuration config) {
        return DelayStrategy.exponential((Duration)Duration.ofMillis(config.getInteger(ERRORS_RETRY_DELAY_INITIAL_MS)), (Duration)Duration.ofMillis(config.getInteger(ERRORS_RETRY_DELAY_MAX_MS)));
    }

    public static final class BuilderImpl
    implements Builder {
        private Configuration config;
        private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
        private ClassLoader classLoader;
        private Clock clock;
        private DebeziumEngine.CompletionCallback completionCallback;
        private DebeziumEngine.ConnectorCallback connectorCallback;
        private OffsetCommitPolicy offsetCommitPolicy = null;

        @Override
        public Builder using(Configuration config) {
            this.config = config;
            return this;
        }

        public Builder using(Properties config) {
            this.config = Configuration.from((Properties)config);
            return this;
        }

        @Override
        public Builder using(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }

        @Override
        public Builder using(Clock clock) {
            this.clock = clock;
            return this;
        }

        public Builder using(DebeziumEngine.CompletionCallback completionCallback) {
            this.completionCallback = completionCallback;
            return this;
        }

        public Builder using(DebeziumEngine.ConnectorCallback connectorCallback) {
            this.connectorCallback = connectorCallback;
            return this;
        }

        @Override
        public Builder using(OffsetCommitPolicy offsetCommitPolicy) {
            this.offsetCommitPolicy = offsetCommitPolicy;
            return this;
        }

        @Override
        public Builder notifying(Consumer<SourceRecord> consumer) {
            this.handler = EmbeddedEngine.buildDefaultChangeConsumer(consumer);
            return this;
        }

        @Override
        public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) {
            this.handler = handler;
            if (!this.config.hasKey(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()) && !handler.supportsTombstoneEvents()) {
                LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", (Object)CommonConnectorConfig.TOMBSTONES_ON_DELETE.name());
                this.config = ((Configuration.Builder)this.config.edit().with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)).build();
            }
            return this;
        }

        public Builder using(final java.time.Clock clock) {
            return this.using(new Clock(){

                public long currentTimeInMillis() {
                    return clock.millis();
                }
            });
        }

        @Override
        public EmbeddedEngine build() {
            if (this.classLoader == null) {
                this.classLoader = this.getClass().getClassLoader();
            }
            if (this.clock == null) {
                this.clock = Clock.system();
            }
            Objects.requireNonNull(this.config, "A connector configuration must be specified.");
            Objects.requireNonNull(this.handler, "A connector consumer or changeHandler must be specified.");
            return new EmbeddedEngine(this.config, this.classLoader, this.clock, this.handler, this.completionCallback, this.connectorCallback, this.offsetCommitPolicy);
        }

        @Override
        public Builder using(CompletionCallback completionCallback) {
            return this.using((DebeziumEngine.CompletionCallback)completionCallback);
        }

        @Override
        public Builder using(ConnectorCallback connectorCallback) {
            return this.using((DebeziumEngine.ConnectorCallback)connectorCallback);
        }
    }

    public static class CompletionResult
    implements CompletionCallback {
        private final CompletionCallback delegate;
        private final CountDownLatch completed = new CountDownLatch(1);
        private boolean success;
        private String message;
        private Throwable error;

        public CompletionResult() {
            this(null);
        }

        public CompletionResult(CompletionCallback delegate) {
            this.delegate = delegate;
        }

        public void handle(boolean success, String message, Throwable error) {
            this.success = success;
            this.message = message;
            this.error = error;
            this.completed.countDown();
            if (this.delegate != null) {
                this.delegate.handle(success, message, error);
            }
        }

        public void await() throws InterruptedException {
            this.completed.await();
        }

        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return this.completed.await(timeout, unit);
        }

        public boolean hasCompleted() {
            return this.completed.getCount() == 0L;
        }

        public boolean success() {
            return this.success;
        }

        public String message() {
            return this.message;
        }

        public Throwable error() {
            return this.error;
        }

        public boolean hasError() {
            return this.error != null;
        }
    }

    protected static class EmbeddedConfig
    extends WorkerConfig {
        private static final ConfigDef CONFIG;

        protected EmbeddedConfig(Map<String, String> props) {
            super(CONFIG, props);
        }

        static {
            ConfigDef config = EmbeddedConfig.baseConfigDef();
            Field.group((ConfigDef)config, (String)"file", (Field[])new Field[]{OFFSET_STORAGE_FILE_FILENAME});
            Field.group((ConfigDef)config, (String)"kafka", (Field[])new Field[]{OFFSET_STORAGE_KAFKA_TOPIC});
            Field.group((ConfigDef)config, (String)"kafka", (Field[])new Field[]{OFFSET_STORAGE_KAFKA_PARTITIONS});
            Field.group((ConfigDef)config, (String)"kafka", (Field[])new Field[]{OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR});
            CONFIG = config;
        }
    }

    @ThreadSafe
    @Deprecated
    public static interface RecordCommitter
    extends DebeziumEngine.RecordCommitter<SourceRecord> {
    }

    protected class SourceRecordOffsets
    implements DebeziumEngine.Offsets {
        private HashMap<String, Object> offsets = new HashMap();

        protected SourceRecordOffsets() {
        }

        public void set(String key, Object value) {
            this.offsets.put(key, value);
        }

        protected HashMap<String, Object> getOffsets() {
            return this.offsets;
        }
    }

    @Deprecated
    public static interface Builder
    extends DebeziumEngine.Builder<SourceRecord> {
        public Builder using(Configuration var1);

        public Builder using(Clock var1);

        public Builder notifying(Consumer<SourceRecord> var1);

        public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> var1);

        public Builder using(ClassLoader var1);

        public Builder using(CompletionCallback var1);

        public Builder using(ConnectorCallback var1);

        public Builder using(OffsetCommitPolicy var1);

        public EmbeddedEngine build();
    }

    @Deprecated
    public static interface ChangeConsumer
    extends DebeziumEngine.ChangeConsumer<SourceRecord> {
    }

    @Deprecated
    public static interface ConnectorCallback
    extends DebeziumEngine.ConnectorCallback {
    }

    @Deprecated
    public static interface CompletionCallback
    extends DebeziumEngine.CompletionCallback {
    }
}

