/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.source.DynamicPartitionFilteringInfo;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
import org.apache.paimon.flink.source.FlinkSource;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.StaticFileStoreSplitEnumerator;
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;

/**
 * A {@link FlinkSource} that reports itself as {@link Boundedness#BOUNDED}: its splits are either
 * planned once from the table scan or taken from a restored checkpoint, and are then handed to a
 * {@link StaticFileStoreSplitEnumerator}.
 */
public class StaticFileStoreSource extends FlinkSource {
    private static final long serialVersionUID = 3L;

    /** Batch size forwarded to the {@link PreAssignSplitAssigner} in {@code FAIR} mode. */
    private final int splitBatchSize;

    /** Selects which {@link SplitAssigner} implementation is created for the enumerator. */
    private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;

    /** Optional dynamic partition filtering info, forwarded unchanged to the enumerator. */
    @Nullable
    private final DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo;

    /**
     * Creates a source without dynamic partition filtering and without a nested row projection.
     *
     * @param readBuilder builder used to create the table scan
     * @param limit optional limit, forwarded to {@link FlinkSource}
     * @param splitBatchSize batch size used by the {@code FAIR} split assigner
     * @param splitAssignMode strategy used to hand splits to readers
     */
    public StaticFileStoreSource(ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize, FlinkConnectorOptions.SplitAssignMode splitAssignMode) {
        this(readBuilder, limit, splitBatchSize, splitAssignMode, null, null);
    }

    /**
     * Creates a source.
     *
     * @param readBuilder builder used to create the table scan
     * @param limit optional limit, forwarded to {@link FlinkSource}
     * @param splitBatchSize batch size used by the {@code FAIR} split assigner
     * @param splitAssignMode strategy used to hand splits to readers
     * @param dynamicPartitionFilteringInfo optional info forwarded to the split enumerator
     * @param rowData optional nested projection, forwarded to {@link FlinkSource}
     */
    public StaticFileStoreSource(ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize, FlinkConnectorOptions.SplitAssignMode splitAssignMode, @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo, @Nullable NestedProjectedRowData rowData) {
        super(readBuilder, limit, rowData);
        this.splitBatchSize = splitBatchSize;
        this.splitAssignMode = splitAssignMode;
        this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo;
    }

    /** Always returns {@link Boundedness#BOUNDED}. */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Builds the split enumerator.
     *
     * <p>When {@code checkpoint} is {@code null} the splits are planned from a new table scan;
     * otherwise the splits recorded in the checkpoint are reused.
     *
     * @param context enumerator context supplied by Flink
     * @param checkpoint previously stored pending splits, or {@code null} to plan from scratch
     * @return a {@link StaticFileStoreSplitEnumerator} backed by the configured split assigner
     */
    @Override
    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, PendingSplitsCheckpoint checkpoint) {
        Collection<FileStoreSourceSplit> splits = checkpoint == null ? this.getSplits(context) : checkpoint.splits();
        SplitAssigner splitAssigner = StaticFileStoreSource.createSplitAssigner(context, this.splitBatchSize, this.splitAssignMode, splits);
        // The enumerator's second constructor argument is deliberately left null here.
        return new StaticFileStoreSplitEnumerator(context, null, splitAssigner, this.dynamicPartitionFilteringInfo);
    }

    /**
     * Plans the table scan and converts the resulting plan into source splits.
     *
     * @param context enumerator context; its metric group, when present, receives scan metrics
     * @return the splits generated from the scan plan
     */
    private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext<FileStoreSourceSplit> context) {
        FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
        TableScan scan = this.readBuilder.newScan();
        // Read the metric group once and register scan metrics only when one is available.
        MetricGroup metricGroup = context.metricGroup();
        if (metricGroup != null) {
            // NOTE(review): assumes the scan returned by the read builder is an InnerTableScan;
            // any other implementation fails here with a ClassCastException.
            ((InnerTableScan)scan).withMetricRegistry(new FlinkMetricRegistry(metricGroup));
        }
        return splitGenerator.createSplits(scan.plan());
    }

    /**
     * Creates the split assigner matching the requested assign mode.
     *
     * <p>{@code FAIR} yields a {@link PreAssignSplitAssigner}; {@code PREEMPTIVE} yields a
     * {@link FIFOSplitAssigner}.
     *
     * @param context enumerator context, used by the {@code FAIR} assigner
     * @param splitBatchSize batch size, used by the {@code FAIR} assigner
     * @param splitAssignMode mode selecting the assigner implementation
     * @param splits splits to be handed out
     * @return the assigner for {@code splitAssignMode}
     * @throws UnsupportedOperationException if the mode is neither {@code FAIR} nor
     *     {@code PREEMPTIVE}
     */
    public static SplitAssigner createSplitAssigner(SplitEnumeratorContext<FileStoreSourceSplit> context, int splitBatchSize, FlinkConnectorOptions.SplitAssignMode splitAssignMode, Collection<FileStoreSourceSplit> splits) {
        switch (splitAssignMode) {
            case FAIR:
                return new PreAssignSplitAssigner(splitBatchSize, context, splits);
            case PREEMPTIVE:
                return new FIFOSplitAssigner(splits);
            default:
                throw new UnsupportedOperationException("Unsupported assign mode " + splitAssignMode);
        }
    }
}

