/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.db2.source.fetch;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.db2.Db2Connection;
import io.debezium.connector.db2.Db2Connector;
import io.debezium.connector.db2.Db2ConnectorConfig;
import io.debezium.connector.db2.Db2DatabaseSchema;
import io.debezium.connector.db2.Db2OffsetContext;
import io.debezium.connector.db2.Db2Partition;
import io.debezium.connector.db2.Db2TaskContext;
import io.debezium.connector.db2.Db2TopicSelector;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
import java.util.Map;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.relational.handler.SchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.db2.source.config.Db2SourceConfig;
import org.apache.flink.cdc.connectors.db2.source.dialect.Db2Dialect;
import org.apache.flink.cdc.connectors.db2.source.handler.Db2SchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.db2.source.offset.LsnOffset;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class Db2SourceFetchTaskContext
extends JdbcSourceFetchTaskContext {
    private final Db2Connection connection;
    private final Db2Connection metaDataConnection;
    private final Db2EventMetadataProvider metadataProvider;
    private Db2OffsetContext offsetContext;
    private Db2Partition partition;
    private Db2DatabaseSchema databaseSchema;
    private JdbcSourceEventDispatcher<Db2Partition> dispatcher;
    private ErrorHandler errorHandler;
    private ChangeEventQueue<DataChangeEvent> queue;
    private Db2TaskContext taskContext;
    private TopicSelector<TableId> topicSelector;
    private EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver;
    private SnapshotChangeEventSourceMetrics<Db2Partition> snapshotChangeEventSourceMetrics;
    private StreamingChangeEventSourceMetrics<Db2Partition> streamingChangeEventSourceMetrics;

    public Db2SourceFetchTaskContext(JdbcSourceConfig sourceConfig, Db2Dialect dataSourceDialect, Db2Connection connection, Db2Connection metaDataConnection) {
        super(sourceConfig, (JdbcDataSourceDialect)dataSourceDialect);
        this.connection = connection;
        this.metadataProvider = new Db2EventMetadataProvider();
        this.metaDataConnection = metaDataConnection;
    }

    public void configure(SourceSplitBase sourceSplitBase) {
        Db2ConnectorConfig connectorConfig = this.getDbzConnectorConfig();
        this.topicSelector = Db2TopicSelector.defaultSelector((Db2ConnectorConfig)connectorConfig);
        EmbeddedFlinkDatabaseHistory.registerHistory((String)this.sourceConfig.getDbzConfiguration().getString("database.history.instance.name"), sourceSplitBase.getTableSchemas().values());
        this.databaseSchema = Db2Utils.createDb2DatabaseSchema(connectorConfig, this.connection);
        this.offsetContext = this.loadStartingOffsetState(new Db2OffsetContext.Loader(connectorConfig), sourceSplitBase);
        String serverName = connectorConfig.getLogicalName();
        this.partition = new Db2Partition(serverName);
        this.validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);
        this.taskContext = new Db2TaskContext(connectorConfig, this.databaseSchema);
        int queueSize = sourceSplitBase.isSnapshotSplit() ? this.getSourceConfig().getSplitSize() : this.getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
        this.queue = new ChangeEventQueue.Builder().pollInterval(connectorConfig.getPollInterval()).maxBatchSize(connectorConfig.getMaxBatchSize()).maxQueueSize(queueSize).maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> this.taskContext.configureLoggingContext("Db2-cdc-connector-task")).build();
        this.dispatcher = new JdbcSourceEventDispatcher((CommonConnectorConfig)connectorConfig, this.topicSelector, (DatabaseSchema)this.databaseSchema, this.queue, (DataCollectionFilters.DataCollectionFilter)connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, (EventMetadataProvider)this.metadataProvider, this.schemaNameAdjuster, (SchemaChangeEventHandler)new Db2SchemaChangeEventHandler());
        this.snapshotReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
        DefaultChangeEventSourceMetricsFactory changeEventSourceMetricsFactory = new DefaultChangeEventSourceMetricsFactory();
        this.snapshotChangeEventSourceMetrics = changeEventSourceMetricsFactory.getSnapshotMetrics((CdcSourceTaskContext)this.taskContext, this.queue, (EventMetadataProvider)this.metadataProvider);
        this.streamingChangeEventSourceMetrics = changeEventSourceMetricsFactory.getStreamingMetrics((CdcSourceTaskContext)this.taskContext, this.queue, (EventMetadataProvider)this.metadataProvider);
        this.errorHandler = new ErrorHandler(Db2Connector.class, (CommonConnectorConfig)connectorConfig, this.queue);
    }

    private Db2OffsetContext loadStartingOffsetState(Db2OffsetContext.Loader loader, SourceSplitBase sourceSplitBase) {
        LsnOffset offset = sourceSplitBase.isSnapshotSplit() ? LsnOffset.INITIAL_OFFSET : sourceSplitBase.asStreamSplit().getStartingOffset();
        return loader.load(offset.getOffset());
    }

    private void validateAndLoadDatabaseHistory(Db2OffsetContext offset, Db2DatabaseSchema schema) {
        schema.initializeStorage();
        schema.recover((Partition)this.partition, (OffsetContext)offset);
    }

    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    public Tables.TableFilter getTableFilter() {
        return this.getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    public Offset getStreamOffset(SourceRecord record) {
        return Db2Utils.getLsn(record);
    }

    public Db2DatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    public RowType getSplitType(Table table) {
        Column splitColumn = Db2Utils.getSplitColumn(table, this.sourceConfig.getChunkKeyColumn());
        return Db2Utils.getSplitType(splitColumn);
    }

    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    public Db2ConnectorConfig getDbzConnectorConfig() {
        return (Db2ConnectorConfig)super.getDbzConnectorConfig();
    }

    public Db2SourceConfig getSourceConfig() {
        return (Db2SourceConfig)this.sourceConfig;
    }

    public JdbcSourceEventDispatcher getEventDispatcher() {
        return this.dispatcher;
    }

    public WatermarkDispatcher getWaterMarkDispatcher() {
        return this.dispatcher;
    }

    public EventDispatcher.SnapshotReceiver<Db2Partition> getSnapshotReceiver() {
        return this.snapshotReceiver;
    }

    public Db2OffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    public Db2Partition getPartition() {
        return this.partition;
    }

    public Db2Connection getConnection() {
        return this.connection;
    }

    public Db2Connection getMetaDataConnection() {
        return this.metaDataConnection;
    }

    public SnapshotChangeEventSourceMetrics<Db2Partition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    public StreamingChangeEventSourceMetrics<Db2Partition> getStreamingChangeEventSourceMetrics() {
        return this.streamingChangeEventSourceMetrics;
    }

    public void close() throws Exception {
        this.metaDataConnection.commit();
        this.connection.commit();
        this.metaDataConnection.close();
        this.connection.close();
    }

    public SchemaNameAdjuster getSchemaNameAdjuster() {
        return this.schemaNameAdjuster;
    }

    public static class Db2EventMetadataProvider
    implements EventMetadataProvider {
        public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null) {
                return null;
            }
            Long timestamp = sourceInfo.getInt64("ts_ms");
            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
        }

        public Map<String, String> getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null) {
                return null;
            }
            return Collect.hashMapOf((Object)"commit_lsn", (Object)sourceInfo.getString("commit_lsn"), (Object)"change_lsn", (Object)sourceInfo.getString("change_lsn"));
        }

        public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null) {
                return null;
            }
            return sourceInfo.getString("commit_lsn");
        }
    }
}

