/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.io.api.impl;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;
import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class LlapRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch>,
Consumer<ColumnVectorBatch> {
    private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
    private static final Object DONE_OBJECT = new Object();
    private final FileSplit split;
    private final IncludesImpl includes;
    private final SearchArgument sarg;
    private final VectorizedRowBatchCtx rbCtx;
    private final boolean isVectorized;
    private VectorizedOrcAcidRowBatchReader acidReader;
    private final Object[] partitionValues;
    private final LinkedBlockingQueue<Object> queue;
    private final AtomicReference<Throwable> pendingError = new AtomicReference<Object>(null);
    private ColumnVectorBatch lastCvb = null;
    private boolean isFirst = true;
    private int maxQueueSize = 0;
    private boolean isClosed = false;
    private boolean isInterrupted = false;
    private final ConsumerFeedback<ColumnVectorBatch> feedback;
    private final QueryFragmentCounters counters;
    private long firstReturnTime;
    private final JobConf jobConf;
    private final ReadPipeline rp;
    private final ExecutorService executor;
    private final boolean isAcidScan;
    private static final int COL_WEIGHT_COMPLEX = 16;
    private static final int COL_WEIGHT_HIVEDECIMAL = 4;
    private static final int COL_WEIGHT_STRING = 8;

    public static LlapRecordReader create(JobConf job, FileSplit split, List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, Configuration daemonConf) throws IOException, HiveException {
        MapWork mapWork = LlapRecordReader.findMapWork(job);
        if (mapWork == null) {
            return null;
        }
        LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, tableIncludedCols, hostName, cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
        if (!rr.checkOrcSchemaEvolution()) {
            rr.close();
            return null;
        }
        return rr;
    }

    private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, Configuration daemonConf) throws IOException, HiveException {
        this.executor = executor;
        this.jobConf = job;
        this.split = split;
        this.sarg = ConvertAstToSearchArg.createFromConf((Configuration)job);
        String fragmentId = LlapTezUtils.getFragmentId((JobConf)job);
        String dagId = LlapTezUtils.getDagId((JobConf)job);
        String queryId = HiveConf.getVar((Configuration)job, (HiveConf.ConfVars)HiveConf.ConfVars.HIVEQUERYID);
        MDC.put((String)"dagId", (String)dagId);
        MDC.put((String)"queryId", (String)queryId);
        TezCounters taskCounters = null;
        if (fragmentId != null) {
            MDC.put((String)"fragmentId", (String)fragmentId);
            taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
            LOG.info("Received fragment id: {}", (Object)fragmentId);
        } else {
            LOG.warn("Not using tez counters as fragment id string is null");
        }
        this.counters = new QueryFragmentCounters((Configuration)job, taskCounters);
        this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
        VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
        this.rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
        this.isAcidScan = AcidUtils.isFullAcidScan((Configuration)this.jobConf);
        TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr((Configuration)job, (boolean)this.isAcidScan, (int)Integer.MAX_VALUE);
        this.includes = new IncludesImpl(tableIncludedCols, this.isAcidScan, this.rbCtx, schema, job);
        int queueLimitBase = LlapRecordReader.getQueueVar(HiveConf.ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf);
        int queueLimitMin = LlapRecordReader.getQueueVar(HiveConf.ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
        boolean decimal64Support = HiveConf.getVar((Configuration)job, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64");
        int limit = LlapRecordReader.determineQueueLimit(queueLimitBase, queueLimitMin, this.rbCtx.getRowColumnTypeInfos(), decimal64Support);
        LOG.info("Queue limit for LlapRecordReader is " + limit);
        this.queue = new LinkedBlockingQueue(limit);
        int partitionColumnCount = this.rbCtx.getPartitionColumnCount();
        if (partitionColumnCount > 0) {
            this.partitionValues = new Object[partitionColumnCount];
            VectorizedRowBatchCtx.getPartitionValues((VectorizedRowBatchCtx)this.rbCtx, (MapWork)mapWork, (FileSplit)split, (Object[])this.partitionValues);
        } else {
            this.partitionValues = null;
        }
        this.isVectorized = HiveConf.getBoolVar((Configuration)this.jobConf, (HiveConf.ConfVars)HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
        if (this.isAcidScan) {
            this.acidReader = new VectorizedOrcAcidRowBatchReader((OrcSplit)split, this.jobConf, Reporter.NULL, null, this.rbCtx, true);
        }
        this.feedback = this.rp = cvp.createReadPipeline(this, split, this.includes, this.sarg, this.counters, this.includes, sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
    }

    private static int getQueueVar(HiveConf.ConfVars var, JobConf jobConf, Configuration daemonConf) {
        int jobVal = jobConf.getInt(var.varname, -1);
        return jobVal != -1 ? jobVal : HiveConf.getIntVar((Configuration)daemonConf, (HiveConf.ConfVars)var);
    }

    private static int determineQueueLimit(int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, boolean decimal64Support) {
        if (queueLimitBase == queueLimitMin) {
            return queueLimitBase;
        }
        if (typeInfos == null || typeInfos.length == 0) {
            return queueLimitBase;
        }
        double totalWeight = 0.0;
        for (TypeInfo ti : typeInfos) {
            int colWeight;
            if (ti.getCategory() != ObjectInspector.Category.PRIMITIVE) {
                colWeight = 16;
            } else {
                PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
                switch (pti.getPrimitiveCategory()) {
                    case BINARY: 
                    case CHAR: 
                    case VARCHAR: 
                    case STRING: {
                        colWeight = 8;
                        break;
                    }
                    case DECIMAL: {
                        DecimalTypeInfo dti;
                        boolean useDecimal64 = false;
                        if (ti instanceof DecimalTypeInfo && (dti = (DecimalTypeInfo)ti).getPrecision() <= 18 && decimal64Support) {
                            useDecimal64 = true;
                        }
                        if (useDecimal64) {
                            colWeight = 1;
                            break;
                        }
                        colWeight = 4;
                        break;
                    }
                    default: {
                        colWeight = 1;
                    }
                }
            }
            totalWeight += (double)colWeight;
        }
        return Math.max(queueLimitMin, (int)((double)queueLimitBase / totalWeight));
    }

    private static MapWork findMapWork(JobConf job) throws HiveException {
        String prefixes;
        String inputName = job.get("iocontext.input.name", null);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initializing for input " + inputName);
        }
        if ((prefixes = job.get("hive.tez.merge.file.prefixes")) != null && !StringUtils.isBlank((CharSequence)prefixes)) {
            return null;
        }
        BaseWork work = null;
        if (!(inputName == null || prefixes != null && Lists.newArrayList((Object[])prefixes.split(",")).contains(inputName))) {
            inputName = null;
        }
        if (inputName != null) {
            work = Utilities.getMergeWork((Configuration)job, (String)inputName);
        }
        if (work == null || !(work instanceof MapWork)) {
            work = Utilities.getMapWork((Configuration)job);
        }
        return (MapWork)work;
    }

    public void start() {
        if (this.executor instanceof StatsRecordingThreadPool) {
            ((StatsRecordingThreadPool)this.executor).setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
        }
        this.executor.submit(this.rp.getReadCallable());
    }

    private boolean checkOrcSchemaEvolution() {
        SchemaEvolution evolution = this.rp.getSchemaEvolution();
        for (int i = 0; i < this.includes.getReaderLogicalColumnIds().size(); ++i) {
            int projectedColId = this.includes.getReaderLogicalColumnIds().get(i);
            int fileColId = OrcInputFormat.getRootColumn((!this.isAcidScan ? 1 : 0) != 0) + projectedColId + 1;
            if (evolution.isPPDSafeConversion(fileColId)) continue;
            LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", (Object)this.split);
            return false;
        }
        return true;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException {
        assert (vrb != null);
        if (this.isClosed) {
            throw new AssertionError((Object)"next called after close");
        }
        boolean wasFirst = this.isFirst;
        if (this.isFirst) {
            if (this.partitionValues != null) {
                this.rbCtx.addPartitionColsToBatch(vrb, this.partitionValues);
            }
            this.isFirst = false;
        }
        ColumnVectorBatch cvb = null;
        try {
            cvb = this.nextCvb();
        }
        catch (InterruptedException e) {
            this.feedback.stop();
            this.isInterrupted = true;
            throw new IOException(e);
        }
        if (cvb == null) {
            if (wasFirst) {
                this.firstReturnTime = this.counters.startTimeCounter();
            }
            this.counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, this.firstReturnTime);
            return false;
        }
        if (this.isAcidScan) {
            vrb.selectedInUse = true;
            if (!this.isVectorized) throw new AssertionError((Object)"Unsupported mode");
            int acidColCount = OrcInputFormat.getRootColumn((boolean)false) - 1;
            VectorizedRowBatch inputVrb = new VectorizedRowBatch(acidColCount + 1 + vrb.getDataColumnCount());
            System.arraycopy(cvb.cols, 0, inputVrb.cols, 0, acidColCount);
            for (int ixInReadSet = acidColCount; ixInReadSet < cvb.cols.length; ++ixInReadSet) {
                int ixInVrb = this.includes.getPhysicalColumnIds().get(ixInReadSet);
                inputVrb.cols[ixInVrb] = cvb.cols[ixInReadSet];
            }
            inputVrb.size = cvb.size;
            this.acidReader.setBaseAndInnerReader((RecordReader)new AcidWrapper(inputVrb));
            this.acidReader.next(NullWritable.get(), vrb);
        } else {
            if (this.includes.getPhysicalColumnIds().size() != cvb.cols.length) {
                throw new RuntimeException("Unexpected number of columns, VRB has " + this.includes.getPhysicalColumnIds().size() + " included, but the reader returned " + cvb.cols.length);
            }
            for (int ixInReadSet = 0; ixInReadSet < cvb.cols.length; ++ixInReadSet) {
                int ixInVrb = this.includes.getPhysicalColumnIds().get(ixInReadSet);
                cvb.swapColumnVector(ixInReadSet, vrb.cols, ixInVrb);
            }
            vrb.selectedInUse = false;
            vrb.size = cvb.size;
        }
        if (!wasFirst) return true;
        this.firstReturnTime = this.counters.startTimeCounter();
        return true;
    }

    public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
        return this.rbCtx;
    }

    ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
        boolean doLogBlocking;
        boolean isFirst;
        boolean bl = isFirst = this.lastCvb == null;
        if (!isFirst) {
            this.feedback.returnData(this.lastCvb);
        }
        int queueSize = this.queue.size();
        this.maxQueueSize = Math.max(queueSize, this.maxQueueSize);
        boolean bl2 = doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && queueSize == 0;
        if (doLogBlocking) {
            LlapIoImpl.LOG.trace("next will block");
        }
        Object next = null;
        do {
            LlapRecordReader.rethrowErrorIfAny(this.pendingError.get());
        } while ((next = this.queue.poll(100L, TimeUnit.MILLISECONDS)) == null);
        if (doLogBlocking) {
            LlapIoImpl.LOG.trace("next is unblocked");
        }
        if (next == DONE_OBJECT) {
            return null;
        }
        if (next instanceof Throwable) {
            LlapRecordReader.rethrowErrorIfAny((Throwable)next);
            throw new AssertionError((Object)"Unreachable");
        }
        this.lastCvb = (ColumnVectorBatch)next;
        if (LlapIoImpl.LOG.isTraceEnabled()) {
            LlapIoImpl.LOG.trace("Processing will receive vector {}", (Object)this.lastCvb);
        }
        return this.lastCvb;
    }

    public NullWritable createKey() {
        return NullWritable.get();
    }

    public VectorizedRowBatch createValue() {
        return this.rbCtx.createVectorizedRowBatch();
    }

    public long getPos() throws IOException {
        return -1L;
    }

    public void close() throws IOException {
        if (LlapIoImpl.LOG.isTraceEnabled()) {
            LlapIoImpl.LOG.trace("close called; closed {}, interrupted {}, err {}, pending {}", new Object[]{this.isClosed, this.isInterrupted, this.pendingError.get(), this.queue.size()});
        }
        LlapIoImpl.LOG.info("Maximum queue length observed " + this.maxQueueSize);
        LlapIoImpl.LOG.info("Llap counters: {}", (Object)this.counters);
        this.feedback.stop();
        this.isClosed = true;
        LlapRecordReader.rethrowErrorIfAny(this.pendingError.get());
        MDC.clear();
    }

    private static void rethrowErrorIfAny(Throwable pendingError) throws IOException {
        if (pendingError == null) {
            return;
        }
        if (pendingError instanceof IOException) {
            throw (IOException)pendingError;
        }
        throw new IOException(pendingError);
    }

    public void setDone() throws InterruptedException {
        if (LlapIoImpl.LOG.isDebugEnabled()) {
            LlapIoImpl.LOG.debug("setDone called; closed {}, interrupted {}, err {}, pending {}", new Object[]{this.isClosed, this.isInterrupted, this.pendingError.get(), this.queue.size()});
        }
        this.enqueueInternal(DONE_OBJECT);
    }

    public void consumeData(ColumnVectorBatch data) throws InterruptedException {
        if (LlapIoImpl.LOG.isTraceEnabled()) {
            LlapIoImpl.LOG.trace("consume called; closed {}, interrupted {}, err {}, pending {}", new Object[]{this.isClosed, this.isInterrupted, this.pendingError.get(), this.queue.size()});
        }
        this.enqueueInternal(data);
    }

    public void setError(Throwable t) throws InterruptedException {
        this.counters.incrCounter(LlapIOCounters.NUM_ERRORS);
        LlapIoImpl.LOG.debug("setError called; closed {}, interrupted {},  err {}, pending {}", new Object[]{this.isClosed, this.isInterrupted, this.pendingError.get(), this.queue.size()});
        LlapIoImpl.LOG.warn("setError called with an error", t);
        assert (t != null);
        this.pendingError.compareAndSet(null, t);
        this.enqueueInternal(t);
    }

    private void enqueueInternal(Object o) throws InterruptedException {
        while (!(this.isClosed || this.isInterrupted || this.queue.offer(o, 100L, TimeUnit.MILLISECONDS))) {
        }
    }

    public float getProgress() throws IOException {
        return 0.0f;
    }

    private static class IncludesImpl
    implements ColumnVectorProducer.SchemaEvolutionFactory,
    ColumnVectorProducer.Includes {
        private List<Integer> readerLogicalColumnIds;
        private List<Integer> filePhysicalColumnIds;
        private Integer acidStructColumnId = null;
        private TypeDescription readerSchema;
        private JobConf jobConf;

        public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan, VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema, JobConf jobConf) {
            this.readerSchema = readerSchema;
            this.jobConf = jobConf;
            if (tableIncludedCols == null) {
                tableIncludedCols = new ArrayList<Integer>(rbCtx.getRowColumnTypeInfos().length);
                for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) {
                    tableIncludedCols.add(i);
                }
            }
            LOG.debug("Logical table includes: {}", tableIncludedCols);
            List<Integer> filePhysicalColumnIds = this.readerLogicalColumnIds = tableIncludedCols;
            if (isAcidScan) {
                int rootCol = OrcInputFormat.getRootColumn((boolean)false);
                filePhysicalColumnIds = new ArrayList<Integer>(filePhysicalColumnIds.size() + rootCol);
                this.acidStructColumnId = rootCol - 1;
                for (int i = 0; i < rootCol; ++i) {
                    if (this.acidStructColumnId == i) continue;
                    filePhysicalColumnIds.add(i);
                }
                for (int tableColumnId : this.readerLogicalColumnIds) {
                    filePhysicalColumnIds.add(rootCol + tableColumnId);
                }
            }
            this.filePhysicalColumnIds = filePhysicalColumnIds;
        }

        public String toString() {
            return "logical columns " + this.readerLogicalColumnIds + ", physical columns " + this.filePhysicalColumnIds;
        }

        @Override
        public SchemaEvolution createSchemaEvolution(TypeDescription fileSchema) {
            if (this.readerSchema == null) {
                this.readerSchema = fileSchema;
            }
            boolean[] readerIncludes = OrcInputFormat.genIncludedColumns((TypeDescription)this.readerSchema, this.readerLogicalColumnIds);
            Reader.Options options = new Reader.Options((Configuration)this.jobConf).include(readerIncludes);
            return new SchemaEvolution(fileSchema, this.readerSchema, options);
        }

        @Override
        public boolean[] generateFileIncludes(TypeDescription fileSchema) {
            return OrcInputFormat.genIncludedColumns((TypeDescription)fileSchema, this.filePhysicalColumnIds, (Integer)this.acidStructColumnId);
        }

        @Override
        public List<Integer> getPhysicalColumnIds() {
            return this.filePhysicalColumnIds;
        }

        @Override
        public List<Integer> getReaderLogicalColumnIds() {
            return this.readerLogicalColumnIds;
        }

        @Override
        public TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema) {
            return OrcInputFormat.genIncludedTypes((TypeDescription)fileSchema, this.filePhysicalColumnIds, (Integer)this.acidStructColumnId);
        }
    }

    private final class IOUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        private IOUncaughtExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {} Message: {}", new Object[]{t.getName(), t.getId(), e.getMessage()});
            try {
                LlapRecordReader.this.setError(e);
            }
            catch (InterruptedException e1) {
                LOG.info("IOUncaughtExceptionHandler interrupted; ignoring");
            }
        }
    }

    private static final class AcidWrapper
    implements RecordReader<NullWritable, VectorizedRowBatch> {
        private final VectorizedRowBatch acidVrb;

        private AcidWrapper(VectorizedRowBatch acidVrb) {
            this.acidVrb = acidVrb;
        }

        public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
            return true;
        }

        public NullWritable createKey() {
            return NullWritable.get();
        }

        public VectorizedRowBatch createValue() {
            return this.acidVrb;
        }

        public long getPos() throws IOException {
            return 0L;
        }

        public void close() throws IOException {
        }

        public float getProgress() throws IOException {
            return 0.0f;
        }
    }
}

