/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresErrorHandler;
import io.debezium.connector.postgresql.PostgresEventDispatcher;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.LoggingContext;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.exception.PostgresConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSourceFetchTaskContext
extends JdbcSourceFetchTaskContext {
    private static final Logger log = LoggerFactory.getLogger(PostgresSourceFetchTaskContext.class);
    private static final String CONTEXT_NAME = "postgres-cdc-connector-task";
    private final PostgresConnection dataConnection;
    private ReplicationConnection replicationConnection;
    private final EventMetadataProvider metadataProvider;
    private Snapshotter snapshotter;
    private PostgresSchema databaseSchema;
    private PostgresOffsetContext offsetContext;
    private PostgresPartition partition;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
    private PostgresEventDispatcher<TableId> pgEventDispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private PostgresErrorHandler errorHandler;
    private PostgresTaskContext taskContext;
    private SnapshotChangeEventSourceMetrics<PostgresPartition> snapshotChangeEventSourceMetrics;
    private PostgresConnection.PostgresValueConverterBuilder postgresValueConverterBuilder;
    private Collection<TableChanges.TableChange> engineHistory;

    public PostgresSourceFetchTaskContext(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect, PostgresConnection dataConnection, Collection<TableChanges.TableChange> engineHistory) {
        super(sourceConfig, dataSourceDialect);
        this.dataConnection = dataConnection;
        this.metadataProvider = PostgresObjectUtils.newEventMetadataProvider();
        this.engineHistory = engineHistory;
        this.postgresValueConverterBuilder = PostgresConnectionUtils.newPostgresValueConverterBuilder(this.getDbzConnectorConfig(), "postgres-source-fetch-task-context", sourceConfig.getServerTimeZone());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void configure(SourceSplitBase sourceSplitBase) {
        super.registerDatabaseHistory(sourceSplitBase, (JdbcConnection)this.dataConnection);
        PostgresConnectorConfig connectorConfig = this.getDbzConnectorConfig();
        PostgresConnectorConfig.SnapshotMode snapshotMode = PostgresConnectorConfig.SnapshotMode.parse(connectorConfig.getConfig().getString(PostgresConnectorConfig.SNAPSHOT_MODE));
        this.snapshotter = snapshotMode.getSnapshotter(connectorConfig.getConfig());
        this.topicSelector = PostgresTopicSelector.create(connectorConfig);
        TypeRegistry typeRegistry = this.dataConnection.getTypeRegistry();
        try {
            this.databaseSchema = PostgresObjectUtils.newSchema(this.dataConnection, connectorConfig, typeRegistry, this.topicSelector, this.postgresValueConverterBuilder.build(typeRegistry));
        }
        catch (SQLException e) {
            throw new SeaTunnelRuntimeException((SeaTunnelErrorCode)PostgresConnectorErrorCode.NEW_SCHEMA_FAILED, (Throwable)e);
        }
        this.taskContext = PostgresObjectUtils.newTaskContext(connectorConfig, this.databaseSchema, this.topicSelector);
        this.offsetContext = this.loadStartingOffsetState(new PostgresOffsetContext.Loader(connectorConfig), sourceSplitBase);
        this.partition = new PostgresPartition(connectorConfig.getLogicalName());
        int queueSize = sourceSplitBase.isSnapshotSplit() && this.isExactlyOnce() ? Integer.MAX_VALUE : this.getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            SlotState slotInfo = null;
            try {
                if (log.isInfoEnabled()) {
                    log.info(this.dataConnection.serverInfo().toString());
                }
                PostgresConnectorConfig.LogicalDecoder logicalDecoder = PostgresConnectorConfig.LogicalDecoder.parse(connectorConfig.getConfig().getString(PostgresConnectorConfig.PLUGIN_NAME));
                slotInfo = this.dataConnection.getReplicationSlotState(connectorConfig.getConfig().getString(PostgresConnectorConfig.SLOT_NAME), logicalDecoder.getPostgresPluginName());
            }
            catch (SQLException e) {
                log.warn("unable to load info of replication slot, Debezium will try to create the slot");
            }
            if (this.offsetContext == null) {
                log.info("No previous offset found");
                this.snapshotter.init(connectorConfig, null, slotInfo);
            } else {
                log.info("Found previous offset {}", (Object)this.offsetContext);
                this.snapshotter.init(connectorConfig, this.offsetContext.asOffsetState(), slotInfo);
            }
            if (this.snapshotter.shouldStream() && this.replicationConnection == null) {
                this.replicationConnection = PostgresObjectUtils.createReplicationConnection(this.taskContext, this.dataConnection, this.snapshotter.shouldSnapshot(), connectorConfig);
                try {
                    this.replicationConnection.createReplicationSlot().orElse(null);
                }
                catch (SQLException ex) {
                    String message = "Creation of replication slot failed";
                    if (ex.getMessage().contains("already exists")) {
                        message = message + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                        log.warn(message);
                    }
                    throw new DebeziumException(message, (Throwable)ex);
                }
            }
            try {
                this.dataConnection.commit();
            }
            catch (SQLException e) {
                throw new DebeziumException((Throwable)e);
            }
            this.queue = new ChangeEventQueue.Builder().pollInterval(connectorConfig.getPollInterval()).maxBatchSize(connectorConfig.getMaxBatchSize()).maxQueueSize(queueSize).maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME)).build();
            this.dispatcher = new JdbcSourceEventDispatcher((CommonConnectorConfig)connectorConfig, this.topicSelector, (DatabaseSchema)this.databaseSchema, this.queue, (DataCollectionFilters.DataCollectionFilter)connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, this.metadataProvider, this.schemaNameAdjuster);
            this.pgEventDispatcher = new PostgresEventDispatcher<TableId>(connectorConfig, this.topicSelector, (DatabaseSchema<TableId>)this.databaseSchema, this.queue, (DataCollectionFilters.DataCollectionFilter<TableId>)connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, this.metadataProvider, this.schemaNameAdjuster);
            this.snapshotChangeEventSourceMetrics = new DefaultChangeEventSourceMetricsFactory().getSnapshotMetrics((CdcSourceTaskContext)this.taskContext, this.queue, this.metadataProvider);
            this.errorHandler = new PostgresErrorHandler(connectorConfig, this.queue);
        }
        finally {
            previousContext.restore();
        }
    }

    public PostgresSourceConfig getSourceConfig() {
        return (PostgresSourceConfig)this.sourceConfig;
    }

    public PostgresConnection getDataConnection() {
        return this.dataConnection;
    }

    public SnapshotChangeEventSourceMetrics<PostgresPartition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    public PostgresConnectorConfig getDbzConnectorConfig() {
        return (PostgresConnectorConfig)super.getDbzConnectorConfig();
    }

    public PostgresOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    public PostgresPartition getPartition() {
        return this.partition;
    }

    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    public PostgresSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    public TableId getTableId(SourceRecord record) {
        Struct value = (Struct)record.value();
        Struct source = value.getStruct("source");
        String schemaName = source.getString("schema");
        String tableName = source.getString("table");
        return new TableId(null, schemaName, tableName);
    }

    public SeaTunnelRowType getSplitType(Table table) {
        return PostgresUtils.getSplitType(table);
    }

    public JdbcSourceEventDispatcher<PostgresPartition> getDispatcher() {
        return this.dispatcher;
    }

    public PostgresEventDispatcher<TableId> getPgEventDispatcher() {
        return this.pgEventDispatcher;
    }

    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    public Tables.TableFilter getTableFilter() {
        return this.getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return PostgresUtils.getLsnPosition(sourceRecord);
    }

    public void close() {
        try {
            if (Objects.nonNull((Object)this.dataConnection)) {
                this.dataConnection.close();
            }
            if (Objects.nonNull(this.replicationConnection)) {
                this.replicationConnection.close();
            }
        }
        catch (Exception e) {
            log.warn("Failed to close connection", (Throwable)e);
        }
    }

    private PostgresOffsetContext loadStartingOffsetState(PostgresOffsetContext.Loader loader, SourceSplitBase split) {
        LsnOffset offset = split.isSnapshotSplit() ? LsnOffset.INITIAL_OFFSET : split.asIncrementalSplit().getStartupOffset();
        Map offsetStrMap = ((Offset)Objects.requireNonNull(offset, "offset is null for the sourceSplitBase")).getOffset();
        HashMap<String, Long> offsetMap = new HashMap<String, Long>();
        for (String key : offsetStrMap.keySet()) {
            String value = (String)offsetStrMap.get(key);
            if (value == null) continue;
            offsetMap.put(key, Long.parseLong(value));
        }
        return loader.load(offsetMap);
    }

    public ReplicationConnection getReplicationConnection() {
        return this.replicationConnection;
    }

    public Snapshotter getSnapshotter() {
        return this.snapshotter;
    }

    public PostgresTaskContext getTaskContext() {
        return this.taskContext;
    }
}

