/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.TezTaskContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.impl.ExceptionReporter;
import org.apache.tez.runtime.library.common.shuffle.impl.Fetcher;
import org.apache.tez.runtime.library.common.shuffle.impl.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleClientMetrics;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandler;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleScheduler;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Drives the shuffle-and-merge phase for one input: it creates the scheduler,
 * event handler and merge manager, starts the fetchers on a background thread,
 * and hands the merged key/value iterator to the caller once it is available.
 *
 * <p>Components running on other threads report failures through
 * {@link #reportException(Throwable)}; the first reported failure is surfaced
 * to the consumer as a {@link ShuffleError} from {@link #waitForInput()}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Shuffle
implements ExceptionReporter {
    private static final Log LOG = LogFactory.getLog(Shuffle.class);
    /** Timeout passed to each {@code ShuffleScheduler.waitUntilDone} call while polling for completion. */
    private static final int PROGRESS_FREQUENCY = 2000;
    private final Configuration conf;
    private final TezInputContext inputContext;
    private final ShuffleClientMetrics metrics;
    private final ShuffleInputEventHandler eventHandler;
    private final ShuffleScheduler scheduler;
    private final MergeManager merger;
    /** First failure reported via {@link #reportException(Throwable)}; guarded by this object's monitor. */
    private Throwable throwable = null;
    /** Name of the thread that reported {@link #throwable}; guarded by this object's monitor. */
    private String throwingThreadName = null;
    private final int numInputs;
    private final SecretKey jobTokenSecret;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    /** Set by {@link #run()}; {@code null} until the shuffle has been started. */
    private FutureTask<TezRawKeyValueIterator> runShuffleFuture;

    /**
     * Wires up all shuffle components. Nothing is fetched until {@link #run()} is called.
     *
     * @param inputContext context of the input this shuffle serves
     * @param conf configuration the shuffle settings are read from
     * @param numInputs number of inputs handed to the scheduler
     * @throws IOException declared by the user lookup, the job-token decoding and the
     *         local file system / component construction performed here
     */
    public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
        this.inputContext = inputContext;
        this.conf = conf;
        this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(), inputContext.getTaskVertexName(), inputContext.getTaskIndex(), this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
        this.numInputs = numInputs;
        this.jobTokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(inputContext.getServiceConsumerMetaData("mapreduce_shuffle"));

        // A codec is only instantiated when intermediate input compression is enabled.
        if (ConfigUtils.isIntermediateInputCompressed(conf)) {
            Class<? extends CompressionCodec> codecClass = ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
            this.codec = ReflectionUtils.newInstance(codecClass, conf);
        } else {
            this.codec = null;
        }

        this.ifileReadAhead = conf.getBoolean("tez.runtime.ifile.readahead", true);
        // The read-ahead length is only looked up when read-ahead is enabled; otherwise it is 0.
        this.ifileReadAheadLength = this.ifileReadAhead ? conf.getInt("tez.runtime.ifile.readahead.bytes", 0x400000) : 0;

        Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
        FileSystem localFS = FileSystem.getLocal(this.conf);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.local.dirs");

        TezCounter shuffledMapsCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
        TezCounter reduceShuffleBytes = inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
        TezCounter failedShuffleCounter = inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
        TezCounter spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
        TezCounter reduceCombineInputCounter = inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
        TezCounter mergedMapOutputsCounter = inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);

        String codecName = this.codec == null ? "None" : this.codec.getClass().getName();
        LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: " + codecName + ", ifileReadAhead: " + this.ifileReadAhead);

        this.scheduler = new ShuffleScheduler(this.inputContext, this.conf, this.numInputs, this, shuffledMapsCounter, reduceShuffleBytes, failedShuffleCounter);
        this.eventHandler = new ShuffleInputEventHandler(inputContext, this.scheduler);
        this.merger = new MergeManager(this.conf, localFS, localDirAllocator, inputContext, combiner, spilledRecordsCounter, reduceCombineInputCounter, mergedMapOutputsCounter, this);
    }

    /**
     * Forwards the given events to the shuffle input event handler.
     *
     * @param events events to process
     */
    public void handleEvents(List<Event> events) {
        this.eventHandler.handleEvents(events);
    }

    /**
     * Reports whether the background shuffle task has finished.
     *
     * @return {@code false} if {@link #run()} has not been called yet; otherwise whether
     *         the shuffle task is done (per {@link FutureTask#isDone()}, which is also
     *         true when the task ended with an exception)
     */
    public boolean isInputReady() {
        FutureTask<TezRawKeyValueIterator> future = this.runShuffleFuture;
        return future != null && future.isDone();
    }

    /**
     * Blocks until the background shuffle task completes and returns its result.
     *
     * @return the iterator produced by the final merge
     * @throws IOException if the shuffle task failed with an {@code IOException}
     *         (including {@link ShuffleError})
     * @throws InterruptedException if the waiting thread is interrupted, or the shuffle
     *         task itself failed with an {@code InterruptedException}
     * @throws IllegalStateException if called before {@link #run()}
     * @throws TezUncheckedException if the shuffle task failed with any other throwable
     */
    public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
        Preconditions.checkState(this.runShuffleFuture != null, "waitForInput can only be called after run");
        try {
            return this.runShuffleFuture.get();
        } catch (ExecutionException e) {
            // Unwrap so callers see the failure type raised inside the shuffle task.
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof InterruptedException) {
                throw (InterruptedException) cause;
            }
            throw new TezUncheckedException("Unexpected exception type while running Shuffle and Merge", cause);
        }
    }

    /**
     * Starts the shuffle and merge on a new thread named {@code "ShuffleMergeRunner"}
     * and returns immediately; use {@link #waitForInput()} to obtain the result.
     */
    public void run() {
        RunShuffleCallable runShuffle = new RunShuffleCallable();
        this.runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
        new Thread(this.runShuffleFuture, "ShuffleMergeRunner").start();
    }

    /**
     * Records the first failure reported by any shuffle component and notifies all
     * threads waiting on the scheduler's monitor. Later reports are ignored.
     *
     * @param t the failure to record
     */
    @Override
    public synchronized void reportException(Throwable t) {
        if (this.throwable != null) {
            // Only the first failure is kept.
            return;
        }
        this.throwable = t;
        this.throwingThreadName = Thread.currentThread().getName();
        synchronized (this.scheduler) {
            this.scheduler.notifyAll();
        }
    }

    /**
     * Throws a {@link ShuffleError} wrapping the reported failure, if there is one.
     * Takes this object's monitor, the same lock {@link #reportException(Throwable)}
     * holds while writing the failure fields.
     */
    private synchronized void throwIfFailureReported() throws ShuffleError {
        if (this.throwable != null) {
            throw new ShuffleError("error in shuffle in " + this.throwingThreadName, this.throwable);
        }
    }

    /** Failure raised by the shuffle task; wraps the underlying cause. */
    public static class ShuffleError
    extends IOException {
        private static final long serialVersionUID = 5753909320586607881L;

        ShuffleError(String msg, Throwable t) {
            super(msg, t);
        }
    }

    /**
     * The task executed by {@link #run()}: starts the fetchers, waits for the scheduler
     * to finish, shuts the fetchers down and performs the final merge.
     */
    private class RunShuffleCallable
    implements Callable<TezRawKeyValueIterator> {

        /**
         * Runs the shuffle to completion.
         *
         * @return the iterator returned by closing the merge manager
         * @throws IOException a {@link ShuffleError} if a component reported a failure or
         *         the final merge threw
         * @throws InterruptedException declared for the blocking scheduler/fetcher calls
         */
        @Override
        public TezRawKeyValueIterator call() throws IOException, InterruptedException {
            int numFetchers = Shuffle.this.conf.getInt("tez.runtime.shuffle.parallel.copies", 20);
            Fetcher[] fetchers = new Fetcher[numFetchers];
            for (int i = 0; i < numFetchers; ++i) {
                fetchers[i] = new Fetcher(Shuffle.this.conf, Shuffle.this.scheduler, Shuffle.this.merger, Shuffle.this.metrics, Shuffle.this, Shuffle.this.jobTokenSecret, Shuffle.this.ifileReadAhead, Shuffle.this.ifileReadAheadLength, Shuffle.this.codec, Shuffle.this.inputContext);
                fetchers[i].start();
            }

            // Poll for completion, checking for a reported failure after every timeout.
            // The check must lock the enclosing Shuffle (not this callable), because
            // that is the monitor reportException() holds when it stores the failure.
            // NOTE(review): when a failure is thrown from this loop the fetchers,
            // scheduler and merger are not shut down/closed - confirm whether cleanup
            // is expected on the error path.
            while (!Shuffle.this.scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
                Shuffle.this.throwIfFailureReported();
            }

            for (Fetcher fetcher : fetchers) {
                fetcher.shutDown();
            }
            Shuffle.this.scheduler.close();

            TezRawKeyValueIterator kvIter;
            try {
                kvIter = Shuffle.this.merger.close();
            } catch (Throwable e) {
                throw new ShuffleError("Error while doing final merge ", e);
            }

            // A failure may have been reported while the final merge was running.
            Shuffle.this.throwIfFailureReported();
            return kvIter;
        }
    }
}

