/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.reader;

import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link IncrementalSourceReader} that remembers the starting offset of the stream split at
 * every checkpoint and hands that offset to the {@link DataSourceDialect} once the checkpoint is
 * reported as complete.
 */
@Experimental
public class IncrementalSourceReaderWithCommit extends IncrementalSourceReader {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceReaderWithCommit.class);

    /**
     * Starting offset of the stream split keyed by the checkpoint id at which it was captured.
     * Sorted so that every entry up to a completed checkpoint can be dropped in one step.
     */
    protected final TreeMap<Long, Offset> lastCheckpointOffsets = new TreeMap<>();

    /** Highest checkpoint id whose completion has already been forwarded to the dialect. */
    private long maxCompletedCheckpointId = 0L;

    /**
     * Creates the reader; all arguments are forwarded to {@link IncrementalSourceReader}
     * unchanged.
     */
    public IncrementalSourceReaderWithCommit(FutureCompletingBlockingQueue elementQueue, Supplier supplier, RecordEmitter recordEmitter, Configuration config, IncrementalSourceReaderContext incrementalSourceReaderContext, SourceConfig sourceConfig, SourceSplitSerializer sourceSplitSerializer, DataSourceDialect dialect) {
        // The queue arrives as a raw type, hence the unchecked cast to the element type.
        super((FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>>)elementQueue, supplier, recordEmitter, config, incrementalSourceReaderContext, sourceConfig, sourceSplitSerializer, dialect);
    }

    /**
     * Snapshots the reader state via the superclass and, if the snapshot contains a stream
     * split, records that split's starting offset under {@code checkpointId}.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the splits returned by the superclass snapshot, unmodified
     */
    @Override
    public List<SourceSplitBase> snapshotState(long checkpointId) {
        List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
        stateSplits.stream()
                .filter(SourceSplitBase::isStreamSplit)
                .findAny()
                .map(SourceSplitBase::asStreamSplit)
                .ifPresent(streamSplit -> {
                    Offset startingOffset = streamSplit.getStartingOffset();
                    this.lastCheckpointOffsets.put(checkpointId, startingOffset);
                    LOG.debug("Starting offset of stream split is: {}, and checkpoint id is {}.", startingOffset, checkpointId);
                });
        return stateSplits;
    }

    /**
     * Forwards a checkpoint completion to the dialect together with the offset recorded for that
     * checkpoint, then discards the recorded offsets of this and all earlier checkpoints.
     *
     * <p>Notifications whose id is not greater than the highest id already processed are
     * ignored. The offset passed to the dialect is {@code null} when no stream split offset was
     * recorded for {@code checkpointId}.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the dialect fails while handling the notification; in that case the
     *     recorded offsets and the highest processed id are left untouched
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (checkpointId > this.maxCompletedCheckpointId) {
            // May be null: offsets are only recorded when the snapshot held a stream split.
            Offset offset = this.lastCheckpointOffsets.get(checkpointId);
            this.dialect.notifyCheckpointComplete(checkpointId, offset);
            // Inclusive head map: removes entries for every checkpoint id <= checkpointId.
            this.lastCheckpointOffsets.headMap(checkpointId, true).clear();
            this.maxCompletedCheckpointId = checkpointId;
        }
    }
}

