/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.delta.input;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.delta.filter.DeltaFilter;
import org.apache.druid.delta.input.DeltaInputSourceReader;
import org.apache.druid.delta.input.DeltaSplit;
import org.apache.druid.delta.input.RowSerde;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.utils.Streams;
import org.apache.hadoop.conf.Configuration;

public class DeltaInputSource
implements SplittableInputSource<DeltaSplit> {
    public static final String TYPE_KEY = "delta";
    @JsonProperty
    private final String tablePath;
    @JsonProperty
    @Nullable
    private final DeltaSplit deltaSplit;
    @JsonProperty
    @Nullable
    private final DeltaFilter filter;
    @JsonProperty
    private final Long snapshotVersion;

    @JsonCreator
    public DeltaInputSource(@JsonProperty(value="tablePath") String tablePath, @JsonProperty(value="deltaSplit") @Nullable DeltaSplit deltaSplit, @JsonProperty(value="filter") @Nullable DeltaFilter filter, @JsonProperty(value="snapshotVersion") @Nullable Long snapshotVersion) {
        if (tablePath == null) {
            throw InvalidInput.exception((String)"tablePath cannot be null.", (Object[])new Object[0]);
        }
        this.tablePath = tablePath;
        this.deltaSplit = deltaSplit;
        this.filter = filter;
        this.snapshotVersion = snapshotVersion;
    }

    public boolean needsFormat() {
        return false;
    }

    public InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory) {
        Engine engine = this.createDeltaEngine();
        try {
            ArrayList<CloseableIterator<FilteredColumnarBatch>> scanFileDataIters = new ArrayList<CloseableIterator<FilteredColumnarBatch>>();
            if (this.deltaSplit != null) {
                Row scanState = this.deserialize(engine, this.deltaSplit.getStateRow());
                StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema((Engine)engine, (Row)scanState);
                for (String file : this.deltaSplit.getFiles()) {
                    Row scanFile = this.deserialize(engine, file);
                    scanFileDataIters.add(this.getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, Optional.empty()));
                }
            } else {
                Table table = Table.forPath((Engine)engine, (String)this.tablePath);
                Snapshot snapshot = this.getSnapshotForTable(table, engine);
                StructType fullSnapshotSchema = snapshot.getSchema(engine);
                StructType prunedSchema = this.pruneSchema(fullSnapshotSchema, inputRowSchema.getColumnsFilter());
                ScanBuilder scanBuilder = snapshot.getScanBuilder(engine);
                if (this.filter != null) {
                    scanBuilder.withFilter(engine, this.filter.getFilterPredicate(fullSnapshotSchema));
                }
                Scan scan = scanBuilder.withReadSchema(engine, prunedSchema).build();
                CloseableIterator scanFilesIter = scan.getScanFiles(engine);
                Row scanState = scan.getScanState(engine);
                StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema((Engine)engine, (Row)scanState);
                while (scanFilesIter.hasNext()) {
                    FilteredColumnarBatch scanFileBatch = (FilteredColumnarBatch)scanFilesIter.next();
                    CloseableIterator scanFileRows = scanFileBatch.getRows();
                    while (scanFileRows.hasNext()) {
                        Row scanFile = (Row)scanFileRows.next();
                        scanFileDataIters.add(this.getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter()));
                    }
                }
            }
            return new DeltaInputSourceReader(scanFileDataIters.iterator(), inputRowSchema);
        }
        catch (TableNotFoundException e) {
            throw InvalidInput.exception((Throwable)e, (String)"tablePath[%s] not found.", (Object[])new Object[]{this.tablePath});
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) {
        Snapshot snapshot;
        if (this.deltaSplit != null) {
            return Stream.of(new InputSplit((Object)this.deltaSplit));
        }
        Engine engine = this.createDeltaEngine();
        Table table = Table.forPath((Engine)engine, (String)this.tablePath);
        try {
            snapshot = this.getSnapshotForTable(table, engine);
        }
        catch (TableNotFoundException e) {
            throw InvalidInput.exception((Throwable)e, (String)"tablePath[%s] not found.", (Object[])new Object[]{this.tablePath});
        }
        StructType fullSnapshotSchema = snapshot.getSchema(engine);
        ScanBuilder scanBuilder = snapshot.getScanBuilder(engine);
        if (this.filter != null) {
            scanBuilder.withFilter(engine, this.filter.getFilterPredicate(fullSnapshotSchema));
        }
        Scan scan = scanBuilder.withReadSchema(engine, fullSnapshotSchema).build();
        CloseableIterator scanFilesIterator = scan.getScanFiles(engine);
        Row scanState = scan.getScanState(engine);
        String scanStateStr = RowSerde.serializeRowToJson(scanState);
        Iterator deltaSplitIterator = Iterators.transform((Iterator)scanFilesIterator, scanFile -> {
            CloseableIterator rows = scanFile.getRows();
            ArrayList<String> fileRows = new ArrayList<String>();
            while (rows.hasNext()) {
                fileRows.add(RowSerde.serializeRowToJson((Row)rows.next()));
            }
            return new DeltaSplit(scanStateStr, fileRows);
        });
        return Streams.sequentialStreamFrom((Iterator)deltaSplitIterator).map(InputSplit::new);
    }

    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) {
        return Ints.checkedCast((long)this.createSplits(inputFormat, splitHintSpec).count());
    }

    public InputSource withSplit(InputSplit<DeltaSplit> split) {
        return new DeltaInputSource(this.tablePath, (DeltaSplit)split.get(), this.filter, this.snapshotVersion);
    }

    private Row deserialize(Engine engine, String row) {
        return RowSerde.deserializeRowFromJson(engine, row);
    }

    private StructType pruneSchema(StructType baseSchema, ColumnsFilter columnsFilter) {
        List columnNames = baseSchema.fieldNames();
        List fiteredColumnNames = columnNames.stream().filter(arg_0 -> ((ColumnsFilter)columnsFilter).apply(arg_0)).collect(Collectors.toList());
        if (fiteredColumnNames.equals(columnNames)) {
            return baseSchema;
        }
        List selectedFields = fiteredColumnNames.stream().map(arg_0 -> ((StructType)baseSchema).get(arg_0)).collect(Collectors.toList());
        return new StructType(selectedFields);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Engine createDeltaEngine() {
        ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
            Configuration conf = new Configuration();
            DefaultEngine defaultEngine = DefaultEngine.create((Configuration)conf);
            return defaultEngine;
        }
        finally {
            Thread.currentThread().setContextClassLoader(currCtxClassloader);
        }
    }

    private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(Engine engine, Row scanState, Row scanFile, StructType physicalReadSchema, Optional<Predicate> optionalPredicate) throws IOException {
        FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus((Row)scanFile);
        CloseableIterator physicalDataIter = engine.getParquetHandler().readParquetFiles(Utils.singletonCloseableIterator((Object)fileStatus), physicalReadSchema, optionalPredicate);
        return Scan.transformPhysicalData((Engine)engine, (Row)scanState, (Row)scanFile, (CloseableIterator)physicalDataIter);
    }

    private Snapshot getSnapshotForTable(Table table, Engine engine) {
        if (this.snapshotVersion != null) {
            return table.getSnapshotAsOfVersion(engine, this.snapshotVersion.longValue());
        }
        return table.getLatestSnapshot(engine);
    }

    @VisibleForTesting
    String getTablePath() {
        return this.tablePath;
    }

    @VisibleForTesting
    DeltaFilter getFilter() {
        return this.filter;
    }

    @VisibleForTesting
    Long getSnapshotVersion() {
        return this.snapshotVersion;
    }
}

