/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitReader;
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
import org.apache.paimon.flink.source.FlinkRecordsWithSplitIds;
import org.apache.paimon.flink.source.ReaderConsumeProgressEvent;
import org.apache.paimon.flink.source.RecordLimiter;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.TableRead;

public class FileStoreSourceReader
extends SingleThreadMultiplexSourceReaderBase<BulkFormat.RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {
    private final IOManager ioManager;
    private long lastConsumeSnapshotId = Long.MIN_VALUE;

    public FileStoreSourceReader(SourceReaderContext readerContext, TableRead tableRead, FileStoreSourceReaderMetrics metrics, IOManager ioManager, @Nullable Long limit, @Nullable NestedProjectedRowData rowData) {
        super(() -> new FileStoreSourceSplitReader(tableRead.withIOManager(ioManager), RecordLimiter.create(limit), metrics), (element, output, state) -> FlinkRecordsWithSplitIds.emitRecord(readerContext, (BulkFormat.RecordIterator<RowData>)element, (SourceOutput<RowData>)output, state, metrics, rowData), readerContext.getConfiguration(), readerContext);
        this.ioManager = ioManager;
    }

    public void start() {
        if (this.getNumberOfCurrentlyAssignedSplits() == 0) {
            this.context.sendSplitRequest();
        }
    }

    protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSplitIds) {
        long maxFinishedSplits;
        if (this.getNumberOfCurrentlyAssignedSplits() == 0) {
            this.context.sendSplitRequest();
        }
        if (this.lastConsumeSnapshotId < (maxFinishedSplits = finishedSplitIds.values().stream().map(splitState -> TableScanUtils.getSnapshotId(splitState.toSourceSplit())).filter(Optional::isPresent).mapToLong(Optional::get).max().orElse(Long.MIN_VALUE))) {
            this.lastConsumeSnapshotId = maxFinishedSplits;
            this.context.sendSourceEventToCoordinator((SourceEvent)new ReaderConsumeProgressEvent(this.lastConsumeSnapshotId));
        }
    }

    protected FileStoreSourceSplitState initializedState(FileStoreSourceSplit split) {
        return new FileStoreSourceSplitState(split);
    }

    protected FileStoreSourceSplit toSplitType(String splitId, FileStoreSourceSplitState splitState) {
        return splitState.toSourceSplit();
    }

    public void close() throws Exception {
        super.close();
        this.ioManager.close();
    }
}

