/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.assigners;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.source.assigners.AssignerStatus;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MySqlSplitAssigner} that combines snapshot and binlog split assignment.
 *
 * <p>Snapshot-related work is delegated to a wrapped {@link MySqlSnapshotSplitAssigner}. Once
 * that assigner reports no more splits and its initial assignment is finished, this assigner
 * hands out one binlog split (id {@code "binlog-split"}) built from the finished snapshot splits.
 */
public class MySqlHybridSplitAssigner
implements MySqlSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlHybridSplitAssigner.class);
    private static final String BINLOG_SPLIT_ID = "binlog-split";
    private final int splitMetaGroupSize;
    private final MySqlSourceConfig sourceConfig;
    // Mutable: set in getNext(), cleared in addSplits() when a non-snapshot split is returned.
    private boolean isBinlogSplitAssigned;
    private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

    /**
     * Creates an assigner with a new snapshot assigner built from the given tables; the binlog
     * split starts out as not assigned.
     *
     * @param sourceConfig source configuration; also supplies the split meta group size
     * @param currentParallelism passed through to the snapshot split assigner
     * @param remainingTables passed through to the snapshot split assigner
     * @param isTableIdCaseSensitive passed through to the snapshot split assigner
     * @param enumeratorContext passed through to the snapshot split assigner
     */
    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive,
            SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
        this(
                sourceConfig,
                new MySqlSnapshotSplitAssigner(
                        sourceConfig,
                        currentParallelism,
                        remainingTables,
                        isTableIdCaseSensitive,
                        enumeratorContext),
                false,
                sourceConfig.getSplitMetaGroupSize());
    }

    /**
     * Creates an assigner from checkpointed state: the snapshot assigner is built from the
     * checkpoint's pending snapshot splits and the binlog-assigned flag is taken from it.
     *
     * @param sourceConfig source configuration; also supplies the split meta group size
     * @param currentParallelism passed through to the snapshot split assigner
     * @param checkpoint previously snapshotted hybrid state
     * @param enumeratorContext passed through to the snapshot split assigner
     */
    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            HybridPendingSplitsState checkpoint,
            SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
        this(
                sourceConfig,
                new MySqlSnapshotSplitAssigner(
                        sourceConfig,
                        currentParallelism,
                        checkpoint.getSnapshotPendingSplits(),
                        enumeratorContext),
                checkpoint.isBinlogSplitAssigned(),
                sourceConfig.getSplitMetaGroupSize());
    }

    private MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            MySqlSnapshotSplitAssigner snapshotSplitAssigner,
            boolean isBinlogSplitAssigned,
            int splitMetaGroupSize) {
        this.sourceConfig = sourceConfig;
        this.snapshotSplitAssigner = snapshotSplitAssigner;
        this.isBinlogSplitAssigned = isBinlogSplitAssigned;
        this.splitMetaGroupSize = splitMetaGroupSize;
    }

    /** Opens the wrapped snapshot split assigner. */
    @Override
    public void open() {
        this.snapshotSplitAssigner.open();
    }

    /**
     * Returns the next split to assign, if any.
     *
     * <p>While the snapshot assigner still has splits, its next split is returned. Afterwards
     * the binlog split is returned exactly once, and only when the snapshot assigner's status
     * says the initial assignment is finished.
     *
     * @return the next snapshot split, the binlog split, or empty when nothing can be assigned
     */
    @Override
    public Optional<MySqlSplit> getNext() {
        if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(this.getAssignerStatus())) {
            // Nothing is handed out while the assigner is in this status.
            return Optional.empty();
        }
        if (!this.snapshotSplitAssigner.noMoreSplits()) {
            // The snapshot assigner still has splits to give out.
            return this.snapshotSplitAssigner.getNext();
        }
        if (this.isBinlogSplitAssigned) {
            // Snapshot splits exhausted and binlog split already assigned: nothing left.
            return Optional.empty();
        }
        AssignerStatus snapshotStatus = this.snapshotSplitAssigner.getAssignerStatus();
        if (AssignerStatus.isInitialAssigningFinished(snapshotStatus)) {
            // The binlog split is only created after the initial snapshot assignment finished.
            this.isBinlogSplitAssigned = true;
            return Optional.of(this.createBinlogSplit());
        }
        if (AssignerStatus.isNewlyAddedAssigningFinished(snapshotStatus)) {
            // No new binlog split is created in this status; the flag is just set again.
            this.isBinlogSplitAssigned = true;
            return Optional.empty();
        }
        // The binlog split cannot be created yet.
        return Optional.empty();
    }

    /** Delegates to the snapshot split assigner. */
    @Override
    public boolean waitingForFinishedSplits() {
        return this.snapshotSplitAssigner.waitingForFinishedSplits();
    }

    /** Delegates to the snapshot split assigner. */
    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        return this.snapshotSplitAssigner.getFinishedSplitInfos();
    }

    /** Forwards the reported finished offsets (keyed by split id) to the snapshot assigner. */
    @Override
    public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
        this.snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);
    }

    /**
     * Takes splits back into the assigner.
     *
     * <p>Snapshot splits are forwarded to the snapshot assigner. Any other split is not stored;
     * instead the binlog-assigned flag is cleared so that {@link #getNext()} may create the
     * binlog split again.
     *
     * @param splits the splits to add back
     */
    @Override
    public void addSplits(Collection<MySqlSplit> splits) {
        List<MySqlSplit> snapshotSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            if (split.isSnapshotSplit()) {
                snapshotSplits.add(split);
            } else {
                this.isBinlogSplitAssigned = false;
            }
        }
        this.snapshotSplitAssigner.addSplits(snapshotSplits);
    }

    /**
     * Captures the snapshot assigner's state for the given checkpoint together with the
     * binlog-assigned flag.
     */
    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return new HybridPendingSplitsState(
                this.snapshotSplitAssigner.snapshotState(checkpointId),
                this.isBinlogSplitAssigned);
    }

    /** Delegates to the snapshot split assigner. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
    }

    /** Returns the status of the wrapped snapshot split assigner. */
    @Override
    public AssignerStatus getAssignerStatus() {
        return this.snapshotSplitAssigner.getAssignerStatus();
    }

    /**
     * Returns {@code true} only when the snapshot assigner has no more splits and the binlog
     * split has been assigned.
     */
    @Override
    public boolean noMoreSplits() {
        return this.snapshotSplitAssigner.noMoreSplits() && this.isBinlogSplitAssigned;
    }

    /** Delegates to the snapshot split assigner. */
    @Override
    public void startAssignNewlyAddedTables() {
        this.snapshotSplitAssigner.startAssignNewlyAddedTables();
    }

    /** Delegates to the snapshot split assigner. */
    @Override
    public void onBinlogSplitUpdated() {
        this.snapshotSplitAssigner.onBinlogSplitUpdated();
    }

    /** Closes the wrapped snapshot split assigner. */
    @Override
    public void close() {
        this.snapshotSplitAssigner.close();
    }

    /**
     * Builds the binlog split from the snapshot splits assigned so far.
     *
     * <p>The split starts at the smallest finished offset among the assigned snapshot splits,
     * or at {@code BinlogOffset.ofEarliest()} when no such offset was found.
     *
     * @return the binlog split with id {@code "binlog-split"}
     */
    private MySqlBinlogSplit createBinlogSplit() {
        // Typed list (not raw) so the for-each below type-checks; sorted by split id for a
        // deterministic order of the finished split infos.
        List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplits =
                this.snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());
        Map<String, BinlogOffset> splitFinishedOffsets =
                this.snapshotSplitAssigner.getSplitFinishedOffsets();
        List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        // Track the lowest and highest finished offsets while collecting the split infos.
        BinlogOffset minBinlogOffset = null;
        BinlogOffset maxBinlogOffset = null;
        for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplits) {
            // NOTE(review): assumes every assigned split has a finished offset recorded;
            // a missing entry would surface as a NullPointerException below - confirm.
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            if (maxBinlogOffset == null || binlogOffset.isAfter(maxBinlogOffset)) {
                maxBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }

        // In snapshot-only startup mode the split stops at the highest finished offset
        // (which remains null when no snapshot split was assigned); otherwise it never stops.
        BinlogOffset stoppingOffset = BinlogOffset.ofNonStopping();
        if (this.sourceConfig.getStartupOptions().isSnapshotOnly()) {
            stoppingOffset = maxBinlogOffset;
        }

        // With more infos than splitMetaGroupSize the split carries an empty info list and
        // only the total count. Presumably the infos are delivered in groups elsewhere -
        // TODO confirm.
        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > this.splitMetaGroupSize;
        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                minBinlogOffset == null ? BinlogOffset.ofEarliest() : minBinlogOffset,
                stoppingOffset,
                divideMetaToGroups
                        ? new ArrayList<FinishedSnapshotSplitInfo>()
                        : finishedSnapshotSplitInfos,
                new HashMap<TableId, TableChanges.TableChange>(),
                finishedSnapshotSplitInfos.size());
    }
}

