/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.lookup.LookupFileStoreTable;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.FunctionWithIOException;
import org.apache.paimon.utils.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LookupStreamingReader {
    private static final Logger LOG = LoggerFactory.getLogger(LookupStreamingReader.class);
    private final LookupFileStoreTable table;
    private final int[] projection;
    @Nullable
    private final Filter<InternalRow> cacheRowFilter;
    private final ReadBuilder readBuilder;
    @Nullable
    private final Predicate projectedPredicate;
    private final StreamTableScan scan;

    public LookupStreamingReader(LookupFileStoreTable table, int[] projection, @Nullable Predicate predicate, Set<Integer> requireCachedBucketIds, @Nullable Filter<InternalRow> cacheRowFilter) {
        this.table = table;
        this.projection = projection;
        this.cacheRowFilter = cacheRowFilter;
        this.readBuilder = this.table.newReadBuilder().withProjection(projection).withFilter(predicate).withBucketFilter(requireCachedBucketIds == null ? null : requireCachedBucketIds::contains);
        this.scan = this.readBuilder.newStreamScan();
        if (predicate != null) {
            List<String> fieldNames = table.rowType().getFieldNames();
            List<String> primaryKeys = table.primaryKeys();
            IntUnaryOperator operator = i -> {
                int index = Ints.indexOf(projection, i);
                boolean safeFilter = primaryKeys.isEmpty() || primaryKeys.contains(fieldNames.get(i));
                return safeFilter ? index : -1;
            };
            int[] fieldIdxToProjectionIdx = IntStream.range(0, table.rowType().getFieldCount()).map(operator).toArray();
            this.projectedPredicate = PredicateBuilder.transformFieldMapping(predicate, fieldIdxToProjectionIdx).orElse(null);
        } else {
            this.projectedPredicate = null;
        }
    }

    public List<Split> nextSplits() {
        return this.scan.plan().splits();
    }

    public RecordReader<InternalRow> toRecordReader(List<Split> splits, boolean useParallelism) throws Exception {
        RecordReader<InternalRow> reader;
        this.log(splits);
        CoreOptions options = CoreOptions.fromMap(this.table.options());
        FunctionWithIOException readerSupplier = split -> this.readBuilder.newRead().createReader((Split)split);
        RowType readType = TypeUtils.project(this.table.rowType(), this.projection);
        if (useParallelism) {
            reader = SplitsParallelReadUtil.parallelExecute(readType, readerSupplier, splits, options.pageSize(), new Options(this.table.options()).get(FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM));
        } else {
            ArrayList readers = new ArrayList();
            for (Split split2 : splits) {
                readers.add(() -> (RecordReader)readerSupplier.apply(split2));
            }
            reader = ConcatRecordReader.create(readers);
        }
        if (this.projectedPredicate != null) {
            reader = reader.filter(this.projectedPredicate::test);
        }
        if (this.cacheRowFilter != null) {
            reader = reader.filter(this.cacheRowFilter);
        }
        return reader;
    }

    private void log(List<Split> splits) {
        if (splits.isEmpty()) {
            LOG.info("LookupStreamingReader didn't get splits from {}.", (Object)this.table.name());
            return;
        }
        DataSplit dataSplit = (DataSplit)splits.get(0);
        LOG.info("LookupStreamingReader get splits from {} with snapshotId {}.", (Object)this.table.name(), (Object)dataSplit.snapshotId());
    }

    @Nullable
    public Long nextSnapshotId() {
        return this.scan.checkpoint();
    }
}

