/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.source.fetch;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.Utils;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
import org.apache.flink.cdc.connectors.postgres.source.fetch.StoppableChangeEventSourceContext;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresScanFetchTask
extends AbstractScanFetchTask {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresScanFetchTask.class);

    /**
     * Creates a scan fetch task for one snapshot split.
     *
     * @param split the snapshot split, passed straight to the {@link AbstractScanFetchTask}
     *     constructor
     */
    public PostgresScanFetchTask(SnapshotSplit split) {
        super(split);
    }

    /**
     * Runs this scan task: optionally creates the replication slot for the backfill read task,
     * delegates to {@link AbstractScanFetchTask#execute}, and afterwards always closes the
     * replication connection (unless backfill is skipped).
     *
     * <p>If both the main work and the cleanup fail, the failure of the main work is the one
     * that propagates; the cleanup failure is attached to it as a suppressed exception instead
     * of replacing it.
     *
     * @param context the fetch task context; must be a {@link PostgresSourceFetchTaskContext}
     *     whose source config is a {@link PostgresSourceConfig}
     * @throws Exception whatever slot creation, the inherited execution, or (when nothing else
     *     failed) the cleanup throws
     */
    @Override
    public void execute(FetchTask.Context context) throws Exception {
        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
        PostgresSourceConfig sourceConfig = (PostgresSourceConfig) context.getSourceConfig();
        Throwable primaryFailure = null;
        try {
            this.maybeCreateSlotForBackFillReadTask(
                    ctx.getConnection(),
                    ctx.getReplicationConnection(),
                    sourceConfig.getSlotNameForBackfillTask(),
                    ctx.getPluginName(),
                    sourceConfig.isSkipSnapshotBackfill());
            super.execute(context);
        } catch (Throwable t) {
            // Remember the real cause so a cleanup failure below cannot mask it.
            primaryFailure = t;
            throw t;
        } finally {
            try {
                this.maybeDropSlotForBackFillReadTask(
                        (PostgresReplicationConnection) ctx.getReplicationConnection(),
                        sourceConfig.isSkipSnapshotBackfill());
            } catch (Throwable cleanupFailure) {
                if (primaryFailure == null) {
                    throw cleanupFailure;
                }
                if (cleanupFailure != primaryFailure) {
                    primaryFailure.addSuppressed(cleanupFailure);
                }
            }
        }
    }

    /**
     * Reads the data of this task's snapshot split by running a
     * {@link PostgresSnapshotSplitReadTask} built from the fetch task context.
     *
     * @param context the fetch task context; must be a {@link PostgresSourceFetchTaskContext}
     * @throws IllegalStateException if the snapshot result is neither completed nor skipped;
     *     {@code taskRunning} is set to {@code false} before throwing
     * @throws Exception if the snapshot read task itself fails
     */
    @Override
    protected void executeDataSnapshot(FetchTask.Context context) throws Exception {
        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context;
        PostgresSnapshotSplitReadTask snapshotSplitReadTask =
                new PostgresSnapshotSplitReadTask(
                        ctx.getConnection(),
                        ctx.getDbzConnectorConfig(),
                        ctx.getDatabaseSchema(),
                        ctx.getOffsetContext(),
                        ctx.getDispatcher(),
                        ctx.getSnapshotChangeEventSourceMetrics(),
                        this.snapshotSplit);
        StoppableChangeEventSourceContext changeEventSourceContext =
                new StoppableChangeEventSourceContext();
        SnapshotResult<PostgresOffsetContext> snapshotResult =
                snapshotSplitReadTask.execute(
                        changeEventSourceContext, ctx.getPartition(), ctx.getOffsetContext());
        if (!snapshotResult.isCompletedOrSkipped()) {
            this.taskRunning = false;
            // Name the split that failed as well as the result; the message used to print only
            // the result object where it claimed to print the split.
            throw new IllegalStateException(
                    String.format(
                            "Read snapshot for postgres split %s fail, snapshot result: %s",
                            this.snapshotSplit, snapshotResult));
        }
    }

    /**
     * Runs a stream read task over the given backfill split, starting from the split's starting
     * offset.
     *
     * @param context the fetch task context; must be a {@link PostgresSourceFetchTaskContext}
     * @param backfillStreamSplit the stream split to read
     * @throws Exception if building or executing the stream read task fails
     */
    @Override
    protected void executeBackfillTask(FetchTask.Context context, StreamSplit backfillStreamSplit) throws Exception {
        final PostgresSourceFetchTaskContext fetchContext = (PostgresSourceFetchTaskContext) context;

        // Turn the split's starting offset into a Debezium offset context.
        final PostgresOffsetContext startingOffset =
                PostgresOffsetUtils.getPostgresOffsetContext(
                        new PostgresOffsetContext.Loader(fetchContext.getDbzConnectorConfig()),
                        backfillStreamSplit.getStartingOffset());

        final PostgresStreamFetchTask.StreamSplitReadTask streamReadTask =
                new PostgresStreamFetchTask.StreamSplitReadTask(
                        fetchContext.getDbzConnectorConfig(),
                        fetchContext.getSnapShotter(),
                        fetchContext.getConnection(),
                        fetchContext.getDispatcher(),
                        fetchContext.getPostgresDispatcher(),
                        fetchContext.getErrorHandler(),
                        fetchContext.getTaskContext().getClock(),
                        fetchContext.getDatabaseSchema(),
                        fetchContext.getTaskContext(),
                        fetchContext.getReplicationConnection(),
                        backfillStreamSplit);

        final PostgresSourceConfig sourceConfig = (PostgresSourceConfig) fetchContext.getSourceConfig();
        LOG.info(
                "Execute backfillReadTask for split {} with slot name {}",
                this.snapshotSplit,
                sourceConfig.getSlotNameForBackfillTask());

        final ChangeEventSource.ChangeEventSourceContext sourceContext =
                new StoppableChangeEventSourceContext();
        streamReadTask.execute(sourceContext, fetchContext.getPartition(), startingOffset);
    }

    /**
     * Makes sure the replication slot for the backfill read task exists, unless backfill is
     * skipped.
     *
     * <p>The slot state is looked up first; if it cannot be loaded or is absent, the slot is
     * created through the replication connection. Finally the method waits for the slot to
     * become ready.
     *
     * @param jdbcConnection connection used to query the slot state
     * @param replicationConnection connection used to create the slot
     * @param slotName name of the replication slot
     * @param pluginName name of the logical decoding plugin
     * @param skipSnapshotBackfill when {@code true} the method does nothing
     * @throws FlinkRuntimeException if creating the slot or waiting for it fails
     */
    private void maybeCreateSlotForBackFillReadTask(PostgresConnection jdbcConnection, ReplicationConnection replicationConnection, String slotName, String pluginName, boolean skipSnapshotBackfill) {
        if (skipSnapshotBackfill) {
            return;
        }
        try {
            SlotState slotInfo = null;
            try {
                slotInfo = jdbcConnection.getReplicationSlotState(slotName, pluginName);
            } catch (SQLException e) {
                // Best effort: fall through and try to create the slot, but keep the cause in
                // the log so a real connectivity problem is diagnosable.
                LOG.info("Unable to load info of replication slot, will try to create the slot", e);
            }
            if (slotInfo == null) {
                try {
                    replicationConnection.createReplicationSlot();
                } catch (SQLException ex) {
                    String message = "Creation of replication slot failed";
                    // getMessage() may be null; guard so the original SQLException is not
                    // replaced by a NullPointerException.
                    String detail = ex.getMessage();
                    if (detail != null && detail.contains("already exists")) {
                        message = message + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                    }
                    throw new FlinkRuntimeException(message, (Throwable) ex);
                }
            }
            PostgresObjectUtils.waitForReplicationSlotReady(30, jdbcConnection, slotName, pluginName);
        } catch (FlinkRuntimeException e) {
            // Already carries a descriptive message; do not wrap it a second time.
            throw e;
        } catch (Throwable t) {
            throw new FlinkRuntimeException(t);
        }
    }

    /**
     * Closes the replication connection via {@code close(true)}, unless backfill is skipped.
     *
     * <p>NOTE(review): judging by the method name, the {@code true} flag asks the connection to
     * drop its replication slot — confirm against {@code PostgresReplicationConnection}.
     *
     * @param replicationConnection the replication connection to close
     * @param skipSnapshotBackfill when {@code true} the method does nothing
     * @throws FlinkRuntimeException if closing the connection fails
     */
    private void maybeDropSlotForBackFillReadTask(PostgresReplicationConnection replicationConnection, boolean skipSnapshotBackfill) {
        if (skipSnapshotBackfill) {
            return;
        }
        try {
            replicationConnection.close(true);
        } catch (Throwable t) {
            String message = "Failed to close the replication connection of the backfill read task";
            // Log with the throwable so the cause stays visible even if a caller reports a
            // different, earlier failure.
            LOG.error(message, t);
            throw new FlinkRuntimeException(message, t);
        }
    }

    public static class PostgresSnapshotSplitReadTask
    extends AbstractSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
        private static final Logger LOG = LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
        private final PostgresConnection jdbcConnection;
        private final PostgresConnectorConfig connectorConfig;
        private final JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
        private final SnapshotSplit snapshotSplit;
        private final PostgresOffsetContext offsetContext;
        private final PostgresSchema databaseSchema;
        private final SnapshotProgressListener<PostgresPartition> snapshotProgressListener;
        private final Clock clock;

        /**
         * Builds a read task for a single snapshot split.
         *
         * @param jdbcConnection connection used to refresh the schema and run the split query
         * @param connectorConfig Debezium connector configuration; also handed to the superclass
         * @param databaseSchema schema from which the split's table definition is resolved
         * @param previousOffset offset context stored on the task and installed into the
         *     snapshot context when the task executes
         * @param dispatcher dispatcher that receives the snapshot events
         * @param snapshotProgressListener progress listener; also handed to the superclass
         * @param snapshotSplit the split to read
         */
        public PostgresSnapshotSplitReadTask(PostgresConnection jdbcConnection, PostgresConnectorConfig connectorConfig, PostgresSchema databaseSchema, PostgresOffsetContext previousOffset, JdbcSourceEventDispatcher dispatcher, SnapshotProgressListener snapshotProgressListener, SnapshotSplit snapshotSplit) {
            super(connectorConfig, snapshotProgressListener);
            // Assigned in field declaration order.
            this.jdbcConnection = jdbcConnection;
            this.connectorConfig = connectorConfig;
            this.dispatcher = dispatcher;
            this.snapshotSplit = snapshotSplit;
            this.offsetContext = previousOffset;
            this.databaseSchema = databaseSchema;
            this.snapshotProgressListener = snapshotProgressListener;
            this.clock = Clock.SYSTEM;
        }

        /**
         * Performs the snapshot of this task's split: installs the task's offset context into
         * the snapshot context, refreshes the schema, emits the split's rows and reports
         * completion with that offset.
         *
         * <p>The {@code context}, {@code previousOffset} and {@code snapshottingTask} arguments
         * are not used by this implementation.
         *
         * @param snapshotContext must be a {@code PostgresSnapshotContext}
         * @return a completed snapshot result carrying the snapshot context's offset
         * @throws Exception if refreshing the schema or emitting the rows fails
         */
        @Override
        protected SnapshotResult<PostgresOffsetContext> doExecute(
                ChangeEventSource.ChangeEventSourceContext context,
                PostgresOffsetContext previousOffset,
                AbstractSnapshotChangeEventSource.SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
                AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask)
                throws Exception {
            final PostgresSnapshotContext splitContext = (PostgresSnapshotContext) snapshotContext;
            splitContext.offset = this.offsetContext;

            Utils.refreshSchema(this.databaseSchema, this.jdbcConnection, true);

            final TableId splitTable = this.snapshotSplit.getTableId();
            this.createDataEvents(splitContext, splitTable);

            return SnapshotResult.completed(splitContext.offset);
        }

        /**
         * Emits the snapshot events for the given table and then signals snapshot completion to
         * the dispatcher's snapshot receiver.
         *
         * @param snapshotContext snapshot context carrying the partition and offset
         * @param tableId id of the table to snapshot
         * @throws NullPointerException if the database schema has no table for {@code tableId}
         * @throws InterruptedException if emitting events is interrupted
         */
        private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId tableId) throws InterruptedException {
            EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
            LOG.info("Snapshotting table {}", tableId);
            // Fail with a message that names the table instead of a bare NullPointerException.
            Table table = Objects.requireNonNull(
                    this.databaseSchema.tableFor(tableId),
                    "No table schema found for " + tableId);
            this.createDataEventsForTable(snapshotContext, snapshotReceiver, table);
            snapshotReceiver.completeSnapshot();
        }

        /**
         * Runs the split scan query and dispatches one snapshot event per returned row.
         *
         * <p>Each row is copied into an array indexed by column position, the offset context is
         * advanced with the table id and current time, and the row is dispatched through the
         * snapshot receiver. Progress is logged and reported to the progress listener whenever
         * the log timer expires.
         *
         * @param snapshotContext snapshot context carrying the partition and offset
         * @param snapshotReceiver receiver that the events are dispatched to
         * @param table definition of the table being read
         * @throws FlinkRuntimeException if the query or reading its result fails with a
         *     {@link SQLException}
         * @throws InterruptedException if dispatching is interrupted
         */
        private void createDataEventsForTable(PostgresSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver, Table table) throws InterruptedException {
            long exportStart = this.clock.currentTimeInMillis();
            LOG.info("Exporting data from split '{}' of table {}", this.snapshotSplit.splitId(), table.id());

            // Whether each split bound is absent; both the query builder and the statement
            // factory take these flags.
            boolean noSplitStart = this.snapshotSplit.getSplitStart() == null;
            boolean noSplitEnd = this.snapshotSplit.getSplitEnd() == null;
            String selectSql = PostgresQueryUtils.buildSplitScanQuery(this.snapshotSplit.getTableId(), this.snapshotSplit.getSplitKeyType(), noSplitStart, noSplitEnd);
            LOG.debug("For split '{}' of table {} using select statement: '{}'", this.snapshotSplit.splitId(), table.id(), selectSql);

            try (PreparedStatement selectStatement = PostgresQueryUtils.readTableSplitDataStatement(this.jdbcConnection, selectSql, noSplitStart, noSplitEnd, this.snapshotSplit.getSplitStart(), this.snapshotSplit.getSplitEnd(), this.snapshotSplit.getSplitKeyType().getFieldCount(), this.connectorConfig.getQueryFetchSize());
                 ResultSet rs = selectStatement.executeQuery()) {
                ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
                // Loop-invariant sizes, looked up once rather than for every row.
                int columnCount = columnArray.getColumns().length;
                int rowWidth = columnArray.getGreatestColumnPosition();
                PostgresPartition partition = snapshotContext.partition;

                long rows = 0L;
                Threads.Timer logTimer = this.getTableScanLogTimer();
                while (rs.next()) {
                    ++rows;
                    Object[] row = new Object[rowWidth];
                    for (int i = 0; i < columnCount; ++i) {
                        // Result set columns are 1-based; the row array is indexed by the
                        // column's (1-based) position in the table.
                        row[columnArray.getColumns()[i].position() - 1] = rs.getObject(i + 1);
                    }
                    if (logTimer.expired()) {
                        long stop = this.clock.currentTimeInMillis();
                        LOG.info("Exported {} records for split '{}' after {}", rows, this.snapshotSplit.splitId(), Strings.duration(stop - exportStart));
                        this.snapshotProgressListener.rowsScanned(partition, table.id(), rows);
                        logTimer = this.getTableScanLogTimer();
                    }
                    snapshotContext.offset.event(table.id(), this.clock.currentTime());
                    // Parameterized with PostgresPartition so no cast is needed when handing
                    // the emitter to the dispatcher.
                    SnapshotChangeRecordEmitter<PostgresPartition> emitter = new SnapshotChangeRecordEmitter<PostgresPartition>(partition, snapshotContext.offset, row, this.clock);
                    this.dispatcher.dispatchSnapshotEvent(partition, table.id(), emitter, snapshotReceiver);
                }
                LOG.info("Finished exporting {} records for split '{}', total duration '{}'", rows, this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
            } catch (SQLException e) {
                throw new FlinkRuntimeException("Snapshotting of table " + table.id() + " failed", (Throwable) e);
            }
        }

        /**
         * Creates a fresh timer on this task's clock, used to pace the progress logging of the
         * table scan.
         *
         * <p>NOTE(review): {@code LOG_INTERVAL} is not declared in this file; presumably it is
         * inherited from the superclass — confirm.
         *
         * @return a new timer that expires after {@code LOG_INTERVAL}
         */
        private Threads.Timer getTableScanLogTimer() {
            return Threads.timer(this.clock, LOG_INTERVAL);
        }

        /**
         * Returns the same snapshotting task regardless of the partition or previous offset.
         *
         * <p>NOTE(review): the two flags presumably mean (snapshotSchema = false,
         * snapshotData = true) — confirm against the Debezium version in use.
         *
         * @param partition ignored
         * @param previousOffset ignored
         * @return a new {@code SnapshottingTask(false, true)}
         */
        @Override
        protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) {
            return new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
        }

        /**
         * Creates the snapshot context for the given partition.
         *
         * @param partition the partition being snapshotted
         * @return a new {@code PostgresSnapshotContext} for {@code partition}
         * @throws Exception if the context cannot be created
         */
        @Override
        protected PostgresSnapshotContext prepare(PostgresPartition partition) throws Exception {
            return new PostgresSnapshotContext(partition);
        }

        /**
         * Snapshot context specialised for {@link PostgresPartition} and
         * {@link PostgresOffsetContext}; adds no state of its own.
         */
        private static class PostgresSnapshotContext
        extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> {
            /**
             * @param partition the partition this context belongs to
             * @throws SQLException declared by this constructor; nothing in this body throws it
             *     directly
             */
            public PostgresSnapshotContext(PostgresPartition partition) throws SQLException {
                // NOTE(review): the empty string is presumably the catalog name expected by the
                // superclass constructor — confirm.
                super(partition, "");
            }
        }
    }
}

