/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cockroachdb;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.connector.cockroachdb.CockroachDBChangeRecordEmitter;
import io.debezium.connector.cockroachdb.CockroachDBConnectorConfig;
import io.debezium.connector.cockroachdb.CockroachDBOffsetContext;
import io.debezium.connector.cockroachdb.CockroachDBPartition;
import io.debezium.connector.cockroachdb.CockroachDBSchema;
import io.debezium.connector.cockroachdb.connection.CockroachDBConnection;
import io.debezium.connector.cockroachdb.serialization.ChangefeedSchemaParser;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CockroachDBStreamingChangeEventSource
implements StreamingChangeEventSource<CockroachDBPartition, CockroachDBOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CockroachDBStreamingChangeEventSource.class);
    private final CockroachDBConnectorConfig config;
    private final EventDispatcher<CockroachDBPartition, TableId> dispatcher;
    private final CockroachDBSchema schema;
    private final Clock clock;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Set<String> processedEvents = ConcurrentHashMap.newKeySet();

    public CockroachDBStreamingChangeEventSource(CockroachDBConnectorConfig config, EventDispatcher<CockroachDBPartition, TableId> dispatcher, CockroachDBSchema schema, Clock clock) {
        if (config == null) {
            throw new IllegalArgumentException("Config cannot be null");
        }
        if (dispatcher == null) {
            throw new IllegalArgumentException("Dispatcher cannot be null");
        }
        if (schema == null) {
            throw new IllegalArgumentException("Schema cannot be null");
        }
        if (clock == null) {
            throw new IllegalArgumentException("Clock cannot be null");
        }
        this.config = config;
        this.dispatcher = dispatcher;
        this.schema = schema;
        this.clock = clock;
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext context, CockroachDBPartition partition, CockroachDBOffsetContext offsetContext) throws InterruptedException {
        if (context == null) {
            throw new IllegalArgumentException("Context cannot be null");
        }
        if (partition == null) {
            throw new IllegalArgumentException("Partition cannot be null");
        }
        if (offsetContext == null) {
            throw new IllegalArgumentException("Offset context cannot be null");
        }
        LOGGER.info("Starting CockroachDB streaming change event source with sink changefeed approach");
        this.running.set(true);
        try (CockroachDBConnection connection = new CockroachDBConnection(this.config);){
            connection.connect();
            LOGGER.info("Successfully connected to CockroachDB");
            List<TableId> tables = this.schema.getDiscoveredTables();
            if (tables.isEmpty()) {
                LOGGER.warn("No tables found to monitor - check your table filters and permissions");
                return;
            }
            LOGGER.info("Monitoring {} tables: {}", (Object)tables.size(), tables);
            for (TableId table : tables) {
                if (!context.isRunning()) {
                    LOGGER.info("Context stopped, breaking out of table processing loop");
                    break;
                }
                this.processTableWithSinkChangefeed(connection, table, offsetContext, context);
            }
            Duration pollInterval = Duration.ofMillis(this.config.getChangefeedPollIntervalMs());
            Metronome metronome = Metronome.sleeper((Duration)pollInterval, (Clock)this.clock);
            while (context.isRunning() && this.running.get()) {
                metronome.pause();
            }
        }
        catch (SQLException e) {
            LOGGER.error("Error in CockroachDB streaming: ", (Throwable)e);
            throw new RuntimeException("Failed to stream changes from CockroachDB", e);
        }
        finally {
            this.running.set(false);
            LOGGER.info("Stopped CockroachDB streaming change event source");
        }
    }

    private void processTableWithSinkChangefeed(CockroachDBConnection connection, TableId table, CockroachDBOffsetContext offsetContext, ChangeEventSource.ChangeEventSourceContext context) throws SQLException, InterruptedException {
        String changefeedQuery = this.buildSinkChangefeedQuery(table, offsetContext.getCursor());
        LOGGER.info("Creating sink changefeed for table {}: {}", (Object)table, (Object)changefeedQuery);
        try (Statement stmt = connection.connection().createStatement();){
            stmt.execute(changefeedQuery);
            LOGGER.info("Successfully created changefeed for table {}", (Object)table);
        }
        catch (SQLException e) {
            LOGGER.error("Failed to create changefeed for table {}: {}", new Object[]{table, e.getMessage(), e});
            throw e;
        }
        this.consumeFromKafkaTopic(table, offsetContext, context);
    }

    /*
     * Enabled aggressive exception aggregation
     */
    private boolean changefeedExists(CockroachDBConnection connection, TableId table) throws SQLException {
        try (Statement stmt = connection.connection().createStatement();){
            boolean bl;
            block14: {
                String query = "SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = 'running' AND description LIKE '%" + table.toString() + "%'";
                ResultSet rs = stmt.executeQuery(query);
                try {
                    bl = rs.next();
                    if (rs == null) break block14;
                }
                catch (Throwable throwable) {
                    if (rs != null) {
                        try {
                            rs.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                rs.close();
            }
            return bl;
        }
        catch (SQLException e) {
            LOGGER.warn("Failed to check if changefeed exists: {}", (Object)e.getMessage());
            return false;
        }
    }

    private void consumeFromKafkaTopic(TableId table, CockroachDBOffsetContext offsetContext, ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        String topicPrefix = this.config.getChangefeedSinkTopicPrefix();
        if (topicPrefix == null || topicPrefix.trim().isEmpty()) {
            topicPrefix = "cockroachdb";
        }
        String databaseName = this.config.getDatabaseName();
        String topicName = topicPrefix + "." + databaseName + "." + table.schema() + "." + table.table();
        Properties props = new Properties();
        props.put("bootstrap.servers", this.config.getChangefeedSinkUri().replace("kafka://", ""));
        props.put("group.id", "cockroachdb-connector-" + table.table());
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        try (KafkaConsumer consumer = new KafkaConsumer(props);){
            consumer.subscribe(Arrays.asList(topicName));
            LOGGER.info("Started consuming from Kafka topic: {}", (Object)topicName);
            Duration pollInterval = Duration.ofMillis(this.config.getChangefeedPollIntervalMs());
            Metronome metronome = Metronome.sleeper((Duration)pollInterval, (Clock)this.clock);
            while (context.isRunning() && this.running.get()) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
                for (ConsumerRecord record : records) {
                    if (!context.isRunning()) break;
                    try {
                        String valueJson = (String)record.value();
                        if (valueJson == null || valueJson.trim().isEmpty()) continue;
                        this.processChangefeedEventFromKafka(valueJson, table, offsetContext);
                    }
                    catch (Exception e) {
                        LOGGER.error("Error processing changefeed event from Kafka: {}", (Object)e.getMessage(), (Object)e);
                    }
                }
                metronome.pause();
            }
            LOGGER.info("Stopped consuming from Kafka topic: {}", (Object)topicName);
        }
        catch (Exception e) {
            LOGGER.error("Error consuming from Kafka topic {}: {}", new Object[]{topicName, e.getMessage(), e});
            throw new RuntimeException("Failed to consume from Kafka topic " + topicName, e);
        }
    }

    private void processChangefeedEventFromKafka(String valueJson, TableId table, CockroachDBOffsetContext offsetContext) {
        if (valueJson == null || valueJson.trim().isEmpty()) {
            LOGGER.warn("Received null or empty value JSON for table {}", (Object)table);
            return;
        }
        if (table == null) {
            LOGGER.warn("Received null table for value JSON: {}", (Object)valueJson);
            return;
        }
        if (offsetContext == null) {
            LOGGER.warn("Received null offset context for table {} and value: {}", (Object)table, (Object)valueJson);
            return;
        }
        try {
            JsonNode jsonNode = this.objectMapper.readTree(valueJson);
            if (jsonNode != null && jsonNode.has("resolved")) {
                JsonNode resolvedNode = jsonNode.get("resolved");
                if (resolvedNode != null) {
                    LOGGER.debug("Received resolved timestamp: {}", (Object)resolvedNode.asText());
                }
                return;
            }
            String eventId = this.createEventId(jsonNode);
            if (eventId != null && this.processedEvents.contains(eventId)) {
                LOGGER.debug("Skipping duplicate event: {}", (Object)eventId);
                return;
            }
            if (eventId != null) {
                this.processedEvents.add(eventId);
            }
            LOGGER.info("Received enriched envelope event for table {}: {}", (Object)table, (Object)valueJson);
            String keyJson = this.extractKeyFromEnrichedEnvelope(jsonNode);
            ChangefeedSchemaParser.ParsedChange change = ChangefeedSchemaParser.parse(keyJson, valueJson);
            if (change != null) {
                this.dispatchChangeEvent(table, change, offsetContext, jsonNode);
            } else {
                LOGGER.warn("Failed to parse changefeed event for table {} with key: {}", (Object)table, (Object)keyJson);
            }
        }
        catch (Exception e) {
            LOGGER.error("Error processing changefeed event from Kafka: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    private String createEventId(JsonNode jsonNode) {
        try {
            JsonNode payloadNode = jsonNode.path("payload");
            if (payloadNode.isMissingNode()) {
                payloadNode = jsonNode;
            }
            String tableName = payloadNode.path("source").path("table_name").asText("");
            String operation = payloadNode.path("op").asText("");
            String timestamp = payloadNode.path("ts_ns").asText("");
            return tableName + ":" + operation + ":" + timestamp;
        }
        catch (Exception e) {
            try {
                String jsonString = jsonNode != null ? jsonNode.toString() : "";
                return String.valueOf(jsonString.hashCode());
            }
            catch (Exception hashException) {
                return "unknown:" + System.currentTimeMillis();
            }
        }
    }

    private String extractKeyFromEnrichedEnvelope(JsonNode jsonNode) {
        try {
            JsonNode beforeNode;
            JsonNode afterNode;
            JsonNode payloadNode = jsonNode.path("payload");
            if (payloadNode.isMissingNode()) {
                payloadNode = jsonNode;
            }
            if (payloadNode.has("after") && payloadNode.get("after").isObject() && (afterNode = payloadNode.get("after")).has("id")) {
                return "{\"id\":\"" + afterNode.get("id").asText() + "\"}";
            }
            if (payloadNode.has("before") && payloadNode.get("before").isObject() && (beforeNode = payloadNode.get("before")).has("id")) {
                return "{\"id\":\"" + beforeNode.get("id").asText() + "\"}";
            }
            LOGGER.warn("Could not extract primary key from enriched envelope");
            return null;
        }
        catch (Exception e) {
            LOGGER.error("Error extracting key from enriched envelope: {}", (Object)e.getMessage(), (Object)e);
            return null;
        }
    }

    private void dispatchChangeEvent(TableId table, ChangefeedSchemaParser.ParsedChange change, CockroachDBOffsetContext offsetContext, JsonNode jsonNode) {
        if (change.keySchema() == null && change.valueSchema() == null) {
            return;
        }
        try {
            Envelope.Operation operation = this.extractOperationFromEnvelope(jsonNode);
            JsonNode payloadNode = jsonNode.path("payload");
            if (payloadNode.isMissingNode()) {
                payloadNode = jsonNode;
            }
            JsonNode sourceNode = payloadNode.path("source");
            String clusterId = sourceNode.path("cluster_id").asText("unknown");
            String nodeId = sourceNode.path("node_id").asText("unknown");
            String jobId = sourceNode.path("job_id").asText("unknown");
            String tsHlc = sourceNode.path("ts_hlc").asText("unknown");
            CockroachDBPartition partition = new CockroachDBPartition();
            CockroachDBChangeRecordEmitter emitter = new CockroachDBChangeRecordEmitter(partition, change, offsetContext, this.clock, operation);
            LOGGER.info("Processed data event for table {} with operation {}: key={}, value={}, source={}", new Object[]{table, operation, change.key(), change.value(), "cluster_id=" + clusterId + ", node_id=" + nodeId + ", job_id=" + jobId + ", ts_hlc=" + tsHlc});
        }
        catch (Exception e) {
            LOGGER.error("Error processing change event for table {}: {}", new Object[]{table, e.getMessage(), e});
        }
    }

    private Envelope.Operation extractOperationFromEnvelope(JsonNode jsonNode) {
        try {
            boolean hasAfter;
            JsonNode payloadNode = jsonNode.path("payload");
            if (payloadNode.isMissingNode()) {
                payloadNode = jsonNode;
            }
            if (payloadNode.has("op")) {
                String op;
                switch (op = payloadNode.get("op").asText()) {
                    case "c": {
                        return Envelope.Operation.CREATE;
                    }
                    case "u": {
                        return Envelope.Operation.UPDATE;
                    }
                    case "d": {
                        return Envelope.Operation.DELETE;
                    }
                }
                LOGGER.warn("Unknown operation type in enriched envelope: {}", (Object)op);
                return Envelope.Operation.READ;
            }
            boolean hasBefore = payloadNode.has("before") && !payloadNode.get("before").isNull();
            boolean bl = hasAfter = payloadNode.has("after") && !payloadNode.get("after").isNull();
            if (hasBefore && hasAfter) {
                return Envelope.Operation.UPDATE;
            }
            if (hasAfter && !hasBefore) {
                return Envelope.Operation.CREATE;
            }
            if (hasBefore && !hasAfter) {
                return Envelope.Operation.DELETE;
            }
            LOGGER.warn("Cannot determine operation type from enriched envelope structure: before={}, after={}", (Object)hasBefore, (Object)hasAfter);
            return Envelope.Operation.READ;
        }
        catch (Exception e) {
            LOGGER.error("Error extracting operation from enriched envelope: {}", (Object)e.getMessage(), (Object)e);
            return Envelope.Operation.READ;
        }
    }

    private String buildSinkChangefeedQuery(TableId table, String cursor) {
        String sinkOptions;
        StringBuilder query = new StringBuilder();
        query.append("CREATE CHANGEFEED FOR TABLE ").append(table.toString());
        Object sinkUri = this.config.getChangefeedSinkUri();
        String topicPrefix = this.config.getChangefeedSinkTopicPrefix();
        if (topicPrefix == null || topicPrefix.trim().isEmpty()) {
            topicPrefix = "cockroachdb";
        }
        String databaseName = this.config.getDatabaseName();
        String topicName = topicPrefix + "." + databaseName + "." + table.schema() + "." + table.table();
        sinkUri = ((String)sinkUri).contains("?") ? (String)sinkUri + "&topic_name=" + topicName : (String)sinkUri + "?topic_name=" + topicName;
        query.append(" INTO '").append((String)sinkUri).append("'");
        String envelope = this.config.getChangefeedEnvelope();
        query.append(" WITH envelope = '").append(envelope).append("'");
        String enrichedProperties = this.config.getChangefeedEnrichedProperties();
        if (enrichedProperties != null && !enrichedProperties.trim().isEmpty()) {
            query.append(", enriched_properties = '").append(enrichedProperties).append("'");
        }
        if (this.config.isChangefeedIncludeUpdated()) {
            query.append(", updated");
        }
        if (this.config.isChangefeedIncludeDiff()) {
            query.append(", diff");
        }
        String resolvedInterval = this.config.getChangefeedResolvedInterval();
        query.append(", resolved = '").append(resolvedInterval).append("'");
        if (cursor != null && !cursor.trim().isEmpty()) {
            query.append(", cursor = '").append(cursor).append("'");
        }
        if ((sinkOptions = this.config.getChangefeedSinkOptions()) != null && !sinkOptions.trim().isEmpty()) {
            query.append(", ").append(sinkOptions);
        }
        return query.toString();
    }

    public void stop() {
        this.running.set(false);
        LOGGER.info("Stopping CockroachDB streaming change event source");
    }
}

