/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.db2.source.fetch;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.db2.Db2Connection;
import io.debezium.connector.db2.Db2ConnectorConfig;
import io.debezium.connector.db2.Db2DatabaseSchema;
import io.debezium.connector.db2.Db2OffsetContext;
import io.debezium.connector.db2.Db2Partition;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.db2.source.fetch.Db2SourceFetchTaskContext;
import org.apache.flink.cdc.connectors.db2.source.fetch.Db2StreamFetchTask;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scan fetch task for one Db2 {@link SnapshotSplit}.
 *
 * <p>The data-snapshot phase runs a {@link Db2SnapshotSplitReadTask} over the split held by this
 * task. The backfill phase runs a {@link Db2StreamFetchTask.StreamSplitReadTask} over the
 * {@link StreamSplit} handed in by the base class.
 */
public class Db2ScanFetchTask extends AbstractScanFetchTask {

    /**
     * Creates a scan task for the given split.
     *
     * @param split the snapshot split this task reads
     */
    public Db2ScanFetchTask(SnapshotSplit split) {
        super(split);
    }

    /**
     * Reads the rows of the snapshot split through a {@link Db2SnapshotSplitReadTask}.
     *
     * @param context fetch context; must be a {@link Db2SourceFetchTaskContext}
     * @throws IllegalStateException if the snapshot result is neither completed nor skipped; the
     *     task is marked as not running before the exception is thrown
     * @throws Exception any failure propagated from the snapshot read task
     */
    @Override
    protected void executeDataSnapshot(FetchTask.Context context) throws Exception {
        Db2SourceFetchTaskContext sourceFetchContext = (Db2SourceFetchTaskContext) context;
        Db2SnapshotSplitReadTask snapshotSplitReadTask =
                new Db2SnapshotSplitReadTask(
                        sourceFetchContext.getDbzConnectorConfig(),
                        sourceFetchContext.getOffsetContext(),
                        sourceFetchContext.getSnapshotChangeEventSourceMetrics(),
                        sourceFetchContext.getDatabaseSchema(),
                        sourceFetchContext.getConnection(),
                        sourceFetchContext.getDispatcher(),
                        sourceFetchContext.getSnapshotReceiver(),
                        this.snapshotSplit);
        Db2SnapshotSplitChangeEventSourceContext changeEventSourceContext =
                new Db2SnapshotSplitChangeEventSourceContext();
        SnapshotResult<Db2OffsetContext> snapshotResult =
                snapshotSplitReadTask.execute(
                        changeEventSourceContext,
                        sourceFetchContext.getPartition(),
                        sourceFetchContext.getOffsetContext());
        if (!snapshotResult.isCompletedOrSkipped()) {
            this.taskRunning = false;
            throw new IllegalStateException(
                    String.format("Read snapshot for Db2 split %s fail", this.snapshotSplit));
        }
    }

    /**
     * Runs the backfill stream read, starting from the starting offset of the given stream split.
     *
     * @param context fetch context; must be a {@link Db2SourceFetchTaskContext}
     * @param backfillStreamSplit the stream split to read during backfill
     * @throws Exception any failure propagated from the stream read task
     */
    @Override
    protected void executeBackfillTask(FetchTask.Context context, StreamSplit backfillStreamSplit)
            throws Exception {
        Db2SourceFetchTaskContext sourceFetchContext = (Db2SourceFetchTaskContext) context;
        Db2OffsetContext.Loader loader =
                new Db2OffsetContext.Loader(sourceFetchContext.getDbzConnectorConfig());
        // Typed as Map<String, ?> so the loader call needs no raw-type cast.
        Map<String, ?> startingOffset = backfillStreamSplit.getStartingOffset().getOffset();
        OffsetContext streamOffsetContext = loader.load(startingOffset);
        Db2StreamFetchTask.StreamSplitReadTask backfillBinlogReadTask =
                this.createBackFillLsnSplitReadTask(backfillStreamSplit, sourceFetchContext);
        ChangeEventSource.ChangeEventSourceContext changeEventSourceContext =
                new Db2SnapshotSplitChangeEventSourceContext();
        backfillBinlogReadTask.execute(
                changeEventSourceContext,
                sourceFetchContext.getPartition(),
                (Db2OffsetContext) streamOffsetContext);
    }

    /**
     * Builds the stream read task used for backfill.
     *
     * <p>The connector configuration is copied with {@code table.include.list} set to the split's
     * table (schema and table name, no catalog) and the heartbeat interval set to {@code 0}.
     *
     * @param backfillBinlogSplit the stream split the returned task will read
     * @param context source of the connections, dispatcher, error handler and schema
     * @return a new stream split read task bound to the adjusted configuration
     */
    private Db2StreamFetchTask.StreamSplitReadTask createBackFillLsnSplitReadTask(
            StreamSplit backfillBinlogSplit, Db2SourceFetchTaskContext context) {
        Configuration.Builder builder = context.getDbzConnectorConfig().getConfig().edit();
        TableId splitTableId = this.snapshotSplit.getTableId();
        Configuration dezConf =
                builder
                        // The explicit cast selects the with(String, Object) overload.
                        .with(
                                "table.include.list",
                                (Object) new TableId(null, splitTableId.schema(), splitTableId.table()))
                        .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                        .build();
        return new Db2StreamFetchTask.StreamSplitReadTask(
                new Db2ConnectorConfig(dezConf),
                context.getConnection(),
                context.getMetaDataConnection(),
                context.getDispatcher(),
                context.getErrorHandler(),
                context.getDatabaseSchema(),
                backfillBinlogSplit);
    }

    /**
     * Change event source context whose running state mirrors the enclosing task's
     * {@code taskRunning} flag.
     */
    public class Db2SnapshotSplitChangeEventSourceContext
            implements ChangeEventSource.ChangeEventSourceContext {

        /** Marks the enclosing task as no longer running. */
        public void finished() {
            Db2ScanFetchTask.this.taskRunning = false;
        }

        /**
         * @return the current value of the enclosing task's {@code taskRunning} flag
         */
        @Override
        public boolean isRunning() {
            return Db2ScanFetchTask.this.taskRunning;
        }
    }

    /** Snapshot change event source that exports the rows of a single {@link SnapshotSplit}. */
    public static class Db2SnapshotSplitReadTask
            extends AbstractSnapshotChangeEventSource<Db2Partition, Db2OffsetContext> {

        private static final Logger LOG = LoggerFactory.getLogger(Db2SnapshotSplitReadTask.class);

        /** Interval (10 seconds) between progress log lines while scanning a split. */
        private static final Duration LOG_INTERVAL = Duration.ofMillis(10000L);

        private final Db2ConnectorConfig connectorConfig;
        private final Db2DatabaseSchema databaseSchema;
        private final Db2Connection jdbcConnection;
        private final JdbcSourceEventDispatcher<Db2Partition> dispatcher;
        private final Clock clock;
        private final SnapshotSplit snapshotSplit;
        private final Db2OffsetContext offsetContext;
        private final SnapshotProgressListener<Db2Partition> snapshotProgressListener;
        private final EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver;

        /**
         * Creates a read task for one snapshot split.
         *
         * @param connectorConfig connector configuration; supplies the query fetch size
         * @param previousOffset offset context installed on the snapshot context in
         *     {@link #doExecute}
         * @param snapshotProgressListener listener notified of scanned row counts
         * @param databaseSchema schema used to look up the split's table and to convert rows
         * @param jdbcConnection connection used to run the split scan query
         * @param dispatcher dispatcher that emits the snapshot events
         * @param snapshotReceiver receiver passed to the dispatcher and completed after the scan
         * @param snapshotSplit the split to read
         */
        public Db2SnapshotSplitReadTask(
                Db2ConnectorConfig connectorConfig,
                Db2OffsetContext previousOffset,
                SnapshotProgressListener<Db2Partition> snapshotProgressListener,
                Db2DatabaseSchema databaseSchema,
                Db2Connection jdbcConnection,
                JdbcSourceEventDispatcher<Db2Partition> dispatcher,
                EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver,
                SnapshotSplit snapshotSplit) {
            super(connectorConfig, snapshotProgressListener);
            this.offsetContext = previousOffset;
            this.connectorConfig = connectorConfig;
            this.databaseSchema = databaseSchema;
            this.jdbcConnection = jdbcConnection;
            this.dispatcher = dispatcher;
            this.clock = Clock.SYSTEM;
            this.snapshotSplit = snapshotSplit;
            this.snapshotProgressListener = snapshotProgressListener;
            this.snapshotReceiver = snapshotReceiver;
        }

        /**
         * Prepares a snapshot context and runs {@link #doExecute}.
         *
         * @param context change event source context forwarded to {@link #doExecute}
         * @param partition partition the snapshot context is created for
         * @param previousOffset offset forwarded to {@link #getSnapshottingTask} and
         *     {@link #doExecute}
         * @return the result produced by {@link #doExecute}
         * @throws InterruptedException rethrown unchanged if {@link #doExecute} is interrupted
         * @throws RuntimeException if preparing the snapshot context fails
         * @throws DebeziumException wrapping any other exception thrown by {@link #doExecute}
         */
        @Override
        public SnapshotResult<Db2OffsetContext> execute(
                ChangeEventSource.ChangeEventSourceContext context,
                Db2Partition partition,
                Db2OffsetContext previousOffset)
                throws InterruptedException {
            AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask =
                    this.getSnapshottingTask(partition, previousOffset);
            Db2SnapshotContext ctx;
            try {
                ctx = this.prepare(partition);
            } catch (Exception e) {
                LOG.error("Failed to initialize snapshot context.", e);
                throw new RuntimeException(e);
            }
            try {
                return this.doExecute(context, previousOffset, ctx, snapshottingTask);
            } catch (InterruptedException e) {
                LOG.warn("Snapshot was interrupted before completion");
                throw e;
            } catch (Exception t) {
                throw new DebeziumException(t);
            }
        }

        /**
         * Installs this task's offset context on the snapshot context, exports the split's rows
         * and reports completion.
         *
         * @param context unused by this implementation
         * @param previousOffset unused; the offset passed to the constructor is used instead
         * @param snapshotContext must be a {@link Db2SnapshotContext}
         * @param snapshottingTask unused by this implementation
         * @return a completed result carrying the snapshot context's offset
         * @throws Exception if exporting the rows fails
         */
        @Override
        protected SnapshotResult<Db2OffsetContext> doExecute(
                ChangeEventSource.ChangeEventSourceContext context,
                Db2OffsetContext previousOffset,
                AbstractSnapshotChangeEventSource.SnapshotContext<Db2Partition, Db2OffsetContext>
                        snapshotContext,
                AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask)
                throws Exception {
            Db2SnapshotContext ctx = (Db2SnapshotContext) snapshotContext;
            ctx.offset = this.offsetContext;
            this.createDataEvents(ctx, this.snapshotSplit.getTableId());
            return SnapshotResult.completed(ctx.offset);
        }

        /**
         * Always returns a task built with the flags {@code (false, true)}.
         *
         * <p>NOTE(review): presumably these mean "do not snapshot schema, do snapshot data";
         * confirm against the Debezium {@code SnapshottingTask} constructor.
         */
        @Override
        protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(
                Db2Partition partition, Db2OffsetContext previousOffset) {
            return new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
        }

        /**
         * Creates the snapshot context for the given partition.
         *
         * @param partition partition the context is bound to
         * @return a new {@link Db2SnapshotContext}
         * @throws Exception if the context cannot be created
         */
        protected Db2SnapshotContext prepare(Db2Partition partition) throws Exception {
            return new Db2SnapshotContext(partition);
        }

        /** Exports the rows of the given table for this split, then completes the receiver. */
        private void createDataEvents(Db2SnapshotContext snapshotContext, TableId tableId)
                throws Exception {
            LOG.debug("Snapshotting table {}", tableId);
            this.createDataEventsForTable(
                    snapshotContext, this.snapshotReceiver, this.databaseSchema.tableFor(tableId));
            this.snapshotReceiver.completeSnapshot();
        }

        /**
         * Runs the split scan query and dispatches one snapshot event per returned row.
         *
         * <p>A progress line is logged and the progress listener is notified each time the
         * {@link #LOG_INTERVAL} timer expires.
         *
         * @throws ConnectException wrapping any {@link SQLException} raised by the scan
         * @throws InterruptedException if dispatching an event is interrupted
         */
        private void createDataEventsForTable(
                Db2SnapshotContext snapshotContext,
                EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver,
                Table table)
                throws InterruptedException {
            long exportStart = this.clock.currentTimeInMillis();
            LOG.info(
                    "Exporting data from split '{}' of table {}",
                    this.snapshotSplit.splitId(),
                    table.id());
            String selectSql =
                    Db2Utils.buildSplitScanQuery(
                            this.snapshotSplit.getTableId(),
                            this.snapshotSplit.getSplitKeyType(),
                            this.snapshotSplit.getSplitStart() == null,
                            this.snapshotSplit.getSplitEnd() == null);
            LOG.info(
                    "For split '{}' of table {} using select statement: '{}'",
                    this.snapshotSplit.splitId(),
                    table.id(),
                    selectSql);
            try (PreparedStatement selectStatement =
                            Db2Utils.readTableSplitDataStatement(
                                    this.jdbcConnection,
                                    selectSql,
                                    this.snapshotSplit.getSplitStart() == null,
                                    this.snapshotSplit.getSplitEnd() == null,
                                    this.snapshotSplit.getSplitStart(),
                                    this.snapshotSplit.getSplitEnd(),
                                    this.snapshotSplit.getSplitKeyType().getFieldCount(),
                                    this.connectorConfig.getQueryFetchSize());
                    ResultSet rs = selectStatement.executeQuery()) {
                ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
                long rows = 0L;
                Threads.Timer logTimer = this.getTableScanLogTimer();
                while (rs.next()) {
                    ++rows;
                    Object[] row =
                            this.jdbcConnection.rowToArray(
                                    table, this.databaseSchema, rs, columnArray);
                    if (logTimer.expired()) {
                        long stop = this.clock.currentTimeInMillis();
                        LOG.info(
                                "Exported {} records for split '{}' after {}",
                                rows,
                                this.snapshotSplit.splitId(),
                                Strings.duration(stop - exportStart));
                        this.snapshotProgressListener.rowsScanned(
                                snapshotContext.partition, table.id(), rows);
                        // Restart the interval for the next progress line.
                        logTimer = this.getTableScanLogTimer();
                    }
                    this.dispatcher.dispatchSnapshotEvent(
                            snapshotContext.partition,
                            table.id(),
                            this.getChangeRecordEmitter(snapshotContext, table.id(), row),
                            snapshotReceiver);
                }
                LOG.info(
                        "Finished exporting {} records for split '{}', total duration '{}'",
                        rows,
                        this.snapshotSplit.splitId(),
                        Strings.duration(this.clock.currentTimeInMillis() - exportStart));
            } catch (SQLException e) {
                throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
            }
        }

        /**
         * Records the event on the snapshot offset and builds the emitter for one row.
         *
         * @param snapshotContext supplies the partition and offset for the emitter
         * @param tableId table the row belongs to
         * @param row column values of the row
         * @return an emitter typed to {@link Db2Partition}, matching the declared return type
         */
        protected ChangeRecordEmitter<Db2Partition> getChangeRecordEmitter(
                Db2SnapshotContext snapshotContext, TableId tableId, Object[] row) {
            snapshotContext.offset.event(tableId, this.clock.currentTime());
            // The type argument must be Db2Partition: generics are invariant, so an emitter
            // parameterised with the Partition supertype is not assignable to the return type.
            return new SnapshotChangeRecordEmitter<Db2Partition>(
                    snapshotContext.partition, snapshotContext.offset, row, this.clock);
        }

        /** Returns a fresh timer that expires after {@link #LOG_INTERVAL}. */
        private Threads.Timer getTableScanLogTimer() {
            return Threads.timer(this.clock, LOG_INTERVAL);
        }

        /** Snapshot context bound to a {@link Db2Partition}. */
        private static class Db2SnapshotContext
                extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
                        Db2Partition, Db2OffsetContext> {

            /**
             * @param partition partition this context belongs to
             * @throws SQLException declared to match the superclass constructor
             */
            public Db2SnapshotContext(Db2Partition partition) throws SQLException {
                // NOTE(review): the empty string is presumably the catalog name; confirm
                // against RelationalSnapshotContext.
                super(partition, "");
            }
        }
    }
}

