/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cockroachdb;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.cockroachdb.CockroachDBChangeEventCreator;
import io.debezium.connector.cockroachdb.CockroachDBConnectorConfig;
import io.debezium.connector.cockroachdb.CockroachDBErrorHandler;
import io.debezium.connector.cockroachdb.CockroachDBEventMetadataProvider;
import io.debezium.connector.cockroachdb.CockroachDBOffsetContext;
import io.debezium.connector.cockroachdb.CockroachDBPartition;
import io.debezium.connector.cockroachdb.CockroachDBSchema;
import io.debezium.connector.cockroachdb.CockroachDBStreamingChangeEventSource;
import io.debezium.connector.cockroachdb.CockroachDBTaskContext;
import io.debezium.connector.cockroachdb.Module;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CockroachDBConnectorTask
extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CockroachDBConnectorTask.class);
    private static final String CONTEXT_NAME = "cockroachdb-connector-task";
    private volatile CockroachDBTaskContext taskContext;
    private volatile CockroachDBSchema schema;
    private volatile CockroachDBErrorHandler errorHandler;
    private volatile CockroachDBOffsetContext offsetContext;
    private volatile CockroachDBPartition partition;
    private volatile boolean running = false;
    private volatile List<SourceRecord> pendingRecords = new ArrayList<SourceRecord>();

    public String version() {
        return Module.version();
    }

    public void start(Map<String, String> props) {
        if (props == null) {
            throw new IllegalArgumentException("Configuration properties cannot be null");
        }
        Configuration config = Configuration.from(props);
        final CockroachDBConnectorConfig connectorConfig = new CockroachDBConnectorConfig(config);
        LOGGER.info("Starting CockroachDB connector task with configuration: {}", (Object)config.withMaskedPasswords());
        TopicNamingStrategy<TableId> topicNamingStrategy = new TopicNamingStrategy<TableId>(){

            public String dataChangeTopic(TableId tableId) {
                return connectorConfig.getLogicalName() + "." + tableId.schema() + "." + tableId.table();
            }

            public String schemaChangeTopic() {
                return connectorConfig.getLogicalName() + ".schema-changes";
            }

            public String sanitizedTopicName(String topicName) {
                return topicName.replaceAll("[^a-zA-Z0-9._-]", "_");
            }

            public String transactionTopic() {
                return connectorConfig.getLogicalName() + ".transaction";
            }

            public String heartbeatTopic() {
                return connectorConfig.getLogicalName() + ".heartbeat";
            }

            public void configure(Properties props) {
            }
        };
        this.schema = new CockroachDBSchema(connectorConfig, topicNamingStrategy);
        try {
            this.schema.initialize(connectorConfig);
        }
        catch (Exception e) {
            LOGGER.error("Failed to initialize schema - connector will fail to start", (Throwable)e);
            throw new RuntimeException("Failed to initialize schema", e);
        }
        this.taskContext = new CockroachDBTaskContext(connectorConfig, this.schema, topicNamingStrategy);
        this.partition = new CockroachDBPartition();
        this.offsetContext = new CockroachDBOffsetContext(connectorConfig);
        Map offset = this.context.offsetStorageReader().offset(this.partition.getSourcePartition());
        if (offset != null) {
            this.offsetContext = new CockroachDBOffsetContext.Loader(connectorConfig).load(offset);
        } else {
            LOGGER.info("No existing offset found, starting from beginning");
        }
        this.running = true;
        Thread streamingThread = new Thread(() -> {
            try {
                this.executeStreaming(connectorConfig);
            }
            catch (Exception e) {
                LOGGER.error("Error in streaming thread - connector will fail", (Throwable)e);
                this.running = false;
            }
        });
        streamingThread.setName("cockroachdb-streaming");
        streamingThread.setDaemon(true);
        streamingThread.start();
        LOGGER.info("CockroachDB connector task started successfully");
    }

    private void executeStreaming(final CockroachDBConnectorConfig connectorConfig) throws InterruptedException {
        ChangeEventSource.ChangeEventSourceContext changeEventSourceContext = new ChangeEventSource.ChangeEventSourceContext(){

            public boolean isRunning() {
                return CockroachDBConnectorTask.this.running;
            }

            public boolean isPaused() {
                return false;
            }

            public void waitSnapshotCompletion() throws InterruptedException {
            }

            public void waitStreamingPaused() throws InterruptedException {
            }

            public void streamingPaused() {
            }

            public void resumeStreaming() {
            }
        };
        TopicNamingStrategy<TableId> topicNamingStrategy = new TopicNamingStrategy<TableId>(){

            public String dataChangeTopic(TableId tableId) {
                return connectorConfig.getLogicalName() + "." + tableId.schema() + "." + tableId.table();
            }

            public String schemaChangeTopic() {
                return connectorConfig.getLogicalName() + ".schema-changes";
            }

            public String sanitizedTopicName(String topicName) {
                return topicName.replaceAll("[^a-zA-Z0-9._-]", "_");
            }

            public String transactionTopic() {
                return connectorConfig.getLogicalName() + ".transaction";
            }

            public String heartbeatTopic() {
                return connectorConfig.getLogicalName() + ".heartbeat";
            }

            public void configure(Properties props) {
            }
        };
        SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
        EventDispatcher dispatcher = new EventDispatcher((CommonConnectorConfig)connectorConfig, (TopicNamingStrategy)topicNamingStrategy, (DatabaseSchema)this.schema, null, (DataCollectionFilters.DataCollectionFilter)connectorConfig.getTableFilters().dataCollectionFilter(), (ChangeEventCreator)new CockroachDBChangeEventCreator(), (EventMetadataProvider)new CockroachDBEventMetadataProvider(), schemaNameAdjuster, null, null);
        CockroachDBStreamingChangeEventSource streamingChangeEventSource = new CockroachDBStreamingChangeEventSource(connectorConfig, (EventDispatcher<CockroachDBPartition, TableId>)dispatcher, this.schema, Clock.SYSTEM);
        streamingChangeEventSource.execute(changeEventSourceContext, this.partition, (OffsetContext)this.offsetContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SourceRecord> poll() throws InterruptedException {
        if (!this.running) {
            return List.of();
        }
        List<SourceRecord> list = this.pendingRecords;
        synchronized (list) {
            if (this.pendingRecords.isEmpty()) {
                return List.of();
            }
            ArrayList<SourceRecord> records = new ArrayList<SourceRecord>(this.pendingRecords);
            this.pendingRecords.clear();
            return records;
        }
    }

    public void stop() {
        LOGGER.info("Stopping CockroachDB connector task");
        this.running = false;
        if (this.schema != null) {
            try {
                this.schema.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error closing schema", (Throwable)e);
            }
        }
    }
}

