/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.assigner;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SplitAssigner} that first hands out snapshot splits by delegating to a wrapped
 * {@link SnapshotSplitAssigner} and, once that delegate has no more splits and reports that
 * the initial assignment is finished, hands out one stream split with the id
 * {@code "stream-split"}.
 *
 * <p>Most operations are plain delegations to the wrapped snapshot assigner; this class only
 * adds the bookkeeping for whether the stream split has already been assigned and the
 * construction of that stream split.
 *
 * @param <C> the concrete source configuration type
 */
public class HybridSplitAssigner<C extends SourceConfig>
implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSplitAssigner.class);
    private static final String STREAM_SPLIT_ID = "stream-split";
    /** Threshold above which the finished split infos are not embedded in the stream split. */
    private final int splitMetaGroupSize;
    private final C sourceConfig;
    /** Whether the stream split is currently considered assigned; part of the checkpointed state. */
    private boolean isStreamSplitAssigned;
    private final SnapshotSplitAssigner<C> snapshotSplitAssigner;
    private final OffsetFactory offsetFactory;
    private final SplitEnumeratorContext<? extends SourceSplit> enumeratorContext;
    /** Created in {@link #open()}; it is {@code null} before that call. */
    private SourceEnumeratorMetrics enumeratorMetrics;

    /**
     * Creates an assigner for a fresh start: the stream split is marked as not yet assigned and
     * a new {@link SnapshotSplitAssigner} is built from the given tables.
     *
     * @param sourceConfig source configuration; also supplies the split meta group size
     * @param currentParallelism forwarded to the snapshot assigner
     * @param remainingTables forwarded to the snapshot assigner
     * @param isTableIdCaseSensitive forwarded to the snapshot assigner
     * @param dialect forwarded to the snapshot assigner
     * @param offsetFactory factory used for the snapshot assigner and for stream split offsets
     * @param enumeratorContext context whose metric group is used in {@link #open()}
     */
    public HybridSplitAssigner(C sourceConfig, int currentParallelism, List<TableId> remainingTables, boolean isTableIdCaseSensitive, DataSourceDialect<C> dialect, OffsetFactory offsetFactory, SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
        this(sourceConfig, new SnapshotSplitAssigner<C>(sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive, dialect, offsetFactory), false, sourceConfig.getSplitMetaGroupSize(), offsetFactory, enumeratorContext);
    }

    /**
     * Creates an assigner from checkpointed state: the snapshot assigner is rebuilt from
     * {@code checkpoint.getSnapshotPendingSplits()} and the stream-split flag is taken from
     * {@code checkpoint.isStreamSplitAssigned()}.
     *
     * @param sourceConfig source configuration; also supplies the split meta group size
     * @param currentParallelism forwarded to the snapshot assigner
     * @param checkpoint the previously snapshotted state of this assigner
     * @param dialect forwarded to the snapshot assigner
     * @param offsetFactory factory used for the snapshot assigner and for stream split offsets
     * @param enumeratorContext context whose metric group is used in {@link #open()}
     */
    public HybridSplitAssigner(C sourceConfig, int currentParallelism, HybridPendingSplitsState checkpoint, DataSourceDialect<C> dialect, OffsetFactory offsetFactory, SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
        this(sourceConfig, new SnapshotSplitAssigner<C>(sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits(), dialect, offsetFactory), checkpoint.isStreamSplitAssigned(), sourceConfig.getSplitMetaGroupSize(), offsetFactory, enumeratorContext);
    }

    /** Shared constructor that only stores its arguments. */
    private HybridSplitAssigner(C sourceConfig, SnapshotSplitAssigner<C> snapshotSplitAssigner, boolean isStreamSplitAssigned, int splitMetaGroupSize, OffsetFactory offsetFactory, SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
        this.sourceConfig = sourceConfig;
        this.snapshotSplitAssigner = snapshotSplitAssigner;
        this.isStreamSplitAssigned = isStreamSplitAssigned;
        this.splitMetaGroupSize = splitMetaGroupSize;
        this.offsetFactory = offsetFactory;
        this.enumeratorContext = enumeratorContext;
    }

    /**
     * Creates the enumerator metrics from the context's metric group, aligns the stream-reading
     * metric with the current stream-split flag, then opens the snapshot assigner and hands it
     * the same metrics instance.
     */
    @Override
    public void open() {
        this.enumeratorMetrics = new SourceEnumeratorMetrics(this.enumeratorContext.metricGroup());
        if (this.isStreamSplitAssigned) {
            this.enumeratorMetrics.enterStreamReading();
        } else {
            this.enumeratorMetrics.exitStreamReading();
        }
        this.snapshotSplitAssigner.open();
        this.snapshotSplitAssigner.initEnumeratorMetrics(this.enumeratorMetrics);
    }

    /**
     * Returns the next split to assign, if any.
     *
     * <ul>
     *   <li>Empty while {@code AssignerStatus.isNewlyAddedAssigningSnapshotFinished} holds for
     *       the current status.</li>
     *   <li>While the snapshot assigner still has splits, its next split.</li>
     *   <li>Once it has none: a newly created stream split if the stream split is not assigned
     *       yet and {@code AssignerStatus.isInitialAssigningFinished} holds; otherwise empty.
     *       If {@code AssignerStatus.isNewlyAddedAssigningFinished} holds instead, the stream
     *       split is only marked as assigned and no split is returned.</li>
     * </ul>
     *
     * @return the next split, or empty when nothing can be assigned right now
     */
    @Override
    public Optional<SourceSplitBase> getNext() {
        if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(this.getAssignerStatus())) {
            return Optional.empty();
        }
        if (!this.snapshotSplitAssigner.noMoreSplits()) {
            return this.snapshotSplitAssigner.getNext();
        }

        // The snapshot assigner is drained; this is reported on every call that reaches here.
        this.enumeratorMetrics.exitSnapshotPhase();
        if (this.isStreamSplitAssigned) {
            LOG.trace("No more splits for the SnapshotSplitAssigner. StreamSplit is already assigned.");
            return Optional.empty();
        }
        if (AssignerStatus.isInitialAssigningFinished(this.snapshotSplitAssigner.getAssignerStatus())) {
            // The flag and metric are updated before the split is built, as in the original order.
            this.isStreamSplitAssigned = true;
            this.enumeratorMetrics.enterStreamReading();
            StreamSplit streamSplit = this.createStreamSplit();
            LOG.trace("SnapshotSplitAssigner is finished: creating a new stream split {}", streamSplit);
            return Optional.of(streamSplit);
        }
        if (AssignerStatus.isNewlyAddedAssigningFinished(this.snapshotSplitAssigner.getAssignerStatus())) {
            // Only the flag and metric change here; no new stream split is created.
            this.isStreamSplitAssigned = true;
            this.enumeratorMetrics.enterStreamReading();
            return Optional.empty();
        }
        LOG.trace("Waiting for SnapshotSplitAssigner to be finished before assigning a new stream split.");
        return Optional.empty();
    }

    /** Delegates to the snapshot assigner. */
    @Override
    public boolean waitingForFinishedSplits() {
        return this.snapshotSplitAssigner.waitingForFinishedSplits();
    }

    /** Delegates to the snapshot assigner. */
    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        return this.snapshotSplitAssigner.getFinishedSplitInfos();
    }

    /** Delegates to the snapshot assigner. */
    @Override
    public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
        this.snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);
    }

    /**
     * Takes splits back. Snapshot splits are forwarded to the snapshot assigner (an empty list
     * is forwarded when there are none). Any non-snapshot split is not stored; it only clears
     * the stream-split flag so that {@link #getNext()} can create a stream split again.
     *
     * @param splits the splits being returned to this assigner
     */
    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        List<SourceSplitBase> returnedSnapshotSplits = new ArrayList<>();
        for (SourceSplitBase split : splits) {
            if (split.isSnapshotSplit()) {
                returnedSnapshotSplits.add(split);
            } else {
                this.isStreamSplitAssigned = false;
            }
        }
        // The stream-reading metric is only left when at least one snapshot split came back.
        if (!returnedSnapshotSplits.isEmpty()) {
            this.enumeratorMetrics.exitStreamReading();
        }
        this.snapshotSplitAssigner.addSplits(returnedSnapshotSplits);
    }

    /**
     * Captures the snapshot assigner's state for the checkpoint together with the current
     * stream-split flag.
     */
    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return new HybridPendingSplitsState(this.snapshotSplitAssigner.snapshotState(checkpointId), this.isStreamSplitAssigned);
    }

    /** Delegates to the snapshot assigner. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
    }

    /** Returns the status of the snapshot assigner; this class keeps no status of its own. */
    @Override
    public AssignerStatus getAssignerStatus() {
        return this.snapshotSplitAssigner.getAssignerStatus();
    }

    /** Delegates to the snapshot assigner. */
    @Override
    public void startAssignNewlyAddedTables() {
        this.snapshotSplitAssigner.startAssignNewlyAddedTables();
    }

    /** Delegates to the snapshot assigner. */
    @Override
    public void onStreamSplitUpdated() {
        this.snapshotSplitAssigner.onStreamSplitUpdated();
    }

    /**
     * @return {@code true} only when the snapshot assigner has no more splits and the stream
     *     split is marked as assigned
     */
    @Override
    public boolean noMoreSplits() {
        return this.snapshotSplitAssigner.noMoreSplits() && this.isStreamSplitAssigned;
    }

    /** Closes the snapshot assigner. */
    @Override
    public void close() throws IOException {
        this.snapshotSplitAssigner.close();
    }

    /**
     * Builds the stream split from the snapshot assigner's assigned splits and their finished
     * offsets.
     *
     * <p>The starting offset is the smallest finished offset among the assigned snapshot splits,
     * or {@code offsetFactory.createInitialOffset()} when no offset was found. The stopping
     * offset is {@code offsetFactory.createNoStoppingOffset()}, unless the startup options are
     * snapshot-only, in which case it is the largest finished offset. The split always carries
     * the total number of finished split infos, but the infos themselves are only embedded when
     * their count does not exceed the split meta group size; otherwise an empty list is passed.
     *
     * @return a new stream split with the id {@code "stream-split"}
     */
    public StreamSplit createStreamSplit() {
        // Typed list (not a raw List) so the elements can be iterated as SchemalessSnapshotSplit.
        List<SchemalessSnapshotSplit> assignedSnapshotSplits = this.snapshotSplitAssigner.getAssignedSplits().values().stream()
                .sorted(Comparator.comparing(SourceSplitBase::splitId))
                .collect(Collectors.toList());
        Map<String, Offset> splitFinishedOffsets = this.snapshotSplitAssigner.getSplitFinishedOffsets();
        List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        Offset minOffset = null;
        Offset maxOffset = null;
        for (SchemalessSnapshotSplit split : assignedSnapshotSplits) {
            // NOTE(review): a split without a recorded finished offset yields null here, which
            // can fail in the comparisons below; callers presumably only reach this once all
            // assigned splits are finished. Confirm.
            Offset changeLogOffset = splitFinishedOffsets.get(split.splitId());
            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
                minOffset = changeLogOffset;
            }
            if (maxOffset == null || changeLogOffset.isAfter(maxOffset)) {
                maxOffset = changeLogOffset;
            }
            finishedSnapshotSplitInfos.add(new FinishedSnapshotSplitInfo(split.getTableId(), split.splitId(), split.getSplitStart(), split.getSplitEnd(), changeLogOffset, this.offsetFactory));
        }

        Offset stoppingOffset = this.offsetFactory.createNoStoppingOffset();
        if (this.sourceConfig.getStartupOptions().isSnapshotOnly()) {
            // NOTE(review): with no assigned snapshot splits maxOffset is still null, so the
            // stopping offset becomes null. Confirm StreamSplit tolerates that.
            stoppingOffset = maxOffset;
        }

        Offset startingOffset = minOffset == null ? this.offsetFactory.createInitialOffset() : minOffset;
        // Above the group-size threshold the infos are left out of the split and only their
        // count is passed; presumably they are delivered separately. Not visible in this class.
        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > this.splitMetaGroupSize;
        List<FinishedSnapshotSplitInfo> embeddedSplitInfos = divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos;
        Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
        return new StreamSplit(STREAM_SPLIT_ID, startingOffset, stoppingOffset, embeddedSplitInfos, tableSchemas, finishedSnapshotSplitInfos.size());
    }
}

