/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.input;

import com.google.common.base.Preconditions;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.TezTaskContext;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.library.api.KeyValueReader;

/**
 * {@link LogicalInput} that exposes the records of a MapReduce input split as a
 * {@link KeyValueReader}.
 *
 * <p>Either the old ({@code org.apache.hadoop.mapred}) or the new
 * ({@code org.apache.hadoop.mapreduce}) API is used, depending on
 * {@code JobConf.getUseNewMapper()}. The split itself is obtained in one of two ways,
 * selected by the {@code "mapreduce.tez.splits.via.events"} setting (default {@code true}):
 * <ul>
 *   <li>via a single {@link RootInputDataInformationEvent} passed to
 *       {@link #handleEvents(List)}; {@link #getReader()} blocks until it has been
 *       processed, or</li>
 *   <li>from the split files on the local file system during {@link #initialize}.</li>
 * </ul>
 *
 * <p>Record-reader setup and the wait in {@link #getReader()} are coordinated through
 * {@code rrLock} / {@code rrInited}.
 */
public class MRInput
implements LogicalInput {
    private static final Log LOG = LogFactory.getLog(MRInput.class);

    /** Guards record-reader creation; {@link #rrInited} is bound to this lock. */
    private final Lock rrLock = new ReentrantLock();
    /** Signalled once a record reader has been set up from a split event. */
    private final Condition rrInited = this.rrLock.newCondition();
    private TezInputContext inputContext;
    /** Becomes {@code true} once {@link #handleEvents(List)} has accepted its single event. */
    private volatile boolean eventReceived = false;
    private JobConf jobConf;
    /** Per-split configuration additions; only populated for old-API {@link FileSplit}s. */
    private Configuration incrementalConf;
    private boolean readerCreated = false;
    boolean useNewApi;
    TaskAttemptContext taskAttemptContext;
    private InputFormat newInputFormat;
    private RecordReader newRecordReader;
    protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
    private org.apache.hadoop.mapred.InputFormat oldInputFormat;
    protected org.apache.hadoop.mapred.RecordReader oldRecordReader;
    protected InputSplit oldInputSplit;
    protected JobSplit.TaskSplitIndex splitMetaInfo = new JobSplit.TaskSplitIndex();
    private TezCounter inputRecordCounter;
    @InterfaceAudience.Private
    volatile boolean splitInfoViaEvents;

    /**
     * Builds the {@link JobConf} from the user payload, records the task attempt
     * identity in it, and runs {@link #initializeInternal()}.
     *
     * @param inputContext context supplied by the framework for this input
     * @return always {@code null}; no events are generated
     * @throws IOException if the payload, the splits or the record reader cannot be set up
     * @throws IllegalArgumentException if the payload itself carries split information
     */
    public List<Event> initialize(TezInputContext inputContext) throws IOException {
        this.inputContext = inputContext;
        this.inputContext.requestInitialMemory(0L, null);
        this.inputContext.inputIsReady();
        MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload = MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
        Preconditions.checkArgument(!mrUserPayload.hasSplits(), "Split information not expected in MRInput");
        Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
        this.jobConf = new JobConf(conf);
        // Make the current user's credentials available through the job configuration.
        this.jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());

        TaskID taskId = new TaskID(
                Long.toString(inputContext.getApplicationId().getClusterTimestamp()),
                inputContext.getApplicationId().getId(),
                TaskType.MAP,
                inputContext.getTaskIndex());
        TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, inputContext.getTaskAttemptNumber());
        this.jobConf.set("mapreduce.task.attempt.id", taskAttemptId.toString());
        this.jobConf.setInt("mapreduce.job.application.attempt.id", inputContext.getDAGAttemptNumber());

        this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
        this.useNewApi = this.jobConf.getUseNewMapper();
        this.splitInfoViaEvents = this.jobConf.getBoolean("mapreduce.tez.splits.via.events", true);
        LOG.info("Using New mapreduce API: " + this.useNewApi + ", split information via event: " + this.splitInfoViaEvents);
        this.initializeInternal();
        return null;
    }

    /** No-op. */
    public void start() {
    }

    /**
     * Creates the input format and, when splits are read from disk rather than
     * delivered by event, also loads this task's split and creates the record reader.
     *
     * @throws IOException if the split metadata, the split or the reader cannot be created
     */
    @InterfaceAudience.Private
    void initializeInternal() throws IOException {
        this.rrLock.lock();
        try {
            if (this.splitInfoViaEvents) {
                // The split arrives later through handleEvents(); only the format is needed now.
                if (this.useNewApi) {
                    this.setupNewInputFormat();
                } else {
                    this.setupOldInputFormat();
                }
            } else {
                JobSplit.TaskSplitMetaInfo[] allMetaInfo = this.readSplits(this.jobConf);
                JobSplit.TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[this.inputContext.getTaskIndex()];
                this.splitMetaInfo = new JobSplit.TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(), thisTaskMetaInfo.getStartOffset());
                if (this.useNewApi) {
                    this.setupNewInputFormat();
                    this.newInputSplit = this.getNewSplitDetailsFromDisk(this.splitMetaInfo);
                    this.setupNewRecordReader();
                } else {
                    this.setupOldInputFormat();
                    this.oldInputSplit = this.getOldSplitDetailsFromDisk(this.splitMetaInfo);
                    this.setupOldRecordReader();
                }
            }
        } finally {
            this.rrLock.unlock();
        }
        LOG.info("Initialzed MRInput: " + this.inputContext.getSourceVertexName());
    }

    /** Obtains the old-API input format from the job configuration. */
    private void setupOldInputFormat() {
        this.oldInputFormat = this.jobConf.getInputFormat();
    }

    /**
     * Creates the old-API record reader for {@link #oldInputSplit} and records the
     * split-specific configuration values.
     */
    private void setupOldRecordReader() throws IOException {
        Preconditions.checkNotNull(this.oldInputSplit, "Input split hasn't yet been setup");
        this.oldRecordReader = this.oldInputFormat.getRecordReader(this.oldInputSplit, this.jobConf, (Reporter)new MRReporter((TezTaskContext)this.inputContext, this.oldInputSplit));
        this.setIncrementalConfigParams(this.oldInputSplit);
    }

    /**
     * Creates the task attempt context and instantiates the new-API input format
     * class it names.
     *
     * @throws IOException if the input format class cannot be found
     */
    private void setupNewInputFormat() throws IOException {
        Class inputFormatClazz;
        this.taskAttemptContext = this.createTaskAttemptContext();
        try {
            inputFormatClazz = this.taskAttemptContext.getInputFormatClass();
        } catch (ClassNotFoundException e) {
            throw new IOException("Unable to instantiate InputFormat class", e);
        }
        this.newInputFormat = (InputFormat)ReflectionUtils.newInstance(inputFormatClazz, (Configuration)this.jobConf);
    }

    /**
     * Creates and initializes the new-API record reader for {@link #newInputSplit}.
     *
     * @throws IOException if reader creation fails or the thread is interrupted; in the
     *         latter case the thread's interrupt status is restored
     */
    private void setupNewRecordReader() throws IOException {
        Preconditions.checkNotNull(this.newInputSplit, "Input split hasn't yet been setup");
        try {
            this.newRecordReader = this.newInputFormat.createRecordReader(this.newInputSplit, this.taskAttemptContext);
            this.newRecordReader.initialize(this.newInputSplit, this.taskAttemptContext);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while creating record reader", e);
        }
    }

    /**
     * Returns the reader over this input's records, blocking until a record reader
     * exists if none has been created yet.
     *
     * @return a new reader backed by the underlying record reader
     * @throws IOException if waiting for the record reader is interrupted
     * @throws IllegalStateException if a reader has already been requested
     */
    public KeyValueReader getReader() throws IOException {
        Preconditions.checkState(!this.readerCreated, "Only a single instance of record reader can be created for this input.");
        this.readerCreated = true;
        this.rrLock.lock();
        try {
            if (this.newRecordReader == null && this.oldRecordReader == null) {
                this.checkAndAwaitRecordReaderInitialization();
            }
        } finally {
            this.rrLock.unlock();
        }
        LOG.info("Creating reader for MRInput: " + this.inputContext.getSourceVertexName());
        return new MRInputKVReader();
    }

    /**
     * Accepts the single {@link RootInputDataInformationEvent} that carries this
     * task's split and hands it to {@link #processSplitEvent}.
     *
     * @param inputEvents must contain exactly one {@link RootInputDataInformationEvent}
     * @throws IllegalStateException if an event was already accepted or the list does
     *         not contain exactly one element
     * @throws IllegalArgumentException if the event is of another type
     */
    public void handleEvents(List<Event> inputEvents) throws Exception {
        if (this.eventReceived || inputEvents.size() != 1) {
            throw new IllegalStateException("MRInput expects only a single input. Received: current eventListSize: " + inputEvents.size() + ", Received previous input: " + this.eventReceived);
        }
        Event event = inputEvents.iterator().next();
        Preconditions.checkArgument(event instanceof RootInputDataInformationEvent, this.getClass().getSimpleName() + " can only handle a single event of type: " + RootInputDataInformationEvent.class.getSimpleName());
        // Record acceptance so that any further call is rejected by the guard above.
        this.eventReceived = true;
        this.processSplitEvent((RootInputDataInformationEvent)event);
    }

    /** No-op. */
    public void setNumPhysicalInputs(int numInputs) {
    }

    /**
     * Closes the record reader of the API in use, if one was created.
     *
     * @return always {@code null}; no events are generated
     * @throws IOException if the underlying reader fails to close
     */
    public List<Event> close() throws IOException {
        if (this.useNewApi) {
            if (this.newRecordReader != null) {
                this.newRecordReader.close();
            }
        } else if (this.oldRecordReader != null) {
            this.oldRecordReader.close();
        }
        return null;
    }

    /**
     * @return a copy of the split-specific configuration values, or {@code null} if
     *         none have been recorded
     */
    public Configuration getConfigUpdates() {
        if (this.incrementalConf != null) {
            return new Configuration(this.incrementalConf);
        }
        return null;
    }

    /**
     * Reports the progress of the underlying record reader.
     *
     * @return the reader's progress, or {@code 0.0f} while no reader has been created
     */
    public float getProgress() throws IOException, InterruptedException {
        if (this.useNewApi) {
            return this.newRecordReader != null ? this.newRecordReader.getProgress() : 0.0f;
        }
        return this.oldRecordReader != null ? this.oldRecordReader.getProgress() : 0.0f;
    }

    private TaskAttemptContext createTaskAttemptContext() {
        return new TaskAttemptContextImpl((Configuration)this.jobConf, (TezTaskContext)this.inputContext, true, null);
    }

    /**
     * Sets up the record reader from the given split event and wakes a thread
     * waiting in {@link #getReader()}.
     */
    void processSplitEvent(RootInputDataInformationEvent event) throws IOException {
        this.rrLock.lock();
        try {
            this.initFromEventInternal(event);
            LOG.info("Notifying on RecordReader Initialized");
            this.rrInited.signal();
        } finally {
            this.rrLock.unlock();
        }
    }

    /**
     * Blocks until a record reader has been created. Must be called with
     * {@code rrLock} held, as required by {@link Condition#await()}.
     *
     * <p>The wait is performed in a loop so that a spurious wakeup cannot let the
     * caller proceed without a reader.
     *
     * @throws IOException if the thread is interrupted while waiting; the interrupt
     *         status is restored before throwing
     */
    void checkAndAwaitRecordReaderInitialization() throws IOException {
        LOG.info("Awaiting RecordReader initialization");
        try {
            while (this.newRecordReader == null && this.oldRecordReader == null) {
                this.rrInited.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for RecordReader initiailization", e);
        }
    }

    /**
     * Sets up the record reader from the given split event while holding
     * {@code rrLock}; unlike {@link #processSplitEvent} it does not signal waiters.
     */
    @InterfaceAudience.Private
    void initFromEvent(RootInputDataInformationEvent initEvent) throws IOException {
        this.rrLock.lock();
        try {
            this.initFromEventInternal(initEvent);
        } finally {
            this.rrLock.unlock();
        }
    }

    /**
     * Decodes the split carried by {@code initEvent} and creates the matching
     * record reader.
     *
     * @throws IllegalStateException if {@code initEvent} is {@code null}
     */
    private void initFromEventInternal(RootInputDataInformationEvent initEvent) throws IOException {
        LOG.info("Initializing RecordReader from event");
        Preconditions.checkState(initEvent != null, "InitEvent must be specified");
        MRRuntimeProtos.MRSplitProto splitProto = MRRuntimeProtos.MRSplitProto.parseFrom(initEvent.getUserPayload());
        if (this.useNewApi) {
            this.newInputSplit = MRInput.getNewSplitDetailsFromEvent(splitProto, this.jobConf);
            LOG.info("Split Details -> SplitClass: " + this.newInputSplit.getClass().getName() + ", NewSplit: " + this.newInputSplit);
            this.setupNewRecordReader();
        } else {
            this.oldInputSplit = MRInput.getOldSplitDetailsFromEvent(splitProto, this.jobConf);
            LOG.info("Split Details -> SplitClass: " + this.oldInputSplit.getClass().getName() + ", OldSplit: " + this.oldInputSplit);
            this.setupOldRecordReader();
        }
        LOG.info("Initialized RecordReader from event");
    }

    /** Builds an old-API split from a split proto using serializers configured in {@code conf}. */
    @InterfaceAudience.Private
    public static InputSplit getOldSplitDetailsFromEvent(MRRuntimeProtos.MRSplitProto splitProto, Configuration conf) throws IOException {
        SerializationFactory serializationFactory = new SerializationFactory(conf);
        return MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
    }

    /** Reads this task's old-API split from the local split file. */
    private InputSplit getOldSplitDetailsFromDisk(JobSplit.TaskSplitIndex splitMetaInfo) throws IOException {
        return (InputSplit)this.readSplitFromDisk(splitMetaInfo);
    }

    /** Builds a new-API split from a split proto using serializers configured in {@code conf}. */
    @InterfaceAudience.Private
    public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(MRRuntimeProtos.MRSplitProto splitProto, Configuration conf) throws IOException {
        SerializationFactory serializationFactory = new SerializationFactory(conf);
        return MRHelpers.createNewFormatSplitFromUserPayload(splitProto, serializationFactory);
    }

    /** Reads this task's new-API split from the local split file. */
    private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(JobSplit.TaskSplitIndex splitMetaInfo) throws IOException {
        return (org.apache.hadoop.mapreduce.InputSplit)this.readSplitFromDisk(splitMetaInfo);
    }

    /**
     * Deserializes one split from the local split file: seeks to the recorded offset,
     * reads the split class name, deserializes the split with the serializer registered
     * for that class, and adds the number of bytes consumed to
     * {@code TaskCounter.SPLIT_RAW_BYTES}. The stream is closed on every exit path.
     *
     * @param splitMetaInfo location and start offset of the split in the split file
     * @return the deserialized split; the caller casts it to the expected API type
     * @throws IOException on read failure or if the split class cannot be loaded
     */
    private Object readSplitFromDisk(JobSplit.TaskSplitIndex splitMetaInfo) throws IOException {
        long offset = splitMetaInfo.getStartOffset();
        LocalFileSystem fs = FileSystem.getLocal((Configuration)this.jobConf);
        Path file = fs.makeQualified(new Path(splitMetaInfo.getSplitLocation()));
        LOG.info("Reading input split file from : " + file);
        FSDataInputStream inFile = fs.open(file);
        try {
            inFile.seek(offset);
            String className = Text.readString((DataInput)inFile);
            Class cls;
            try {
                cls = this.jobConf.getClassByName(className);
            } catch (ClassNotFoundException ce) {
                throw new IOException("Split class " + className + " not found", ce);
            }
            SerializationFactory factory = new SerializationFactory((Configuration)this.jobConf);
            Deserializer deserializer = factory.getDeserializer(cls);
            deserializer.open((InputStream)inFile);
            Object split = deserializer.deserialize(null);
            long pos = inFile.getPos();
            this.inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
            return split;
        } finally {
            inFile.close();
        }
    }

    /**
     * For a {@link FileSplit}, records its path, start and length in
     * {@link #incrementalConf}; other split types leave it untouched.
     */
    private void setIncrementalConfigParams(InputSplit inputSplit) {
        if (inputSplit instanceof FileSplit) {
            FileSplit fileSplit = (FileSplit)inputSplit;
            this.incrementalConf = new Configuration(false);
            this.incrementalConf.set("mapreduce.map.input.file", fileSplit.getPath().toString());
            this.incrementalConf.setLong("mapreduce.map.input.start", fileSplit.getStart());
            this.incrementalConf.setLong("mapreduce.map.input.length", fileSplit.getLength());
        }
        LOG.info("Processing split: " + inputSplit);
    }

    /** Reads the split meta information for all tasks from the local file system. */
    protected JobSplit.TaskSplitMetaInfo[] readSplits(Configuration conf) throws IOException {
        return SplitMetaInfoReaderTez.readSplitMetaInfo(conf, (FileSystem)FileSystem.getLocal(conf));
    }

    /**
     * {@link KeyValueReader} view over whichever record reader the enclosing
     * {@link MRInput} created. For the old API the key and value objects are created
     * once and refilled by each call to {@link #next()}.
     */
    private class MRInputKVReader
    implements KeyValueReader {
        Object key;
        Object value;
        /** Snapshot of {@link MRInput#useNewApi} taken at construction. */
        private final boolean localNewApi;

        MRInputKVReader() {
            this.localNewApi = MRInput.this.useNewApi;
            if (!this.localNewApi) {
                this.key = MRInput.this.oldRecordReader.createKey();
                this.value = MRInput.this.oldRecordReader.createValue();
            }
        }

        /**
         * Advances to the next record and counts it in the input-records counter.
         *
         * @return {@code true} if a record was read, {@code false} at end of input
         */
        public boolean next() throws IOException {
            boolean hasNext;
            if (this.localNewApi) {
                try {
                    hasNext = MRInput.this.newRecordReader.nextKeyValue();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while checking for next key-value", e);
                }
            } else {
                hasNext = MRInput.this.oldRecordReader.next(this.key, this.value);
            }
            if (hasNext) {
                MRInput.this.inputRecordCounter.increment(1L);
            }
            return hasNext;
        }

        /** @return the key of the current record */
        public Object getCurrentKey() throws IOException {
            if (!this.localNewApi) {
                return this.key;
            }
            try {
                return MRInput.this.newRecordReader.getCurrentKey();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while fetching next key", e);
            }
        }

        /** @return the value of the current record */
        public Object getCurrentValue() throws IOException {
            if (!this.localNewApi) {
                return this.value;
            }
            try {
                return MRInput.this.newRecordReader.getCurrentValue();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while fetching next value", e);
            }
        }
    }
}

