/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.reader;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.mysql.MySqlConnection;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSplitReader
implements SplitReader<SourceRecords, MySqlSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSplitReader.class);
    private final ArrayDeque<MySqlSnapshotSplit> snapshotSplits;
    private final ArrayDeque<MySqlBinlogSplit> binlogSplits;
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;
    private final MySqlSourceReaderContext context;
    private final SnapshotPhaseHooks snapshotHooks;
    @Nullable
    private String currentSplitId;
    @Nullable
    private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
    @Nullable
    private SnapshotSplitReader reusedSnapshotReader;
    @Nullable
    private BinlogSplitReader reusedBinlogReader;

    public MySqlSplitReader(MySqlSourceConfig sourceConfig, int subtaskId, MySqlSourceReaderContext context, SnapshotPhaseHooks snapshotHooks) {
        this.sourceConfig = sourceConfig;
        this.subtaskId = subtaskId;
        this.snapshotSplits = new ArrayDeque();
        this.binlogSplits = new ArrayDeque(1);
        this.context = context;
        this.snapshotHooks = snapshotHooks;
    }

    public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
        try {
            this.suspendBinlogReaderIfNeed();
            return this.pollSplitRecords();
        }
        catch (InterruptedException e) {
            LOG.warn("fetch data failed.", (Throwable)e);
            throw new IOException(e);
        }
    }

    private void suspendBinlogReaderIfNeed() {
        if (this.currentReader != null && this.currentReader instanceof BinlogSplitReader && this.context.isBinlogSplitReaderSuspended() && !this.currentReader.isFinished()) {
            ((BinlogSplitReader)this.currentReader).stopBinlogReadTask();
            LOG.info("Suspend binlog reader to wait the binlog split update.");
        }
    }

    private MySqlRecords pollSplitRecords() throws InterruptedException {
        if (this.currentReader == null) {
            if (this.binlogSplits.size() > 0) {
                MySqlSplit nextSplit = this.binlogSplits.poll();
                this.currentSplitId = nextSplit.splitId();
                this.currentReader = this.getBinlogSplitReader();
                this.currentReader.submitSplit(nextSplit);
            } else if (this.snapshotSplits.size() > 0) {
                MySqlSplit nextSplit = this.snapshotSplits.poll();
                this.currentSplitId = nextSplit.splitId();
                this.currentReader = this.getSnapshotSplitReader();
                this.currentReader.submitSplit(nextSplit);
            } else {
                LOG.info("No available split to read.");
            }
            Iterator<SourceRecords> dataIt = this.currentReader.pollSplitRecords();
            return dataIt == null ? this.finishedSplit() : this.forRecords(dataIt);
        }
        if (this.currentReader instanceof SnapshotSplitReader) {
            Iterator<SourceRecords> dataIt = this.currentReader.pollSplitRecords();
            if (dataIt != null) {
                MySqlRecords records;
                if (this.context.isHasAssignedBinlogSplit()) {
                    records = this.forNewAddedTableFinishedSplit(this.currentSplitId, dataIt);
                    this.closeSnapshotReader();
                    this.closeBinlogReader();
                } else {
                    records = this.forRecords(dataIt);
                    MySqlSplit nextSplit = this.snapshotSplits.poll();
                    if (nextSplit != null) {
                        this.currentSplitId = nextSplit.splitId();
                        this.currentReader.submitSplit(nextSplit);
                    } else {
                        this.closeSnapshotReader();
                    }
                }
                return records;
            }
            return this.finishedSplit();
        }
        if (this.currentReader instanceof BinlogSplitReader) {
            Iterator<SourceRecords> dataIt = this.currentReader.pollSplitRecords();
            if (dataIt != null) {
                MySqlSplit nextSplit = this.snapshotSplits.poll();
                if (nextSplit != null) {
                    this.closeBinlogReader();
                    LOG.info("It's turn to switch next fetch reader to snapshot split reader");
                    this.currentSplitId = nextSplit.splitId();
                    this.currentReader = this.getSnapshotSplitReader();
                    this.currentReader.submitSplit(nextSplit);
                }
                return MySqlRecords.forBinlogRecords("binlog-split", dataIt);
            }
            this.closeBinlogReader();
            return this.finishedSplit();
        }
        throw new IllegalStateException("Unsupported reader type.");
    }

    private MySqlRecords finishedSplit() {
        MySqlRecords finishedRecords = MySqlRecords.forFinishedSplit(this.currentSplitId);
        this.currentSplitId = null;
        return finishedRecords;
    }

    private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
        if (this.currentReader instanceof SnapshotSplitReader) {
            MySqlRecords finishedRecords = MySqlRecords.forSnapshotRecords(this.currentSplitId, dataIt);
            this.closeSnapshotReader();
            return finishedRecords;
        }
        return MySqlRecords.forBinlogRecords(this.currentSplitId, dataIt);
    }

    private MySqlRecords forNewAddedTableFinishedSplit(String splitId, Iterator<SourceRecords> recordsForSplit) {
        HashSet<String> finishedSplits = new HashSet<String>();
        finishedSplits.add(splitId);
        finishedSplits.add("binlog-split");
        this.currentSplitId = null;
        return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
    }

    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChanges.getClass()));
        }
        LOG.info("Handling split change {}", splitsChanges);
        for (MySqlSplit mySqlSplit : splitsChanges.splits()) {
            if (mySqlSplit.isSnapshotSplit()) {
                this.snapshotSplits.add(mySqlSplit.asSnapshotSplit());
                continue;
            }
            this.binlogSplits.add(mySqlSplit.asBinlogSplit());
        }
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        this.closeSnapshotReader();
        this.closeBinlogReader();
    }

    private SnapshotSplitReader getSnapshotSplitReader() {
        if (this.reusedSnapshotReader == null) {
            MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(this.sourceConfig);
            BinaryLogClient binaryLogClient = DebeziumUtils.createBinaryClient(this.sourceConfig.getDbzConfiguration());
            StatefulTaskContext statefulTaskContext = new StatefulTaskContext(this.sourceConfig, binaryLogClient, jdbcConnection);
            this.reusedSnapshotReader = new SnapshotSplitReader(statefulTaskContext, this.subtaskId, this.snapshotHooks);
        }
        return this.reusedSnapshotReader;
    }

    private BinlogSplitReader getBinlogSplitReader() {
        if (this.reusedBinlogReader == null) {
            MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(this.sourceConfig);
            BinaryLogClient binaryLogClient = DebeziumUtils.createBinaryClient(this.sourceConfig.getDbzConfiguration());
            StatefulTaskContext statefulTaskContext = new StatefulTaskContext(this.sourceConfig, binaryLogClient, jdbcConnection);
            this.reusedBinlogReader = new BinlogSplitReader(statefulTaskContext, this.subtaskId);
        }
        return this.reusedBinlogReader;
    }

    private void closeSnapshotReader() {
        if (this.reusedSnapshotReader != null) {
            LOG.debug("Close snapshot reader {}", (Object)this.reusedSnapshotReader.getClass().getCanonicalName());
            this.reusedSnapshotReader.close();
            if (this.reusedSnapshotReader == this.currentReader) {
                this.currentReader = null;
            }
            this.reusedSnapshotReader = null;
        }
    }

    private void closeBinlogReader() {
        if (this.reusedBinlogReader != null) {
            LOG.debug("Close binlog reader {}", (Object)this.reusedBinlogReader.getClass().getCanonicalName());
            this.reusedBinlogReader.close();
            if (this.reusedBinlogReader == this.currentReader) {
                this.currentReader = null;
            }
            this.reusedBinlogReader = null;
        }
    }
}

