/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.Collections;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.metrics.Counter;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.utils.Reference;

public class FlinkRecordsWithSplitIds
implements RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> {
    @Nullable
    private String splitId;
    @Nullable
    private Reference<BulkFormat.RecordIterator<RowData>> recordsForSplitCurrent;
    @Nullable
    private final BulkFormat.RecordIterator<RowData> recordsForSplit;
    private final Set<String> finishedSplits;

    private FlinkRecordsWithSplitIds(@Nullable String splitId, @Nullable BulkFormat.RecordIterator<RowData> recordsForSplit, Set<String> finishedSplits) {
        this.splitId = splitId;
        this.recordsForSplit = recordsForSplit;
        this.finishedSplits = finishedSplits;
    }

    @Nullable
    public String nextSplit() {
        String nextSplit = this.splitId;
        this.splitId = null;
        this.recordsForSplitCurrent = nextSplit != null ? new Reference<BulkFormat.RecordIterator<RowData>>(this.recordsForSplit) : null;
        return nextSplit;
    }

    @Nullable
    public BulkFormat.RecordIterator<RowData> nextRecordFromSplit() {
        if (this.recordsForSplitCurrent == null) {
            throw new IllegalStateException();
        }
        BulkFormat.RecordIterator<RowData> recordsForSplit = this.recordsForSplitCurrent.get();
        this.recordsForSplitCurrent.set(null);
        return recordsForSplit;
    }

    public Set<String> finishedSplits() {
        return this.finishedSplits;
    }

    public void recycle() {
        if (this.recordsForSplit != null) {
            this.recordsForSplit.releaseBatch();
        }
    }

    public static FlinkRecordsWithSplitIds forRecords(String splitId, BulkFormat.RecordIterator<RowData> recordsForSplit) {
        return new FlinkRecordsWithSplitIds(splitId, recordsForSplit, Collections.emptySet());
    }

    public static FlinkRecordsWithSplitIds finishedSplit(String splitId) {
        return new FlinkRecordsWithSplitIds(null, null, Collections.singleton(splitId));
    }

    public static void emitRecord(SourceReaderContext context, BulkFormat.RecordIterator<RowData> element, SourceOutput<RowData> output, FileStoreSourceSplitState state, FileStoreSourceReaderMetrics metrics, @Nullable NestedProjectedRowData nestedProjectedRowData) {
        RecordAndPosition record;
        long timestamp = Long.MIN_VALUE;
        if (metrics.getLatestFileCreationTime() != -1L) {
            timestamp = metrics.getLatestFileCreationTime();
        }
        Counter numRecordsIn = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
        boolean firstRecord = true;
        while ((record = element.next()) != null) {
            if (firstRecord) {
                firstRecord = false;
            } else {
                numRecordsIn.inc();
            }
            RowData rowData = (RowData)record.getRecord();
            if (nestedProjectedRowData != null) {
                rowData = nestedProjectedRowData.replaceRow(rowData);
            }
            output.collect((Object)rowData, timestamp);
            state.setPosition((RecordAndPosition<RowData>)record);
        }
    }
}

