/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.AbstractDataTableScan;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.PushDownUtils;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableQueryAuth;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.TopNDataSplitEvaluator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.types.DataType;

public class DataTableBatchScan
extends AbstractDataTableScan {
    private StartingScanner startingScanner;
    private boolean hasNext = true;
    private Integer pushDownLimit;
    private TopN topN;
    private final SchemaManager schemaManager;

    public DataTableBatchScan(TableSchema schema, SchemaManager schemaManager, CoreOptions options, SnapshotReader snapshotReader, TableQueryAuth queryAuth) {
        super(schema, options, snapshotReader, queryAuth);
        this.schemaManager = schemaManager;
        if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0() && options.toConfiguration().get(CoreOptions.BATCH_SCAN_MODE).equals(CoreOptions.BatchScanMode.NONE)) {
            snapshotReader.withLevelFilter((Integer level) -> level > 0).enableValueFilter();
        }
        if (options.bucket() == -2) {
            snapshotReader.onlyReadRealBuckets();
        }
    }

    @Override
    public InnerTableScan withFilter(Predicate predicate) {
        super.withFilter(predicate);
        return this;
    }

    @Override
    public InnerTableScan withLimit(int limit) {
        this.pushDownLimit = limit;
        return this;
    }

    @Override
    public InnerTableScan withTopN(TopN topN) {
        this.topN = topN;
        return this;
    }

    @Override
    public TableScan.Plan plan() {
        this.authQuery();
        if (this.startingScanner == null) {
            this.startingScanner = this.createStartingScanner(false);
        }
        if (this.hasNext) {
            this.hasNext = false;
            Optional<StartingScanner.Result> pushed = this.applyPushDownLimit();
            if (pushed.isPresent()) {
                return DataFilePlan.fromResult(pushed.get());
            }
            pushed = this.applyPushDownTopN();
            if (pushed.isPresent()) {
                return DataFilePlan.fromResult(pushed.get());
            }
            return DataFilePlan.fromResult(this.startingScanner.scan(this.snapshotReader));
        }
        throw new EndOfScanException();
    }

    @Override
    public List<PartitionEntry> listPartitionEntries() {
        if (this.startingScanner == null) {
            this.startingScanner = this.createStartingScanner(false);
        }
        return this.startingScanner.scanPartitions(this.snapshotReader);
    }

    private Optional<StartingScanner.Result> applyPushDownLimit() {
        if (this.pushDownLimit == null) {
            return Optional.empty();
        }
        StartingScanner.Result result = this.startingScanner.scan(this.snapshotReader);
        if (!(result instanceof StartingScanner.ScannedResult)) {
            return Optional.of(result);
        }
        long scannedRowCount = 0L;
        SnapshotReader.Plan plan = ((StartingScanner.ScannedResult)result).plan();
        List<DataSplit> splits = plan.dataSplits();
        if (splits.isEmpty()) {
            return Optional.of(result);
        }
        ArrayList<Split> limitedSplits = new ArrayList<Split>();
        for (DataSplit dataSplit : splits) {
            if (!dataSplit.rawConvertible()) continue;
            long partialMergedRowCount = dataSplit.partialMergedRowCount();
            limitedSplits.add(dataSplit);
            if ((scannedRowCount += partialMergedRowCount) < (long)this.pushDownLimit.intValue()) continue;
            PlanImpl newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
            return Optional.of(new StartingScanner.ScannedResult(newPlan));
        }
        return Optional.of(result);
    }

    private Optional<StartingScanner.Result> applyPushDownTopN() {
        if (this.topN == null || this.pushDownLimit != null || !this.schema.primaryKeys().isEmpty() || this.options().deletionVectorsEnabled()) {
            return Optional.empty();
        }
        List<SortValue> orders = this.topN.orders();
        if (orders.size() != 1) {
            return Optional.empty();
        }
        if (this.topN.limit() > 100) {
            return Optional.empty();
        }
        SortValue order = orders.get(0);
        DataType type = order.field().type();
        if (!PushDownUtils.minmaxAvailable(type)) {
            return Optional.empty();
        }
        StartingScanner.Result result = this.startingScanner.scan(this.snapshotReader.keepStats());
        if (!(result instanceof StartingScanner.ScannedResult)) {
            return Optional.of(result);
        }
        SnapshotReader.Plan plan = ((StartingScanner.ScannedResult)result).plan();
        List<DataSplit> splits = plan.dataSplits();
        if (splits.isEmpty()) {
            return Optional.of(result);
        }
        TopNDataSplitEvaluator evaluator = new TopNDataSplitEvaluator(this.schema, this.schemaManager);
        ArrayList<Split> topNSplits = new ArrayList<Split>(evaluator.evaluate(order, this.topN.limit(), splits));
        PlanImpl newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), topNSplits);
        return Optional.of(new StartingScanner.ScannedResult(newPlan));
    }

    @Override
    public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
        return this;
    }
}

