/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.mr.mapreduce;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SerializationUtil;

/**
 * MapReduce {@link InputFormat} that plans an Iceberg table scan into {@link IcebergSplit}s and
 * reads the rows of each split through a nested {@link RecordReader}.
 *
 * <p>Keys are always {@code null}; values are the rows produced by the file readers.
 *
 * @param <T> the in-memory row type handed to the job
 */
public class IcebergInputFormat<T> extends InputFormat<Void, T> {

    /**
     * Registers this class as the job's input format and returns a builder over the job's
     * configuration.
     *
     * @param job the job to configure
     * @return a config builder backed by {@code job.getConfiguration()}
     */
    public static InputFormatConfig.ConfigBuilder configure(Job job) {
        job.setInputFormatClass(IcebergInputFormat.class);
        return new InputFormatConfig.ConfigBuilder(job.getConfiguration());
    }

    /**
     * Builds a table scan from the job configuration and wraps every planned
     * {@link CombinedScanTask} in an {@link IcebergSplit}.
     *
     * <p>The table is taken from the serialized form under {@code iceberg.mr.serialized.table} when
     * present, otherwise it is loaded through {@link Catalogs#loadTable(Configuration)}.
     *
     * @param context job context supplying the configuration
     * @return one split per planned combined scan task
     * @throws UnsupportedOperationException if residual filtering is enabled, the data model is
     *     HIVE or PIG, and a task carries a residual other than "always true"
     * @throws UncheckedIOException if closing the planned task iterable fails
     */
    @Override
    public List<InputSplit> getSplits(JobContext context) {
        Configuration conf = context.getConfiguration();

        String serializedTable = conf.get("iceberg.mr.serialized.table");
        Table table = serializedTable != null
                ? (Table) SerializationUtil.deserializeFromBase64(serializedTable)
                : Catalogs.loadTable(conf);

        TableScan scan = table.newScan().caseSensitive(conf.getBoolean("iceberg.mr.case.sensitive", true));

        long snapshotId = conf.getLong("iceberg.mr.snapshot.id", -1L);
        if (snapshotId != -1L) {
            scan = scan.useSnapshot(snapshotId);
        }

        long asOfTime = conf.getLong("iceberg.mr.as.of.time", -1L);
        if (asOfTime != -1L) {
            scan = scan.asOfTime(asOfTime);
        }

        long splitSize = conf.getLong("iceberg.mr.split.size", 0L);
        if (splitSize > 0L) {
            scan = scan.option("read.split.target-size", String.valueOf(splitSize));
        }

        // Every refinement call hands back the refined scan, so the result has to be kept (as it
        // is for the snapshot, split size and filter settings). The explicit read schema wins
        // over the selected columns, mirroring how the record reader resolves its read schema.
        String schemaStr = conf.get("iceberg.mr.read.schema");
        String[] selectedColumns = conf.getStrings("iceberg.mr.selected.columns");
        if (schemaStr != null) {
            scan = scan.project(SchemaParser.fromJson(schemaStr));
        } else if (selectedColumns != null) {
            scan = scan.select(selectedColumns);
        }

        Expression filter = (Expression) SerializationUtil.deserializeFromBase64(conf.get("iceberg.mr.filter.expression"));
        if (filter != null) {
            scan = scan.filter(filter);
        }

        boolean applyResidual = !conf.getBoolean("skip.residual.filtering", false);
        InputFormatConfig.InMemoryDataModel model =
                conf.getEnum("iceberg.mr.in.memory.data.model", InputFormatConfig.InMemoryDataModel.GENERIC);
        // Only the HIVE and PIG models are validated up front; residuals for the other model are
        // evaluated row by row inside the record reader.
        boolean mustValidateResiduals = applyResidual
                && (model == InputFormatConfig.InMemoryDataModel.HIVE || model == InputFormatConfig.InMemoryDataModel.PIG);

        List<InputSplit> splits = Lists.newArrayList();
        try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
            for (CombinedScanTask task : tasksIterable) {
                if (mustValidateResiduals) {
                    checkResiduals(task);
                }
                splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
        }
        return splits;
    }

    /**
     * Rejects a combined task when any of its file tasks still carries a residual expression that
     * is not "always true".
     *
     * @param task the combined task whose file tasks are inspected
     * @throws UnsupportedOperationException on the first file task with a non-trivial residual
     */
    private static void checkResiduals(CombinedScanTask task) {
        for (FileScanTask fileScanTask : task.files()) {
            Expression residual = fileScanTask.residual();
            if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
                throw new UnsupportedOperationException(String.format("Filter expression %s is not completely satisfied. Additional rows can be returned not satisfied by the filter expression", residual));
            }
        }
    }

    /**
     * Creates an uninitialized record reader; the split is bound later through
     * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)}.
     */
    @Override
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new IcebergRecordReader<>();
    }

    /**
     * Reads the file tasks of one {@link IcebergSplit} in order, exposing each row as the current
     * value with a {@code null} key.
     *
     * @param <T> the in-memory row type
     */
    private static final class IcebergRecordReader<T> extends RecordReader<Void, T> {
        private TaskAttemptContext context;
        private Schema tableSchema;
        private Schema expectedSchema;
        private boolean reuseContainers;
        private boolean caseSensitive;
        private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
        private Iterator<FileScanTask> tasks;
        private T currentRow;
        private CloseableIterator<T> currentIterator;
        private FileIO io;
        private EncryptionManager encryptionManager;

        private IcebergRecordReader() {
        }

        /**
         * Binds the reader to a split, loads the read settings from the task configuration and
         * opens the first file task.
         *
         * @param split must be an {@link IcebergSplit}
         * @param newContext the task attempt context supplying the configuration
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext newContext) {
            Configuration conf = newContext.getConfiguration();
            IcebergSplit icebergSplit = (IcebergSplit) split;
            CombinedScanTask task = icebergSplit.task();
            this.context = newContext;
            this.io = icebergSplit.io();
            this.encryptionManager = icebergSplit.encryptionManager();
            this.tasks = task.files().iterator();
            this.tableSchema = InputFormatConfig.tableSchema(conf);
            this.caseSensitive = conf.getBoolean("iceberg.mr.case.sensitive", true);
            this.expectedSchema = readSchema(conf, this.tableSchema, this.caseSensitive);
            this.reuseContainers = conf.getBoolean("iceberg.mr.reuse.containers", false);
            this.inMemoryDataModel =
                    conf.getEnum("iceberg.mr.in.memory.data.model", InputFormatConfig.InMemoryDataModel.GENERIC);
            // A split without file tasks yields no rows instead of failing on tasks.next().
            this.currentIterator = this.tasks.hasNext()
                    ? this.open(this.tasks.next(), this.expectedSchema).iterator()
                    : CloseableIterable.<T>empty().iterator();
        }

        /**
         * Advances to the next row, moving on to the next file task whenever the current one is
         * exhausted.
         *
         * @return {@code true} if a row is available, {@code false} once every file task is drained
         * @throws IOException if closing an exhausted file iterator fails
         */
        @Override
        public boolean nextKeyValue() throws IOException {
            while (!this.currentIterator.hasNext()) {
                // The exhausted iterator is closed both before switching files and at the very end.
                this.currentIterator.close();
                if (!this.tasks.hasNext()) {
                    return false;
                }
                this.currentIterator = this.open(this.tasks.next(), this.expectedSchema).iterator();
            }
            this.currentRow = this.currentIterator.next();
            return true;
        }

        /** Keys are not used by this input format; always {@code null}. */
        @Override
        public Void getCurrentKey() {
            return null;
        }

        /** Returns the row fetched by the last successful {@link #nextKeyValue()} call. */
        @Override
        public T getCurrentValue() {
            return this.currentRow;
        }

        /** Delegates to the progress reported by the task attempt context. */
        @Override
        public float getProgress() {
            return this.context.getProgress();
        }

        /**
         * Closes the iterator over the file currently being read, if one was opened.
         *
         * @throws IOException if the underlying iterator fails to close
         */
        @Override
        public void close() throws IOException {
            // Guard against close() being called when initialize() never completed.
            if (this.currentIterator != null) {
                this.currentIterator.close();
            }
        }

        /**
         * Decrypts the task's data file and opens it with the reader matching its file format.
         *
         * @param currentTask the file task to open
         * @param readSchema the schema to read the file with
         * @return the rows of the file task
         * @throws UnsupportedOperationException if the file format is not AVRO, ORC or PARQUET
         */
        private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
            DataFile file = currentTask.file();
            InputFile inputFile = this.encryptionManager.decrypt(
                    EncryptedFiles.encryptedInput(this.io.newInputFile(file.path().toString()), file.keyMetadata()));
            switch (file.format()) {
                case AVRO:
                    return this.newAvroIterable(inputFile, currentTask, readSchema);
                case ORC:
                    return this.newOrcIterable(inputFile, currentTask, readSchema);
                case PARQUET:
                    return this.newParquetIterable(inputFile, currentTask, readSchema);
                default:
                    throw new UnsupportedOperationException(String.format("Cannot read %s file: %s", file.format().name(), file.path()));
            }
        }

        /**
         * Opens a file task for the configured in-memory data model, applying the task's delete
         * filter on top of the raw rows.
         *
         * @param currentTask the file task to open
         * @param readSchema the schema requested by the job
         * @return the rows of the task after delete filtering
         * @throws UnsupportedOperationException for any data model other than GENERIC
         */
        private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
            switch (this.inMemoryDataModel) {
                case PIG:
                case HIVE:
                    throw new UnsupportedOperationException("Pig and Hive object models are not supported.");
                case GENERIC: {
                    GenericDeleteFilter deletes = new GenericDeleteFilter(this.io, currentTask, this.tableSchema, readSchema);
                    // The file is read with the schema the delete filter asks for, not readSchema.
                    Schema requiredSchema = deletes.requiredSchema();
                    return deletes.filter(this.openTask(currentTask, requiredSchema));
                }
                default:
                    throw new UnsupportedOperationException("Unsupported memory model");
            }
        }

        /**
         * Wraps the rows in a residual filter unless residual filtering is switched off via
         * {@code skip.residual.filtering} or the residual is absent / "always true".
         *
         * @param iter the rows read from the file
         * @param residual the residual expression of the file task, may be {@code null}
         * @param readSchema the schema the rows were read with
         * @return the filtered rows, or {@code iter} itself when no filtering applies
         */
        private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, Expression residual, Schema readSchema) {
            boolean applyResidual = !this.context.getConfiguration().getBoolean("skip.residual.filtering", false);
            // equals() keeps this check in line with the one performed at split planning time.
            if (!applyResidual || residual == null || residual.equals(Expressions.alwaysTrue())) {
                return iter;
            }
            Evaluator filter = new Evaluator(readSchema.asStruct(), residual, this.caseSensitive);
            return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
        }

        /**
         * Builds the Avro reader for one file task and applies residual filtering to its rows.
         *
         * @throws UnsupportedOperationException for the PIG and HIVE data models
         */
        private CloseableIterable<T> newAvroIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
            Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile).project(readSchema).split(task.start(), task.length());
            if (this.reuseContainers) {
                avroReadBuilder.reuseContainers();
            }
            switch (this.inMemoryDataModel) {
                case PIG:
                case HIVE:
                    throw new UnsupportedOperationException("Avro support not yet supported for Pig and Hive");
                case GENERIC:
                    avroReadBuilder.createReaderFunc((expIcebergSchema, expAvroSchema) -> DataReader.create(
                            expIcebergSchema, expAvroSchema, this.constantsMap(task, IdentityPartitionConverters::convertConstant)));
                    break;
                default:
                    break;
            }
            return this.applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
        }

        /**
         * Builds the Parquet reader for one file task and applies residual filtering to its rows.
         * The residual is also handed to the Parquet read builder itself.
         *
         * @throws UnsupportedOperationException for the PIG and HIVE data models
         */
        private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
            Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
                    .project(readSchema)
                    .filter(task.residual())
                    .caseSensitive(this.caseSensitive)
                    .split(task.start(), task.length());
            if (this.reuseContainers) {
                parquetReadBuilder.reuseContainers();
            }
            switch (this.inMemoryDataModel) {
                case PIG:
                case HIVE:
                    throw new UnsupportedOperationException("Parquet support not yet supported for Pig and Hive");
                case GENERIC:
                    parquetReadBuilder.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
                            readSchema, fileSchema, this.constantsMap(task, IdentityPartitionConverters::convertConstant)));
                    break;
                default:
                    break;
            }
            return this.applyResidualFiltering(parquetReadBuilder.build(), task.residual(), readSchema);
        }

        /**
         * Builds the ORC reader for one file task and applies residual filtering to its rows.
         * Constant and metadata columns are removed from the schema projected onto the file; the
         * reader function still receives the full {@code readSchema} together with the constants.
         *
         * @throws UnsupportedOperationException for the PIG and HIVE data models
         */
        private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
            Map<Integer, ?> idToConstant = this.constantsMap(task, IdentityPartitionConverters::convertConstant);
            Schema readSchemaWithoutConstantAndMetadataFields =
                    TypeUtil.selectNot(readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
            // NOTE(review): unlike Avro and Parquet, reuseContainers is not applied here -
            // presumably unsupported by the ORC builder; confirm.
            ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
                    .project(readSchemaWithoutConstantAndMetadataFields)
                    .filter(task.residual())
                    .caseSensitive(this.caseSensitive)
                    .split(task.start(), task.length());
            switch (this.inMemoryDataModel) {
                case PIG:
                case HIVE:
                    throw new UnsupportedOperationException("ORC support not yet supported for Pig and Hive");
                case GENERIC:
                    orcReadBuilder.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema, idToConstant));
                    break;
                default:
                    break;
            }
            return this.applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema);
        }

        /**
         * Returns the partition constants of the task, or an empty map when the expected schema
         * projects none of the spec's identity partition source columns.
         *
         * @param task the file task supplying the partition spec and values
         * @param converter converts partition values for the given type
         * @return field id to constant value
         */
        private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
            PartitionSpec spec = task.spec();
            Set<Integer> idColumns = spec.identitySourceIds();
            Schema partitionSchema = TypeUtil.select(this.expectedSchema, idColumns);
            if (partitionSchema.columns().isEmpty()) {
                return Collections.emptyMap();
            }
            return PartitionUtil.constantsMap(task, converter);
        }

        /**
         * Resolves the schema rows should be read with: the explicitly configured read schema if
         * any, otherwise the table schema narrowed to the selected columns, otherwise the full
         * table schema.
         *
         * @param conf the task configuration
         * @param tableSchema the schema of the table
         * @param caseSensitive whether selected column names are matched case sensitively
         * @return the schema to read with
         */
        private static Schema readSchema(Configuration conf, Schema tableSchema, boolean caseSensitive) {
            Schema readSchema = InputFormatConfig.readSchema(conf);
            if (readSchema != null) {
                return readSchema;
            }
            String[] selectedColumns = InputFormatConfig.selectedColumns(conf);
            if (selectedColumns == null) {
                return tableSchema;
            }
            return caseSensitive ? tableSchema.select(selectedColumns) : tableSchema.caseInsensitiveSelect(selectedColumns);
        }
    }
}

