/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.meta.split;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A source split describing a stream-reading range bounded by a starting and an ending
 * {@link Offset}, together with the finished snapshot split infos and table schemas it carries.
 *
 * <p>All fields are final; the static helpers in this class return a new {@code StreamSplit}
 * (or the given one unchanged) rather than modifying an existing split. The list and map handed
 * to the constructors are stored as-is (no defensive copy) and the getters return those same
 * instances.
 */
public class StreamSplit
extends SourceSplitBase {
    private static final Logger LOG = LoggerFactory.getLogger(StreamSplit.class);

    /** Identifier constant for the stream split. */
    public static final String STREAM_SPLIT_ID = "stream-split";

    private final Offset startingOffset;
    private final Offset endingOffset;
    private final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
    private final Map<TableId, TableChanges.TableChange> tableSchemas;
    private final int totalFinishedSplitSize;
    private final boolean isSuspended;
    private final boolean isSnapshotCompleted;

    /**
     * Cache slot for a serialized form of this split. It is never written inside this class;
     * presumably a serializer in the same package fills it (verify against the serializer).
     */
    @Nullable
    transient byte[] serializedFormCache;

    /**
     * Creates a stream split with every attribute given explicitly.
     *
     * @param splitId identifier passed to {@link SourceSplitBase}
     * @param startingOffset offset the split starts from
     * @param endingOffset offset the split ends at
     * @param finishedSnapshotSplitInfos finished snapshot split infos carried by this split
     * @param tableSchemas table schemas carried by this split, keyed by table id
     * @param totalFinishedSplitSize expected total number of finished snapshot split infos
     * @param isSuspended whether the split is flagged as suspended
     * @param isSnapshotCompleted whether the split is flagged as snapshot-completed
     */
    public StreamSplit(String splitId, Offset startingOffset, Offset endingOffset, List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos, Map<TableId, TableChanges.TableChange> tableSchemas, int totalFinishedSplitSize, boolean isSuspended, boolean isSnapshotCompleted) {
        super(splitId);
        this.startingOffset = startingOffset;
        this.endingOffset = endingOffset;
        this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
        this.tableSchemas = tableSchemas;
        this.totalFinishedSplitSize = totalFinishedSplitSize;
        this.isSuspended = isSuspended;
        this.isSnapshotCompleted = isSnapshotCompleted;
    }

    /**
     * Creates a stream split that is neither suspended nor snapshot-completed.
     *
     * @param splitId identifier passed to {@link SourceSplitBase}
     * @param startingOffset offset the split starts from
     * @param endingOffset offset the split ends at
     * @param finishedSnapshotSplitInfos finished snapshot split infos carried by this split
     * @param tableSchemas table schemas carried by this split, keyed by table id
     * @param totalFinishedSplitSize expected total number of finished snapshot split infos
     */
    public StreamSplit(String splitId, Offset startingOffset, Offset endingOffset, List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos, Map<TableId, TableChanges.TableChange> tableSchemas, int totalFinishedSplitSize) {
        this(splitId, startingOffset, endingOffset, finishedSnapshotSplitInfos, tableSchemas, totalFinishedSplitSize, false, false);
    }

    /** Returns the offset this split starts from. */
    public Offset getStartingOffset() {
        return this.startingOffset;
    }

    /** Returns the offset this split ends at. */
    public Offset getEndingOffset() {
        return this.endingOffset;
    }

    /** Returns the finished snapshot split infos carried by this split (the stored instance). */
    public List<FinishedSnapshotSplitInfo> getFinishedSnapshotSplitInfos() {
        return this.finishedSnapshotSplitInfos;
    }

    /** Returns the table schemas carried by this split (the stored instance). */
    @Override
    public Map<TableId, TableChanges.TableChange> getTableSchemas() {
        return this.tableSchemas;
    }

    /** Returns the expected total number of finished snapshot split infos. */
    public int getTotalFinishedSplitSize() {
        return this.totalFinishedSplitSize;
    }

    /**
     * Returns {@code true} when the number of carried finished snapshot split infos equals
     * {@link #getTotalFinishedSplitSize()}.
     */
    public boolean isCompletedSplit() {
        return this.totalFinishedSplitSize == this.finishedSnapshotSplitInfos.size();
    }

    /** Returns whether this split is flagged as suspended. */
    public boolean isSuspended() {
        return this.isSuspended;
    }

    /** Returns whether this split is flagged as snapshot-completed. */
    public boolean isSnapshotCompleted() {
        return this.isSnapshotCompleted;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StreamSplit)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        StreamSplit that = (StreamSplit)o;
        return this.isSuspended == that.isSuspended
                && this.isSnapshotCompleted == that.isSnapshotCompleted
                && this.totalFinishedSplitSize == that.totalFinishedSplitSize
                && Objects.equals(this.startingOffset, that.startingOffset)
                && Objects.equals(this.endingOffset, that.endingOffset)
                && Objects.equals(this.finishedSnapshotSplitInfos, that.finishedSnapshotSplitInfos)
                && Objects.equals(this.tableSchemas, that.tableSchemas);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), this.startingOffset, this.endingOffset, this.finishedSnapshotSplitInfos, this.tableSchemas, this.totalFinishedSplitSize, this.isSuspended, this.isSnapshotCompleted);
    }

    /**
     * Returns a short description containing the split id, both offsets and the two flags; the
     * finished split infos and table schemas are not included.
     */
    @Override
    public String toString() {
        return "StreamSplit{splitId='" + this.splitId + '\'' + ", offset=" + this.startingOffset + ", endOffset=" + this.endingOffset + ", isSuspended=" + this.isSuspended + ", isSnapshotCompleted=" + this.isSnapshotCompleted + '}';
    }

    /**
     * Returns a copy of {@code streamSplit} whose finished split infos are {@code splitInfos}
     * followed by the infos the split already carried.
     *
     * <p>The starting offset of the result is lowered to the high watermark of any given split
     * info whose high watermark is before the current starting offset.
     *
     * <p>The {@code splitInfos} argument is not modified; the result holds a newly allocated list.
     *
     * @param streamSplit the split to extend
     * @param splitInfos the finished split infos to put in front of the existing ones
     * @return a new split with the combined finished split infos
     */
    public static StreamSplit appendFinishedSplitInfos(StreamSplit streamSplit, List<FinishedSnapshotSplitInfo> splitInfos) {
        Offset startingOffset = streamSplit.getStartingOffset();
        for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
            Offset highWatermark = splitInfo.getHighWatermark();
            if (highWatermark.isBefore(startingOffset)) {
                startingOffset = highWatermark;
            }
        }
        // Build a fresh list instead of calling addAll on the argument: the caller's list may be
        // unmodifiable, and it must not end up aliased with the new split's state.
        List<FinishedSnapshotSplitInfo> existingInfos = streamSplit.getFinishedSnapshotSplitInfos();
        List<FinishedSnapshotSplitInfo> combinedInfos = new ArrayList<>(splitInfos.size() + existingInfos.size());
        combinedInfos.addAll(splitInfos);
        combinedInfos.addAll(existingInfos);
        return new StreamSplit(streamSplit.splitId, startingOffset, streamSplit.getEndingOffset(), combinedInfos, streamSplit.getTableSchemas(), streamSplit.getTotalFinishedSplitSize(), streamSplit.isSuspended(), streamSplit.isSnapshotCompleted());
    }

    /**
     * Drops the finished split infos and table schemas of tables rejected by
     * {@code currentTableFilter}.
     *
     * <p>If no finished split info belongs to a rejected table, the given split is returned
     * unchanged (its table schemas are not filtered in that case). Otherwise the removed tables
     * are logged and a new split is returned whose total finished split size is reduced by the
     * number of removed split infos.
     *
     * @param streamSplit the split to filter
     * @param currentTableFilter predicate returning {@code true} for tables to keep
     * @return the given split, or a filtered copy of it
     */
    public static StreamSplit filterOutdatedSplitInfos(StreamSplit streamSplit, Predicate<TableId> currentTableFilter) {
        List<FinishedSnapshotSplitInfo> currentInfos = streamSplit.getFinishedSnapshotSplitInfos();
        Set<TableId> tablesToRemove = currentInfos.stream()
                .map(FinishedSnapshotSplitInfo::getTableId)
                .filter(tableId -> !currentTableFilter.test(tableId))
                .collect(Collectors.toSet());
        if (tablesToRemove.isEmpty()) {
            return streamSplit;
        }
        LOG.info("Reader remove tables after restart: {}", tablesToRemove);

        List<FinishedSnapshotSplitInfo> remainingInfos = currentInfos.stream()
                .filter(info -> !tablesToRemove.contains(info.getTableId()))
                .collect(Collectors.toList());

        // Schemas are filtered with the predicate itself, not with tablesToRemove, so a schema
        // without any finished split info is dropped as well when its table is rejected.
        Map<TableId, TableChanges.TableChange> remainingSchemas = new HashMap<>();
        for (Map.Entry<TableId, TableChanges.TableChange> entry : streamSplit.getTableSchemas().entrySet()) {
            if (currentTableFilter.test(entry.getKey())) {
                remainingSchemas.put(entry.getKey(), entry.getValue());
            }
        }

        int removedInfoCount = currentInfos.size() - remainingInfos.size();
        return new StreamSplit(streamSplit.splitId, streamSplit.getStartingOffset(), streamSplit.getEndingOffset(), remainingInfos, remainingSchemas, streamSplit.getTotalFinishedSplitSize() - removedInfoCount, streamSplit.isSuspended(), streamSplit.isSnapshotCompleted());
    }

    /**
     * Returns a copy of {@code streamSplit} whose table schemas are {@code tableSchemas} merged
     * with the schemas the split already carried; on a key clash the split's own entry wins.
     *
     * <p>The {@code tableSchemas} argument is not modified; the result holds a newly allocated map.
     *
     * @param streamSplit the split to extend
     * @param tableSchemas the schemas to add
     * @return a new split with the merged table schemas
     */
    public static StreamSplit fillTableSchemas(StreamSplit streamSplit, Map<TableId, TableChanges.TableChange> tableSchemas) {
        // Merge into a fresh map so the caller's map is neither mutated nor aliased.
        Map<TableId, TableChanges.TableChange> mergedSchemas = new HashMap<>(tableSchemas);
        mergedSchemas.putAll(streamSplit.getTableSchemas());
        return new StreamSplit(streamSplit.splitId, streamSplit.getStartingOffset(), streamSplit.getEndingOffset(), streamSplit.getFinishedSnapshotSplitInfos(), mergedSchemas, streamSplit.getTotalFinishedSplitSize(), streamSplit.isSuspended(), streamSplit.isSnapshotCompleted());
    }

    /**
     * Returns a copy of {@code suspendedStreamSplit} with the suspended flag cleared and the
     * total finished split size replaced by {@code totalFinishedSplitSize}.
     *
     * @param suspendedStreamSplit the split to convert
     * @param totalFinishedSplitSize the new total finished split size
     * @return a new, non-suspended split
     */
    public static StreamSplit toNormalStreamSplit(StreamSplit suspendedStreamSplit, int totalFinishedSplitSize) {
        return new StreamSplit(suspendedStreamSplit.splitId, suspendedStreamSplit.getStartingOffset(), suspendedStreamSplit.getEndingOffset(), suspendedStreamSplit.getFinishedSnapshotSplitInfos(), suspendedStreamSplit.getTableSchemas(), totalFinishedSplitSize, false, suspendedStreamSplit.isSnapshotCompleted());
    }

    /**
     * Returns a copy of {@code normalStreamSplit} with the suspended flag set. Every finished
     * split info whose high watermark is before the split's starting offset is replaced by a
     * copy whose high watermark is that starting offset.
     *
     * @param normalStreamSplit the split to convert
     * @return a new, suspended split
     */
    public static StreamSplit toSuspendedStreamSplit(StreamSplit normalStreamSplit) {
        List<FinishedSnapshotSplitInfo> forwardedInfos = StreamSplit.forwardHighWatermarkToStartingOffset(normalStreamSplit.getFinishedSnapshotSplitInfos(), normalStreamSplit.getStartingOffset());
        return new StreamSplit(normalStreamSplit.splitId, normalStreamSplit.getStartingOffset(), normalStreamSplit.getEndingOffset(), forwardedInfos, normalStreamSplit.getTableSchemas(), normalStreamSplit.getTotalFinishedSplitSize(), true, normalStreamSplit.isSnapshotCompleted());
    }

    /**
     * Builds a new list in which each split info whose high watermark is before
     * {@code currentReadingOffset} is replaced by a copy carrying {@code currentReadingOffset}
     * as its high watermark; all other split infos are kept as they are.
     */
    private static List<FinishedSnapshotSplitInfo> forwardHighWatermarkToStartingOffset(List<FinishedSnapshotSplitInfo> existedSplitInfos, Offset currentReadingOffset) {
        List<FinishedSnapshotSplitInfo> updatedSnapshotSplitInfos = new ArrayList<>(existedSplitInfos.size());
        for (FinishedSnapshotSplitInfo existedSplitInfo : existedSplitInfos) {
            if (existedSplitInfo.getHighWatermark().isBefore(currentReadingOffset)) {
                updatedSnapshotSplitInfos.add(new FinishedSnapshotSplitInfo(existedSplitInfo.getTableId(), existedSplitInfo.getSplitId(), existedSplitInfo.getSplitStart(), existedSplitInfo.getSplitEnd(), currentReadingOffset, existedSplitInfo.getOffsetFactory()));
            } else {
                updatedSnapshotSplitInfos.add(existedSplitInfo);
            }
        }
        return updatedSnapshotSplitInfos;
    }
}

