/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.reader;

import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitAssignedEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.BinlogSplitUpdateRequestEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberEvent;
import org.apache.flink.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberRequestEvent;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that consumes {@link MySqlSplit}s (snapshot splits and a binlog split) and
 * exchanges {@link SourceEvent}s with the coordinator to report finished snapshot splits,
 * request binlog split metadata and suspend/resume the binlog split.
 *
 * @param <T> type of the records handed to the record emitter
 */
public class MySqlSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, MySqlSplit, MySqlSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);

    /**
     * Key under which the binlog split state is looked up in the finished-splits map.
     * NOTE(review): presumably has to match the split id the enumerator gives the binlog
     * split - verify against the assigner.
     */
    private static final String BINLOG_SPLIT_ID = "binlog-split";

    private final MySqlSourceConfig sourceConfig;
    /** Finished snapshot splits, keyed by split id, kept until an ack event removes them. */
    private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
    /** Binlog splits, keyed by split id, that are not yet reported as completed splits. */
    private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
    private final int subtaskId;
    private final MySqlSourceReaderContext mySqlSourceReaderContext;
    private final MySqlPartition partition;
    /** The binlog split while it is suspended; {@code null} when no split is suspended. */
    private volatile MySqlBinlogSplit suspendedBinlogSplit;

    /**
     * Creates the reader and wires a single-thread fetcher manager around the given split
     * reader supplier.
     *
     * @param elementQueue queue through which fetched records are handed over
     * @param splitReaderSupplier factory for the split reader used by the fetcher
     * @param recordEmitter emitter that turns {@link SourceRecords} into output records
     * @param config Flink configuration passed to the base reader
     * @param context reader context wrapping the Flink source reader context
     * @param sourceConfig MySQL source configuration
     */
    public MySqlSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
            Supplier<MySqlSplitReader> splitReaderSupplier,
            RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter,
            Configuration config,
            MySqlSourceReaderContext context,
            MySqlSourceConfig sourceConfig) {
        super(
                elementQueue,
                new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
                recordEmitter,
                config,
                context.getSourceReaderContext());
        this.sourceConfig = sourceConfig;
        this.finishedUnackedSplits = new HashMap<>();
        this.uncompletedBinlogSplits = new HashMap<>();
        this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask();
        this.mySqlSourceReaderContext = context;
        this.suspendedBinlogSplit = null;
        this.partition =
                new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
    }

    /** Requests a split from the coordinator when at most one split is currently assigned. */
    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() <= 1) {
            context.sendSplitRequest();
        }
    }

    /**
     * Creates the mutable state object matching the kind of the given split.
     *
     * @param split the split to wrap
     * @return a snapshot split state for snapshot splits, a binlog split state otherwise
     */
    protected MySqlSplitState initializedState(MySqlSplit split) {
        if (split.isSnapshotSplit()) {
            return new MySqlSnapshotSplitState(split.asSnapshotSplit());
        }
        return new MySqlBinlogSplitState(split.asBinlogSplit());
    }

    /**
     * Collects the splits to checkpoint: the base reader's splits (minus those already tracked
     * as finished-but-unacked), followed by the finished unacked snapshot splits, the
     * uncompleted binlog splits and, if present, the suspended binlog split.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the splits to store in the checkpoint
     */
    public List<MySqlSplit> snapshotState(long checkpointId) {
        List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);

        // Splits present in finishedUnackedSplits are added from that map below, so drop
        // them here to avoid storing them twice.
        List<MySqlSplit> unfinishedSplits =
                stateSplits.stream()
                        .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
                        .collect(Collectors.toList());
        unfinishedSplits.addAll(finishedUnackedSplits.values());
        unfinishedSplits.addAll(uncompletedBinlogSplits.values());

        // Read the volatile field once so the null check and the add see the same value.
        MySqlBinlogSplit suspended = suspendedBinlogSplit;
        if (suspended != null) {
            unfinishedSplits.add(suspended);
        }

        logCurrentBinlogOffsets(unfinishedSplits, checkpointId);
        return unfinishedSplits;
    }

    /**
     * Handles splits the fetcher reported as finished. Finished snapshot splits are recorded as
     * unacked and reported to the coordinator; a finished binlog split is either re-added
     * (when it finished together with one snapshot split) or turned into a suspended split.
     *
     * @param finishedSplitIds finished split states keyed by split id; the binlog entry is
     *     removed from this map when it finished together with a snapshot split
     */
    protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
        boolean requestNextSplit = true;
        if (isNewlyAddedTableSplitAndBinlogSplit(finishedSplitIds)) {
            MySqlSplitState binlogSplitState = finishedSplitIds.remove(BINLOG_SPLIT_ID);
            for (MySqlSplitState snapshotSplitState : finishedSplitIds.values()) {
                MySqlSplit finishedSplit = snapshotSplitState.toMySqlSplit();
                finishedUnackedSplits.put(
                        finishedSplit.splitId(), finishedSplit.asSnapshotSplit());
            }
            Preconditions.checkState(finishedSplitIds.values().size() == 1);
            LOG.info(
                    "Source reader {} finished binlog split and snapshot split {}",
                    subtaskId,
                    finishedSplitIds.values().iterator().next().toMySqlSplit().splitId());
            // Hand the binlog split back to this reader; addSplits also reports the
            // finished snapshot split recorded above.
            addSplits(Collections.singletonList(binlogSplitState.toMySqlSplit()));
        } else {
            Preconditions.checkState(finishedSplitIds.size() == 1);
            for (MySqlSplitState splitState : finishedSplitIds.values()) {
                MySqlSplit finishedSplit = splitState.toMySqlSplit();
                if (!finishedSplit.isBinlogSplit()) {
                    finishedUnackedSplits.put(
                            finishedSplit.splitId(), finishedSplit.asSnapshotSplit());
                    continue;
                }
                if (mySqlSourceReaderContext.isBinlogSplitReaderSuspended()) {
                    MySqlBinlogSplit suspended =
                            MySqlBinlogSplit.toSuspendedBinlogSplit(
                                    finishedSplit.asBinlogSplit());
                    suspendedBinlogSplit = suspended;
                    LOG.info(
                            "Source reader {} suspended binlog split reader success after the newly added table process, current offset {}",
                            subtaskId,
                            suspended.getStartingOffset());
                    context.sendSourceEventToCoordinator(
                            new LatestFinishedSplitsNumberRequestEvent());
                    // The next step is driven by the coordinator's reply, not by a new split.
                    requestNextSplit = false;
                }
            }
            reportFinishedSnapshotSplitsIfNeed();
        }

        if (requestNextSplit) {
            context.sendSplitRequest();
        }
    }

    /**
     * Tells whether exactly two splits finished and one of them is the binlog split.
     *
     * @param finishedSplitIds finished split states keyed by split id
     * @return {@code true} if the map has two entries, one keyed by the binlog split id
     */
    private boolean isNewlyAddedTableSplitAndBinlogSplit(
            Map<String, MySqlSplitState> finishedSplitIds) {
        return finishedSplitIds.containsKey(BINLOG_SPLIT_ID) && finishedSplitIds.size() == 2;
    }

    /**
     * Adds splits to this reader, filtering outdated split infos from binlog splits.
     *
     * @param splits the splits to add
     */
    public void addSplits(List<MySqlSplit> splits) {
        addSplits(splits, true);
    }

    /**
     * Sorts incoming splits into finished snapshot splits, suspended or uncompleted binlog
     * splits, and splits that still need reading; only the latter go to the base reader.
     *
     * @param splits the splits to add
     * @param checkTableChangeForBinlogSplit whether binlog splits are first passed through
     *     {@code MySqlBinlogSplit.filterOutdatedSplitInfos} with the configured table filter
     */
    private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlogSplit) {
        List<MySqlSplit> unfinishedSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            LOG.info("Source reader {} adds split {}", subtaskId, split);

            if (split.isSnapshotSplit()) {
                MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (sourceConfig.getTableFilter().test(snapshotSplit.getTableId())) {
                    if (snapshotSplit.isSnapshotReadFinished()) {
                        finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                    } else {
                        unfinishedSplits.add(split);
                    }
                } else if (sourceConfig.isScanNewlyAddedTableEnabled()) {
                    LOG.info(
                            "The subtask {} is skipping split {} because it does not match new table filter.",
                            subtaskId,
                            split.splitId());
                } else {
                    LOG.warn(
                            "The subtask {} is skipping split {} because it does not match new table filter, but ScanNewlyAddedTable is not enabled.",
                            subtaskId,
                            split.splitId());
                }
                continue;
            }

            MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
            if (checkTableChangeForBinlogSplit) {
                binlogSplit =
                        MySqlBinlogSplit.filterOutdatedSplitInfos(
                                binlogSplit,
                                sourceConfig
                                        .getMySqlConnectorConfig()
                                        .getTableFilters()
                                        .dataCollectionFilter());
            }

            // Must be evaluated before the flag is set below: it is only true for the first
            // binlog split this reader context sees.
            boolean checkNewlyAddedTableSchema =
                    !mySqlSourceReaderContext.isHasAssignedBinlogSplit()
                            && sourceConfig.isScanNewlyAddedTableEnabled();
            mySqlSourceReaderContext.setHasAssignedBinlogSplit(true);

            if (binlogSplit.isSuspended()) {
                suspendedBinlogSplit = binlogSplit;
            } else if (!binlogSplit.isCompletedSplit()) {
                uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
                requestBinlogSplitMetaIfNeeded(binlogSplit);
            } else {
                uncompletedBinlogSplits.remove(binlogSplit.splitId());
                unfinishedSplits.add(
                        discoverTableSchemasForBinlogSplit(
                                binlogSplit, sourceConfig, checkNewlyAddedTableSchema));
            }
            LOG.info("Source reader {} received the binlog split : {}.", subtaskId, binlogSplit);
            context.sendSourceEventToCoordinator(new BinlogSplitAssignedEvent());
        }

        reportFinishedSnapshotSplitsIfNeed();

        if (!unfinishedSplits.isEmpty()) {
            super.addSplits(unfinishedSplits);
        } else if (suspendedBinlogSplit != null || getNumberOfCurrentlyAssignedSplits() <= 1) {
            // Nothing to read from this batch: ask the coordinator for more work.
            context.sendSplitRequest();
        }
    }

    /**
     * Dispatches coordinator events to their handlers; unknown events go to the base reader.
     *
     * @param sourceEvent the event received from the coordinator
     */
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug(
                    "Source reader {} receives ack event for {} from enumerator.",
                    subtaskId,
                    ackEvent.getFinishedSplits());
            for (String splitId : ackEvent.getFinishedSplits()) {
                finishedUnackedSplits.remove(splitId);
            }
        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            LOG.debug(
                    "Source reader {} receives request to report finished snapshot splits.",
                    subtaskId);
            reportFinishedSnapshotSplitsIfNeed();
        } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
            BinlogSplitMetaEvent metaEvent = (BinlogSplitMetaEvent) sourceEvent;
            LOG.debug(
                    "Source reader {} receives binlog meta with group id {}.",
                    subtaskId,
                    metaEvent.getMetaGroupId());
            fillMetadataForBinlogSplit(metaEvent);
        } else if (sourceEvent instanceof BinlogSplitUpdateRequestEvent) {
            LOG.info("Source reader {} receives binlog split update event.", subtaskId);
            handleBinlogSplitUpdateRequest();
        } else if (sourceEvent instanceof LatestFinishedSplitsNumberEvent) {
            updateBinlogSplit((LatestFinishedSplitsNumberEvent) sourceEvent);
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    /** Asks the reader context to suspend the binlog split reader. */
    private void handleBinlogSplitUpdateRequest() {
        mySqlSourceReaderContext.suspendBinlogSplitReader();
    }

    /**
     * Converts the suspended binlog split back into a normal one using the finished-splits
     * count from the event, re-adds it, acks the coordinator and wakes the binlog reader.
     * Logs a warning and does nothing else when no split is suspended.
     *
     * @param sourceEvent event carrying the latest number of finished splits
     */
    private void updateBinlogSplit(LatestFinishedSplitsNumberEvent sourceEvent) {
        // Read the volatile field once so the null check and the conversion agree.
        MySqlBinlogSplit suspended = suspendedBinlogSplit;
        if (suspended == null) {
            LOG.warn("Unexpected event {}, this should not happen.", sourceEvent);
            return;
        }

        int finishedSplitsSize = sourceEvent.getLatestFinishedSplitsNumber();
        MySqlBinlogSplit binlogSplit =
                MySqlBinlogSplit.toNormalBinlogSplit(suspended, finishedSplitsSize);
        suspendedBinlogSplit = null;
        addSplits(Collections.singletonList(binlogSplit), false);

        context.sendSourceEventToCoordinator(new BinlogSplitUpdateAckEvent());
        LOG.info(
                "Source reader {} notifies enumerator that binlog split has been updated.",
                subtaskId);

        mySqlSourceReaderContext.wakeupSuspendedBinlogSplitReader();
        LOG.info(
                "Source reader {} wakes up suspended binlog reader as binlog split has been updated.",
                subtaskId);
    }

    /**
     * Sends the high watermarks of all finished-but-unacked snapshot splits to the
     * coordinator; does nothing when there are none.
     */
    private void reportFinishedSnapshotSplitsIfNeed() {
        if (finishedUnackedSplits.isEmpty()) {
            return;
        }
        Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
        for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
            finishedOffsets.put(split.splitId(), split.getHighWatermark());
        }
        context.sendSourceEventToCoordinator(
                new FinishedSnapshotSplitsReportEvent(finishedOffsets));
        LOG.debug(
                "Source reader {} reports offsets of finished snapshot splits {}.",
                subtaskId,
                finishedOffsets);
    }

    /**
     * Requests the next metadata group for a binlog split that is not yet a completed split;
     * once the split is completed it is handed to {@link #addSplits(List)} instead.
     *
     * @param binlogSplit the binlog split to check
     */
    private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
        String splitId = binlogSplit.splitId();
        if (!binlogSplit.isCompletedSplit()) {
            int nextMetaGroupId =
                    ChunkUtils.getNextMetaGroupId(
                            binlogSplit.getFinishedSnapshotSplitInfos().size(),
                            sourceConfig.getSplitMetaGroupSize());
            BinlogSplitMetaRequestEvent splitMetaRequestEvent =
                    new BinlogSplitMetaRequestEvent(
                            splitId, nextMetaGroupId, binlogSplit.getTotalFinishedSplitSize());
            context.sendSourceEventToCoordinator(splitMetaRequestEvent);
        } else {
            LOG.info("Source reader {} collects meta of binlog split success", subtaskId);
            addSplits(Collections.singletonList(binlogSplit));
        }
    }

    /**
     * Applies a received metadata group to the matching uncompleted binlog split and then
     * requests the next group (or completes the split). Events for unknown splits are logged
     * and ignored; out-of-order groups are logged and the expected group is requested again.
     *
     * @param metadataEvent the metadata event received from the coordinator
     */
    private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
        MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
        if (binlogSplit == null) {
            LOG.warn(
                    "Source reader {} receives binlog meta event for split {}, but the uncompleted split map does not contain it",
                    subtaskId,
                    metadataEvent.getSplitId());
            return;
        }

        int receivedMetaGroupId = metadataEvent.getMetaGroupId();
        int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
        int expectedMetaGroupId =
                ChunkUtils.getNextMetaGroupId(
                        binlogSplit.getFinishedSnapshotSplitInfos().size(),
                        sourceConfig.getSplitMetaGroupSize());

        if (receivedTotalFinishedSplitSize < binlogSplit.getTotalFinishedSplitSize()) {
            LOG.warn(
                    "Source reader {} receives out of bound finished split size. The received finished split size is {}, but expected is {}, truncate it",
                    subtaskId,
                    receivedTotalFinishedSplitSize,
                    binlogSplit.getTotalFinishedSplitSize());
            binlogSplit =
                    MySqlBinlogSplit.toNormalBinlogSplit(
                            binlogSplit, receivedTotalFinishedSplitSize);
            uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
        } else if (receivedMetaGroupId == expectedMetaGroupId) {
            // Skip infos of the partially filled last group that this split already holds.
            Set<String> existedSplitsOfLastGroup =
                    getExistedSplitsOfLastGroup(
                            binlogSplit.getFinishedSnapshotSplitInfos(),
                            sourceConfig.getSplitMetaGroupSize());
            List<FinishedSnapshotSplitInfo> newAddedMetadataGroup =
                    metadataEvent.getMetaGroup().stream()
                            .map(FinishedSnapshotSplitInfo::deserialize)
                            .filter(info -> !existedSplitsOfLastGroup.contains(info.getSplitId()))
                            .collect(Collectors.toList());
            uncompletedBinlogSplits.put(
                    binlogSplit.splitId(),
                    MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, newAddedMetadataGroup));
            // NOTE(review): the second placeholder receives the number of appended infos,
            // not the group id, despite the wording of the message.
            LOG.info(
                    "Source reader {} fills metadata of group {} to binlog split",
                    subtaskId,
                    newAddedMetadataGroup.size());
        } else {
            LOG.warn(
                    "Source reader {} receives out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",
                    subtaskId,
                    metadataEvent.getSplitId(),
                    receivedMetaGroupId,
                    expectedMetaGroupId);
        }

        requestBinlogSplitMetaIfNeeded(uncompletedBinlogSplits.get(binlogSplit.splitId()));
    }

    /**
     * Fills table schemas into a binlog split. Discovery is skipped when the split already
     * has schemas and no newly-added-table check is requested; otherwise schemas are
     * discovered for all captured tables (split has none) or only for newly added tables.
     *
     * @param split the binlog split to complete
     * @param sourceConfig configuration used to open the connection and discover tables
     * @param checkNewlyAddedTableSchema whether to discover schemas of newly added tables even
     *     though the split already carries schemas
     * @return the given split if discovery was skipped, otherwise a split with schemas filled
     * @throws FlinkRuntimeException if schema discovery fails with a {@link SQLException}
     */
    private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(
            MySqlBinlogSplit split,
            MySqlSourceConfig sourceConfig,
            boolean checkNewlyAddedTableSchema) {
        if (!split.getTableSchemas().isEmpty() && !checkNewlyAddedTableSchema) {
            LOG.warn(
                    "Source reader {} skip the table schema discovery, the binlog split {} has table schemas yet.",
                    subtaskId,
                    split);
            return split;
        }

        try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
            Map<TableId, TableChanges.TableChange> tableSchemas;
            if (split.getTableSchemas().isEmpty()) {
                tableSchemas =
                        TableDiscoveryUtils.discoverSchemaForCapturedTables(
                                partition, sourceConfig, jdbc);
                LOG.info(
                        "Source reader {} discovers table schema for binlog split {} success",
                        subtaskId,
                        split.splitId());
            } else {
                List<TableId> existedTables = new ArrayList<>(split.getTableSchemas().keySet());
                tableSchemas =
                        TableDiscoveryUtils.discoverSchemaForNewAddedTables(
                                partition, existedTables, sourceConfig, jdbc);
                LOG.info(
                        "Source reader {} discovers table schema for new added tables of binlog split {} success",
                        subtaskId,
                        split.splitId());
            }
            return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
        } catch (SQLException e) {
            LOG.error(
                    "Source reader {} failed to obtains table schemas due to {}",
                    subtaskId,
                    e.getMessage());
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Returns the split ids that make up the trailing, partially filled metadata group, taken
     * from the ids sorted in natural order.
     *
     * @param finishedSnapshotSplits the finished split infos already held by the binlog split
     * @param metaGroupSize number of split infos per metadata group
     * @return ids of the last incomplete group, or an empty set if the size is a multiple of
     *     {@code metaGroupSize}
     */
    private Set<String> getExistedSplitsOfLastGroup(
            List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
        int splitsNumOfLastGroup = finishedSnapshotSplits.size() % metaGroupSize;
        if (splitsNumOfLastGroup == 0) {
            return new HashSet<>();
        }
        // Integer division rounds down to the start index of the last (incomplete) group.
        int lastGroupStart = finishedSnapshotSplits.size() / metaGroupSize * metaGroupSize;
        List<String> sortedSplitIds =
                finishedSnapshotSplits.stream()
                        .map(FinishedSnapshotSplitInfo::getSplitId)
                        .sorted()
                        .collect(Collectors.toList());
        return new HashSet<>(
                sortedSplitIds.subList(lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
    }

    /**
     * Logs the starting offset of every binlog split in the list at INFO level.
     *
     * @param splits the splits being checkpointed
     * @param checkpointId id of the checkpoint, included in the log line
     */
    private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
        if (!LOG.isInfoEnabled()) {
            return;
        }
        for (MySqlSplit split : splits) {
            if (!split.isBinlogSplit()) {
                // Skip, rather than stop at, snapshot splits: they can precede binlog
                // splits in the list built by snapshotState.
                continue;
            }
            BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
            LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, offset);
        }
    }

    /**
     * Converts a split state back into its split.
     *
     * @param splitId id of the split (not used by this implementation)
     * @param splitState state to convert
     * @return the split produced by {@code splitState.toMySqlSplit()}
     */
    protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) {
        return splitState.toMySqlSplit();
    }

    /**
     * Exposes the live map of finished-but-unacked snapshot splits for tests.
     *
     * @return the internal map itself, not a copy
     */
    @VisibleForTesting
    public Map<String, MySqlSnapshotSplit> getFinishedUnackedSplits() {
        return finishedUnackedSplits;
    }
}

