/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.shuffle.common.impl;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.net.NetUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.shuffle.common.FetchResult;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.Fetcher;
import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
import org.apache.tez.runtime.library.shuffle.common.InputHost;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;

public class ShuffleManager
implements FetcherCallback {
    private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
    // Task-side context: source of counters, application id, and the sink for events and fatal errors.
    private final InputContext inputContext;
    // Number of inputs that must be registered before allInputsFetched() reports true.
    private final int numInputs;
    // Formats MB/s rates in the progress and per-fetch log lines.
    private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
    // Allocator handed to every Fetcher built by constructFetcherForHost().
    private final FetchedInputAllocator inputManager;
    // Pool that runs Fetcher callables.
    private final ListeningExecutorService fetcherExecutor;
    // Single-thread pool that runs the scheduling loop (schedulerCallable).
    private final ListeningExecutorService schedulerExecutor;
    private final RunShuffleCallable schedulerCallable;
    // Registered inputs waiting to be handed out by getNextInput(); bounded to numInputs.
    private final BlockingQueue<FetchedInput> completedInputs;
    // Flipped the first time an input is registered so inputIsReady() is signalled only once.
    private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
    // Identifiers of inputs already registered; also used as the monitor guarding commit/register.
    private final Set<InputIdentifier> completedInputSet;
    // Host identifier (see InputHost.createIdentifier) -> InputHost.
    private final ConcurrentMap<String, InputHost> knownSrcHosts;
    // Hosts queued for the scheduler loop to build fetchers for.
    private final BlockingQueue<InputHost> pendingHosts;
    // Attempts filtered out of a fetcher's work list in constructFetcherForHost().
    private final Set<InputAttemptIdentifier> obsoletedInputs;
    // Fetchers that have been submitted and not yet reported completion.
    private Set<Fetcher> runningFetchers;
    private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
    // Wall-clock time (ms) captured in the constructor; basis for the cumulative transfer rate.
    private final long startTime;
    // Updated (under lock) whenever data arrives via fetchSucceeded / addCompletedInputWithData.
    private long lastProgressTime;
    // Running total of fetched bytes; written under lock in fetchSucceeded.
    private long totalBytesShuffledTillNow;
    // lock/wakeLoop: used to wake the scheduler loop when hosts, completions or shutdown occur.
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeLoop = this.lock.newCondition();
    // min(configured parallel copies, numInputs); also the size of the fetcher thread pool.
    private final int numFetchers;
    private final SecretKey shuffleSecret;
    // May be null, in which case no compression parameters are set on the fetcher builder.
    private final CompressionCodec codec;
    private final boolean localDiskFetchEnabled;
    private final boolean sharedFetchEnabled;
    private final int ifileBufferSize;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    // Source vertex name after TezUtilsInternal.cleanVertexName(); used in thread names and logs.
    private final String srcNameTrimmed;
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final TezCounter shuffledInputsCounter;
    private final TezCounter failedShufflesCounter;
    private final TezCounter bytesShuffledCounter;
    private final TezCounter decompressedDataSizeCounter;
    private final TezCounter bytesShuffledToDiskCounter;
    private final TezCounter bytesShuffledToMemCounter;
    private final TezCounter bytesShuffledDirectDiskCounter;
    // Set by a failed fetcher callback; the scheduler loop exits once this is non-null.
    private volatile Throwable shuffleError;
    private final HttpConnection.HttpConnectionParams httpConnectionParams;
    private final LocalDirAllocator localDirAllocator;
    private final RawLocalFileSystem localFs;
    // Sorted array of local dirs; indexed by hash to pick the lock directory for shared fetch.
    private final Path[] localDisks;
    // Appended to fetcher thread names.
    private static final String localhostName = NetUtils.getHostname();

    /**
     * Wires up counters, configuration flags, bookkeeping collections, the fetcher and
     * scheduler thread pools, and the local-disk helpers. No thread is started here; the
     * scheduler is submitted by {@link #run()}.
     *
     * @param inputContext          context providing counters, the source vertex name and shuffle metadata
     * @param conf                  configuration read for fetch flags, parallel copies and local dirs
     * @param numInputs             number of inputs expected; also bounds the completed-input queue
     * @param bufferSize            IFile buffer size (stored and logged)
     * @param ifileReadAheadEnabled whether IFile read-ahead is passed to fetchers
     * @param ifileReadAheadLength  IFile read-ahead length passed to fetchers
     * @param codec                 compression codec, or {@code null} for none
     * @param inputAllocator        allocator handed to each fetcher
     * @throws IOException if the local file system or local directories cannot be resolved
     */
    public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs, int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength, CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
        this.inputContext = inputContext;
        this.numInputs = numInputs;

        // Counters.
        this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
        this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
        this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
        this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
        this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
        this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
        this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);

        // Plain parameters and configuration flags.
        this.ifileBufferSize = bufferSize;
        this.ifileReadAhead = ifileReadAheadEnabled;
        this.ifileReadAheadLength = ifileReadAheadLength;
        this.codec = codec;
        this.inputManager = inputAllocator;
        this.localDiskFetchEnabled = conf.getBoolean("tez.runtime.optimize.local.fetch", false);
        this.sharedFetchEnabled = conf.getBoolean("tez.runtime.optimize.shared.fetch", false);
        this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());

        // Bookkeeping collections.
        this.completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
        this.completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
        this.knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
        this.pendingHosts = new LinkedBlockingQueue<InputHost>();
        this.obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
        this.runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>());

        // Thread pools: one pool of daemon fetcher threads, one daemon scheduler thread.
        int configuredParallelCopies = conf.getInt("tez.runtime.shuffle.parallel.copies", 20);
        this.numFetchers = Math.min(configuredParallelCopies, numInputs);
        String fetcherThreadNames = "Fetcher [" + this.srcNameTrimmed + "] #%d " + localhostName;
        ExecutorService rawFetcherPool = Executors.newFixedThreadPool(this.numFetchers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(fetcherThreadNames).build());
        this.fetcherExecutor = MoreExecutors.listeningDecorator(rawFetcherPool);
        String schedulerThreadName = "ShuffleRunner [" + this.srcNameTrimmed + "]";
        ExecutorService rawSchedulerPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(schedulerThreadName).build());
        this.schedulerExecutor = MoreExecutors.listeningDecorator(rawSchedulerPool);
        this.schedulerCallable = new RunShuffleCallable(conf);

        long now = System.currentTimeMillis();
        this.startTime = now;
        this.lastProgressTime = now;

        // Shuffle security, HTTP parameters and local disks.
        this.shuffleSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(inputContext.getServiceConsumerMetaData("mapreduce_shuffle"));
        this.httpConnectionParams = ShuffleUtils.constructHttpShuffleConnectionParams(conf);
        this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
        this.localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        this.localDisks = Iterables.toArray(this.localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
        Arrays.sort(this.localDisks);

        StringBuilder summary = new StringBuilder();
        summary.append(this.getClass().getSimpleName());
        summary.append(" : numInputs=").append(numInputs);
        summary.append(", compressionCodec=").append(codec == null ? "NoCompressionCodec" : codec.getClass().getName());
        summary.append(", numFetchers=").append(this.numFetchers);
        summary.append(", ifileBufferSize=").append(this.ifileBufferSize);
        summary.append(", ifileReadAheadEnabled=").append(this.ifileReadAhead);
        summary.append(", ifileReadAheadLength=").append(ifileReadAheadLength);
        summary.append(", ").append("localDiskFetchEnabled=").append(this.localDiskFetchEnabled);
        summary.append(", ").append("sharedFetchEnabled=").append(this.sharedFetchEnabled);
        summary.append(", ").append(this.httpConnectionParams.toString());
        LOG.info(summary.toString());
    }

    /**
     * Submits the scheduling loop to the single-thread scheduler executor, attaches the
     * completion callback, and then marks that executor as accepting no further tasks.
     *
     * @throws IllegalStateException if no input manager was configured
     * @throws IOException declared by the signature; nothing in this body throws it directly
     */
    public void run() throws IOException {
        boolean inputManagerConfigured = this.inputManager != null;
        Preconditions.checkState(inputManagerConfigured, (Object) "InputManager must be configured");

        ListenableFuture<Void> schedulerFuture = this.schedulerExecutor.submit(this.schedulerCallable);
        Futures.addCallback(schedulerFuture, new SchedulerFutureCallback());

        // The scheduler is the only task this executor will ever run.
        this.schedulerExecutor.shutdown();
    }

    /**
     * Builds a {@link Fetcher} for every input currently pending on {@code inputHost}.
     *
     * <p>The host's pending inputs are drained via {@code clearAndGetPendingInputs()}; inputs
     * that are already completed or have been obsoleted are dropped before the work is assigned.
     * When shared fetch is enabled, a "locks" directory is chosen on one of the local disks from
     * a hash of the source vertex name and the host name.
     *
     * @param inputHost host whose pending inputs should be fetched
     * @param conf      configuration passed through to the fetcher builder
     * @return a fetcher with the filtered inputs assigned
     */
    private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
        Path lockDisk = null;
        if (this.sharedFetchEnabled) {
            int hash = Objects.hashCode(this.srcNameTrimmed, inputHost.getHost());
            // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, which would yield a
            // negative array index. Map that single value to 0; every other hash keeps the
            // disk it was assigned before.
            int nonNegativeHash = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
            lockDisk = new Path(this.localDisks[nonNegativeHash % this.localDisks.length], "locks");
        }

        Fetcher.FetcherBuilder fetcherBuilder = new Fetcher.FetcherBuilder(this, this.httpConnectionParams, this.inputManager, this.inputContext.getApplicationId(), this.shuffleSecret, this.srcNameTrimmed, conf, this.localFs, this.localDirAllocator, lockDisk, this.localDiskFetchEnabled, this.sharedFetchEnabled);
        if (this.codec != null) {
            fetcherBuilder.setCompressionParameters(this.codec);
        }
        fetcherBuilder.setIFileParams(this.ifileReadAhead, this.ifileReadAheadLength);

        // Drop inputs that no longer need fetching before handing the list to the fetcher.
        List<InputAttemptIdentifier> pendingInputsForHost = inputHost.clearAndGetPendingInputs();
        Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost.iterator();
        while (inputIter.hasNext()) {
            InputAttemptIdentifier input = inputIter.next();
            boolean alreadyCompleted = this.completedInputSet.contains(input.getInputIdentifier());
            if (alreadyCompleted || this.obsoletedInputs.contains(input)) {
                inputIter.remove();
            }
        }

        fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
        LOG.info("Created Fetcher for host: " + inputHost.getHost() + ", with inputs: " + pendingInputsForHost);
        return fetcherBuilder.build();
    }

    /**
     * Records a newly discovered input attempt against its source host and queues that host
     * for the scheduler.
     *
     * <p>The host entry is created on first use; if another thread registers the same host
     * concurrently, the entry that won {@code putIfAbsent} is used.
     *
     * @param hostName             source host name
     * @param port                 source host port
     * @param srcAttemptIdentifier the input attempt available on that host
     * @param srcPhysicalIndex     physical index passed to a newly created {@link InputHost}
     * @throws TezUncheckedException if the host cannot be added to the pending queue
     */
    public void addKnownInput(String hostName, int port, InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
        String hostKey = InputHost.createIdentifier(hostName, port);
        InputHost inputHost = this.knownSrcHosts.get(hostKey);
        if (inputHost == null) {
            InputHost candidate = new InputHost(hostName, port, this.inputContext.getApplicationId(), srcPhysicalIndex);
            assert (hostKey.equals(candidate.getIdentifier()));
            InputHost existing = this.knownSrcHosts.putIfAbsent(hostKey, candidate);
            inputHost = existing != null ? existing : candidate;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + inputHost);
        }
        inputHost.addKnownInput(srcAttemptIdentifier);

        this.lock.lock();
        try {
            if (!this.pendingHosts.offer(inputHost)) {
                String errorMessage = "Unable to add host: " + inputHost.getIdentifier() + " to pending queue";
                LOG.error(errorMessage);
                throw new TezUncheckedException(errorMessage);
            }
            // Wake the scheduler loop so it notices the newly queued host.
            this.wakeLoop.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Marks an input that produced no data as complete by registering a placeholder
     * {@link NullFetchedInput} for it, unless that input is already registered.
     * The scheduler loop is signalled in either case.
     *
     * @param srcAttemptIdentifier the attempt that reported no data
     */
    public void addCompletedInputWithNoData(InputAttemptIdentifier srcAttemptIdentifier) {
        InputIdentifier inputId = srcAttemptIdentifier.getInputIdentifier();
        LOG.info("No input data exists for SrcTask: " + inputId + ". Marking as complete.");

        // Cheap check first; checked again while holding the set's monitor before registering.
        if (!this.completedInputSet.contains(inputId)) {
            synchronized (this.completedInputSet) {
                boolean stillMissing = !this.completedInputSet.contains(inputId);
                if (stillMissing) {
                    this.registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
                }
            }
        }

        this.lock.lock();
        try {
            this.wakeLoop.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Accepts an input whose data arrived via an event rather than through a fetcher.
     *
     * <p>If the input has not been registered yet it is committed and registered, and the
     * scheduler loop is signalled. Otherwise the supplied {@code fetchedInput} is aborted.
     *
     * @param srcAttemptIdentifier the attempt the data belongs to
     * @param fetchedInput         the already populated input
     * @throws IOException if committing or aborting the input fails
     */
    public void addCompletedInputWithData(InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput) throws IOException {
        InputIdentifier inputId = srcAttemptIdentifier.getInputIdentifier();
        LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to " + fetchedInput.getType());

        this.lock.lock();
        try {
            this.lastProgressTime = System.currentTimeMillis();
        } finally {
            this.lock.unlock();
        }

        boolean committed = false;
        if (!this.completedInputSet.contains(inputId)) {
            synchronized (this.completedInputSet) {
                // Re-check under the monitor: another thread may have registered it meanwhile.
                if (!this.completedInputSet.contains(inputId)) {
                    fetchedInput.commit();
                    committed = true;
                    this.registerCompletedInput(fetchedInput);
                }
            }
        }

        if (!committed) {
            // Input was already registered by someone else; discard this copy.
            fetchedInput.abort();
            return;
        }

        this.lock.lock();
        try {
            this.wakeLoop.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Records an input attempt as obsolete so that it is filtered out the next time a
     * fetcher's work list is built.
     *
     * @param srcAttemptIdentifier the attempt to mark obsolete
     */
    public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
        obsoletedInputs.add(srcAttemptIdentifier);
    }

    /**
     * Fetcher callback for a successfully fetched input.
     *
     * <p>If the input has not been registered yet, it is committed, counters are updated,
     * the input is registered and progress is logged, all while holding the monitor of the
     * completed-input set; the scheduler loop is then signalled. If the input was already
     * registered, {@code fetchedInput} is aborted instead.
     *
     * @param host                 host the data came from (not used by this method)
     * @param srcAttemptIdentifier attempt that was fetched
     * @param fetchedInput         the fetched data
     * @param fetchedBytes         number of bytes fetched
     * @param decompressedLength   decompressed size of the data
     * @param copyDuration         time taken by the copy, in milliseconds
     * @throws IOException if committing or aborting the input fails
     */
    @Override
    public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration) throws IOException {
        InputIdentifier inputId = srcAttemptIdentifier.getInputIdentifier();

        this.lock.lock();
        try {
            this.lastProgressTime = System.currentTimeMillis();
        } finally {
            this.lock.unlock();
        }

        boolean committed = false;
        if (!this.completedInputSet.contains(inputId)) {
            synchronized (this.completedInputSet) {
                // Re-check under the monitor before committing.
                if (!this.completedInputSet.contains(inputId)) {
                    fetchedInput.commit();
                    committed = true;
                    this.logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput, srcAttemptIdentifier);

                    this.shuffledInputsCounter.increment(1L);
                    this.bytesShuffledCounter.increment(fetchedBytes);
                    FetchedInput.Type storage = fetchedInput.getType();
                    if (storage == FetchedInput.Type.MEMORY) {
                        this.bytesShuffledToMemCounter.increment(fetchedBytes);
                    } else if (storage == FetchedInput.Type.DISK) {
                        this.bytesShuffledToDiskCounter.increment(fetchedBytes);
                    } else if (storage == FetchedInput.Type.DISK_DIRECT) {
                        this.bytesShuffledDirectDiskCounter.increment(fetchedBytes);
                    }
                    this.decompressedDataSizeCounter.increment(decompressedLength);

                    this.registerCompletedInput(fetchedInput);

                    this.lock.lock();
                    try {
                        this.totalBytesShuffledTillNow += fetchedBytes;
                    } finally {
                        this.lock.unlock();
                    }
                    this.logProgress();
                }
            }
        }

        if (!committed) {
            // Someone else already registered this input; discard the duplicate.
            fetchedInput.abort();
            return;
        }

        this.lock.lock();
        try {
            this.wakeLoop.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Fetcher callback for a failed fetch.
     *
     * <p>Increments the failed-shuffle counter. A {@code null} source is reported to the
     * input context as a fatal error; otherwise an {@link InputReadErrorEvent} describing the
     * failed attempt is sent through the input context.
     *
     * @param host                 host the fetch was attempted against (not used by this method)
     * @param srcAttemptIdentifier attempt whose fetch failed, or {@code null} if unknown
     * @param connectFailed        whether the failure was a connection failure (only logged)
     */
    @Override
    public void fetchFailed(String host, InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
        // The log line previously ran the two fields together and printed the attempt
        // identifier twice; it now shows the actual input identifier (null-safe).
        InputIdentifier inputIdentifier = srcAttemptIdentifier == null ? null : srcAttemptIdentifier.getInputIdentifier();
        LOG.info("Fetch failed for src: " + srcAttemptIdentifier + ", InputIdentifier: " + inputIdentifier + ", connectFailed: " + connectFailed);
        this.failedShufflesCounter.increment(1L);

        if (srcAttemptIdentifier == null) {
            String message = "Received fetchFailure for an unknown src (null)";
            LOG.fatal(message);
            this.inputContext.fatalError(null, message);
            return;
        }

        int inputIndex = srcAttemptIdentifier.getInputIdentifier().getInputIndex();
        int attemptNumber = srcAttemptIdentifier.getAttemptNumber();
        String diagnostics = "Fetch failure while fetching from " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), inputIndex, attemptNumber);
        InputReadErrorEvent readError = InputReadErrorEvent.create(diagnostics, inputIndex, attemptNumber);

        ArrayList failedEvents = Lists.newArrayListWithCapacity(1);
        failedEvents.add(readError);
        this.inputContext.sendEvents((List) failedEvents);
    }

    /**
     * Shuts the manager down. On the first call only: signals the scheduler loop, shuts down
     * every running fetcher, and force-stops both executors. On every call: cleans up the SSL
     * factory when SSL shuffle is enabled.
     *
     * @throws InterruptedException declared by the signature; nothing in this body throws it directly
     */
    public void shutdown() throws InterruptedException {
        boolean firstShutdownCall = !this.isShutdown.getAndSet(true);
        if (firstShutdownCall) {
            LOG.info("Shutting down pending fetchers on source" + this.srcNameTrimmed + ": " + this.runningFetchers.size());

            this.lock.lock();
            try {
                this.wakeLoop.signal();
                for (Fetcher running : this.runningFetchers) {
                    running.shutdown();
                }
            } finally {
                this.lock.unlock();
            }

            boolean schedulerStillUp = this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown();
            if (schedulerStillUp) {
                this.schedulerExecutor.shutdownNow();
            }
            boolean fetchersStillUp = this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown();
            if (fetchersStillUp) {
                this.fetcherExecutor.shutdownNow();
            }
        }

        if (this.httpConnectionParams.isSSLShuffleEnabled()) {
            HttpConnection.cleanupSSLFactory();
        }
    }

    /**
     * Registers a completed input: records its identifier, enqueues it for consumers,
     * sends the one-time "input is ready" notification, and bumps the completed count.
     * All of this happens while holding {@code lock}.
     *
     * @param fetchedInput the input to register
     */
    private void registerCompletedInput(FetchedInput fetchedInput) {
        this.lock.lock();
        try {
            InputIdentifier completedId = fetchedInput.getInputAttemptIdentifier().getInputIdentifier();
            this.completedInputSet.add(completedId);
            this.completedInputs.add(fetchedInput);

            boolean alreadyNotified = this.inputReadyNotificationSent.getAndSet(true);
            if (!alreadyNotified) {
                this.inputContext.inputIsReady();
            }

            int completedSoFar = this.numCompletedInputs.incrementAndGet();
            if (completedSoFar == this.numInputs) {
                LOG.info("All inputs fetched for input vertex : " + this.inputContext.getSourceVertexName());
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Reports whether the head of the completed-input queue is a real input.
     *
     * @return {@code false} if the queue is empty or its head is a {@link NullFetchedInput};
     *         {@code true} otherwise
     */
    public boolean newInputAvailable() {
        FetchedInput next = this.completedInputs.peek();
        if (next == null) {
            return false;
        }
        return !(next instanceof NullFetchedInput);
    }

    /**
     * Reports whether the number of completed inputs equals the expected number of inputs.
     * The comparison is made while holding {@code lock}.
     *
     * @return {@code true} once every expected input has been registered
     */
    public boolean allInputsFetched() {
        this.lock.lock();
        try {
            return this.numCompletedInputs.get() == this.numInputs;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the next completed input that carries data, blocking until one is available.
     * Placeholder {@link NullFetchedInput} entries are consumed and skipped.
     *
     * @return the next input, or {@code null} when the queue is empty and all inputs have
     *         been fetched
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    public FetchedInput getNextInput() throws InterruptedException {
        while (true) {
            this.lock.lock();
            try {
                // Nothing queued and nothing more will arrive: report end of input.
                if (this.completedInputs.peek() == null && this.allInputsFetched()) {
                    return null;
                }
            } finally {
                this.lock.unlock();
            }

            // Blocks outside the lock until an input is registered.
            FetchedInput candidate = this.completedInputs.take();
            if (!(candidate instanceof NullFetchedInput)) {
                return candidate;
            }
        }
    }

    /**
     * Logs overall shuffle progress: how many inputs have completed out of the total, and
     * the cumulative transfer rate (bytes shuffled so far divided by the time since this
     * manager was constructed).
     */
    private void logProgress() {
        double mbs = (double) this.totalBytesShuffledTillNow / 1048576.0;
        // Report completed inputs. The previous expression (numInputs - completed) was the
        // number still outstanding, so the "X of N" figure counted down instead of up.
        int inputsDone = this.numCompletedInputs.get();
        // +1 keeps the divisor non-zero during the first second.
        long secsSinceStart = (System.currentTimeMillis() - this.startTime) / 1000L + 1L;
        double transferRate = mbs / (double) secsSinceStart;
        LOG.info("copy(" + inputsDone + " of " + this.numInputs + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " + this.mbpsFormat.format(transferRate) + " MB/s)");
    }

    /**
     * Logs the completion of a single fetch together with its sizes, duration and
     * transfer rate in MB/s. The rate is reported as 0 when {@code millis} is 0.
     *
     * @param millis               time the fetch took, in milliseconds
     * @param fetchedBytes         compressed size that was fetched
     * @param decompressedLength   decompressed size of the data
     * @param fetchedInput         the fetched input (its type is logged)
     * @param srcAttemptIdentifier the attempt that was fetched
     */
    private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength, FetchedInput fetchedInput, InputAttemptIdentifier srcAttemptIdentifier) {
        double megabytesPerSecond = 0.0;
        if (millis != 0L) {
            double seconds = (double) millis / 1000.0;
            double bytesPerSecond = (double) fetchedBytes / seconds;
            megabytesPerSecond = bytesPerSecond / 1048576.0;
        }

        StringBuilder line = new StringBuilder();
        line.append("Completed fetch for attempt: ").append(srcAttemptIdentifier);
        line.append(" to ").append(fetchedInput.getType());
        line.append(", CompressedSize=").append(fetchedBytes);
        line.append(", DecompressedSize=").append(decompressedLength);
        line.append(",EndTime=").append(System.currentTimeMillis());
        line.append(", TimeTaken=").append(millis);
        line.append(", Rate=").append(this.mbpsFormat.format(megabytesPerSecond)).append(" MB/s");
        LOG.info(line.toString());
    }

    private class FetchFutureCallback
    implements FutureCallback<FetchResult> {
        private final Fetcher fetcher;

        public FetchFutureCallback(Fetcher fetcher) {
            this.fetcher = fetcher;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doBookKeepingForFetcherComplete() {
            ShuffleManager.this.lock.lock();
            try {
                ShuffleManager.this.runningFetchers.remove(this.fetcher);
                ShuffleManager.this.wakeLoop.signal();
            }
            finally {
                ShuffleManager.this.lock.unlock();
            }
        }

        public void onSuccess(FetchResult result) {
            this.fetcher.shutdown();
            if (ShuffleManager.this.isShutdown.get()) {
                LOG.info((Object)"Already shutdown. Ignoring event from fetcher");
            } else {
                Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
                if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
                    InputHost inputHost = (InputHost)ShuffleManager.this.knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
                    assert (inputHost != null);
                    for (InputAttemptIdentifier input : pendingInputs) {
                        inputHost.addKnownInput(input);
                    }
                    ShuffleManager.this.pendingHosts.add(inputHost);
                }
                this.doBookKeepingForFetcherComplete();
            }
        }

        public void onFailure(Throwable t) {
            this.fetcher.shutdown();
            if (ShuffleManager.this.isShutdown.get()) {
                LOG.info((Object)("Already shutdown. Ignoring error from fetcher: " + t));
            } else {
                LOG.error((Object)"Fetcher failed with error: ", t);
                ShuffleManager.this.shuffleError = t;
                ShuffleManager.this.inputContext.fatalError(t, "Fetch failed");
                this.doBookKeepingForFetcherComplete();
            }
        }
    }

    private class SchedulerFutureCallback
    implements FutureCallback<Void> {
        private SchedulerFutureCallback() {
        }

        public void onSuccess(Void result) {
            LOG.info((Object)"Scheduler thread completed");
        }

        public void onFailure(Throwable t) {
            if (ShuffleManager.this.isShutdown.get()) {
                LOG.info((Object)("Already shutdown. Ignoring error: " + t));
            } else {
                LOG.error((Object)"Scheduler failed with error: ", t);
                ShuffleManager.this.inputContext.fatalError(t, "Shuffle Scheduler Failed");
            }
        }
    }

    /**
     * Placeholder registered for an input that has no data. It is constructed with type
     * MEMORY and sizes of -1, and every data or lifecycle operation on it throws
     * {@link UnsupportedOperationException}.
     */
    private class NullFetchedInput
    extends FetchedInput {
        private static final String NOT_SUPPORTED = "Not supported for NullFetchedInput";

        public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
            super(FetchedInput.Type.MEMORY, -1L, -1L, inputAttemptIdentifier, null);
        }

        @Override
        public OutputStream getOutputStream() throws IOException {
            throw new UnsupportedOperationException(NOT_SUPPORTED);
        }

        @Override
        public InputStream getInputStream() throws IOException {
            throw new UnsupportedOperationException(NOT_SUPPORTED);
        }

        @Override
        public void commit() throws IOException {
            throw new UnsupportedOperationException(NOT_SUPPORTED);
        }

        @Override
        public void abort() throws IOException {
            throw new UnsupportedOperationException(NOT_SUPPORTED);
        }

        @Override
        public void free() {
            throw new UnsupportedOperationException(NOT_SUPPORTED);
        }
    }

    /**
     * The scheduling loop. While the manager is not shut down and inputs remain, it waits
     * for a wake-up when there is nothing to schedule, then drains pending hosts and submits
     * a {@link Fetcher} for each host that has pending inputs. On exit it force-stops the
     * fetcher executor if that has not happened already.
     */
    private class RunShuffleCallable
    implements Callable<Void> {
        // Configuration passed through to every fetcher this loop constructs.
        private final Configuration conf;

        public RunShuffleCallable(Configuration conf) {
            this.conf = conf;
        }

        /**
         * Runs the scheduling loop until shutdown, completion of all inputs, or a recorded
         * shuffle error.
         *
         * @return always {@code null}
         * @throws Exception an InterruptedException from taking a pending host is rethrown
         *                   when the manager has not been shut down
         */
        @Override
        public Void call() throws Exception {
            block8: while (!ShuffleManager.this.isShutdown.get() && ShuffleManager.this.numCompletedInputs.get() < ShuffleManager.this.numInputs) {
                ShuffleManager.this.lock.lock();
                try {
                    // Nothing to do right now (fetchers saturated or no pending host) and
                    // inputs are still outstanding: sleep until someone signals wakeLoop.
                    if ((ShuffleManager.this.runningFetchers.size() >= ShuffleManager.this.numFetchers || ShuffleManager.this.pendingHosts.isEmpty()) && ShuffleManager.this.numCompletedInputs.get() < ShuffleManager.this.numInputs) {
                        ShuffleManager.this.wakeLoop.await();
                    }
                }
                finally {
                    ShuffleManager.this.lock.unlock();
                }
                // A fetcher failure has been recorded (see FetchFutureCallback.onFailure); stop scheduling.
                if (ShuffleManager.this.shuffleError != null) break;
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("NumCompletedInputs: " + ShuffleManager.this.numCompletedInputs));
                }
                // Done or shut down while waiting: let the outer loop condition end the loop.
                if (ShuffleManager.this.numCompletedInputs.get() >= ShuffleManager.this.numInputs || ShuffleManager.this.isShutdown.get()) continue;
                ShuffleManager.this.lock.lock();
                try {
                    // Free fetcher slots at this moment; caps how many fetchers this pass submits.
                    int maxFetchersToRun = ShuffleManager.this.numFetchers - ShuffleManager.this.runningFetchers.size();
                    int count = 0;
                    while (ShuffleManager.this.pendingHosts.peek() != null && !ShuffleManager.this.isShutdown.get()) {
                        InputHost inputHost = null;
                        try {
                            inputHost = (InputHost)ShuffleManager.this.pendingHosts.take();
                        }
                        catch (InterruptedException e) {
                            if (ShuffleManager.this.isShutdown.get()) {
                                LOG.info((Object)"Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                                // Back to the outer loop (after the finally unlocks); its condition then exits.
                                continue block8;
                            }
                            throw e;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Processing pending host: " + inputHost.toDetailedString()));
                        }
                        if (inputHost.getNumPendingInputs() > 0 && !ShuffleManager.this.isShutdown.get()) {
                            LOG.info((Object)("Scheduling fetch for inputHost: " + inputHost.getIdentifier()));
                            Fetcher fetcher = ShuffleManager.this.constructFetcherForHost(inputHost, this.conf);
                            ShuffleManager.this.runningFetchers.add(fetcher);
                            // NOTE(review): this only logs; the fetcher is still submitted below.
                            if (ShuffleManager.this.isShutdown.get()) {
                                LOG.info((Object)"hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                            }
                            ListenableFuture future = ShuffleManager.this.fetcherExecutor.submit((Callable)fetcher);
                            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FetchFutureCallback(fetcher));
                            // Keep draining hosts until this pass has used up the free slots.
                            if (++count < maxFetchersToRun) continue;
                            continue block8;
                        }
                        if (!LOG.isDebugEnabled()) continue;
                        LOG.debug((Object)("Skipping host: " + inputHost.getIdentifier() + " since it has no inputs to process"));
                    }
                }
                finally {
                    ShuffleManager.this.lock.unlock();
                }
            }
            LOG.info((Object)("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted()));
            if (!ShuffleManager.this.fetcherExecutor.isShutdown()) {
                ShuffleManager.this.fetcherExecutor.shutdownNow();
            }
            return null;
        }
    }
}

