/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetchedInputAllocatorOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleClientMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ShuffleScheduler {
    /** Counter group name under which the per-category shuffle error counters are looked up. */
    @VisibleForTesting
    static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
    // Wall-clock time (ms) stored by getHost() when a host is handed out; freeHost() logs the elapsed time from it.
    private final AtomicLong shuffleStart = new AtomicLong(0L);
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
    // Penalty back-off parameters: base delay in ms and growth factor per failure
    // (the same values copyFailed() applies when computing a host's penalty delay).
    private static final long INITIAL_PENALTY = 2000L;
    private static final float PENALTY_GROWTH_RATE = 1.3f;

    // --- Input completion tracking ---
    // One bit per input index; accessed only through setInputFinished()/isInputFinished().
    private final BitSet finishedMaps;
    private final int numInputs;
    private int numFetchedSpills;

    // --- Host / input bookkeeping ---
    // Keyed by MapHost.createIdentifier(hostPort, partitionId); see addKnownMapOutput().
    @VisibleForTesting
    final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
    // Keyed by "<pathComponent>_<partitionId>"; see getIdentifierFromPathAndReduceId().
    private final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
    // Per-input spill tracking for inputs that can be retrieved in chunks.
    @VisibleForTesting
    final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
    private final Set<MapHost> pendingHosts = new HashSet<MapHost>();
    private final Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Used by getHost() to pick a pending host at random.
    private final Random random = new Random(System.currentTimeMillis());
    // Hosts penalised by copyFailed(), each with its delay.
    private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
    private final Referee referee;
    // Failure counts per input attempt and per host identifier; reset for an attempt/host in copySucceeded().
    private final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier, IntWritable>();
    private final Map<String, IntWritable> hostFailures = new HashMap<String, IntWritable>();

    // --- Counters obtained from inputContext.getCounters() in the constructor ---
    private final InputContext inputContext;
    private final TezCounter shuffledInputsCounter;
    private final TezCounter skippedInputCounter;
    private final TezCounter reduceShuffleBytes;
    private final TezCounter reduceBytesDecompressed;
    private final TezCounter failedShuffleCounter;
    private final TezCounter bytesShuffledToDisk;
    private final TezCounter bytesShuffledToDiskDirect;
    private final TezCounter bytesShuffledToMem;
    private final TezCounter firstEventReceived;
    private final TezCounter lastEventReceived;

    // --- Progress state ---
    private final String srcNameTrimmed;
    private final AtomicInteger remainingMaps;
    private final long startTime;
    private long lastProgressTime;

    // --- Fetcher infrastructure and the values handed to each fetcher ---
    private final int numFetchers;
    // Concurrent set view backed by a ConcurrentHashMap.
    private final Set<FetcherOrderedGrouped> runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<FetcherOrderedGrouped, Boolean>());
    private final ListeningExecutorService fetcherExecutor;
    private final HttpConnectionParams httpConnectionParams;
    private final FetchedInputAllocatorOrderedGrouped allocator;
    private final ShuffleClientMetrics shuffleMetrics;
    private final Shuffle shuffle;
    private final MergeManager mergeManager;
    private final JobTokenSecretManager jobTokenSecretManager;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    private final CompressionCodec codec;
    private final Configuration conf;
    private final boolean localDiskFetchEnabled;
    private final String localHostname;
    private final int shufflePort;
    private final boolean asyncHttp;

    // --- Error counters registered under SHUFFLE_ERR_GRP_NAME ---
    private final TezCounter ioErrsCounter;
    private final TezCounter wrongLengthErrsCounter;
    private final TezCounter badIdErrsCounter;
    private final TezCounter wrongMapErrsCounter;
    private final TezCounter connectionErrsCounter;
    private final TezCounter wrongReduceErrsCounter;

    // --- Limits and thresholds computed or read in the constructor ---
    private final int maxTaskOutputAtOnce;
    private final int maxFetchFailuresBeforeReporting;
    private final boolean reportReadErrorImmediately;
    private final int maxFailedUniqueFetches;
    private final int abortFailureLimit;
    // Largest duration reported through informMaxMapRunTime().
    private int maxMapRuntime = 0;
    private long totalBytesShuffledTillNow = 0L;
    // Formats the transfer rate logged by logProgress().
    private DecimalFormat mbpsFormat = new DecimalFormat("0.00");

    /**
     * Builds the scheduler: stores its collaborators, reads the shuffle settings from
     * {@code conf}, looks up the task counters, creates the fetcher thread pool and
     * starts the referee thread.
     *
     * @param inputContext source of the host name, counters, service metadata and DAG/vertex names
     * @param conf configuration the {@code tez.runtime.*} settings below are read from
     * @param numberOfInputs number of inputs to fetch; sizes {@code remainingMaps} and {@code finishedMaps}
     * @param shuffle stored; used elsewhere in this class to report exceptions
     * @param mergeManager stored as-is
     * @param allocator stored; handed to fetchers built by {@code constructFetcherForHost}
     * @param startTime reference time in milliseconds (compared against {@code System.currentTimeMillis()}
     *        elsewhere in this class); also seeds {@code lastProgressTime}
     * @param codec stored; handed to fetchers
     * @param ifileReadAhead stored; handed to fetchers
     * @param ifileReadAheadLength stored; handed to fetchers
     * @param srcNameTrimmed used in the fetcher thread names and handed to fetchers
     * @throws IOException declared by the constructor; presumably raised by the metadata or
     *         current-user lookups below — TODO confirm which calls throw
     */
    public ShuffleScheduler(InputContext inputContext, Configuration conf, int numberOfInputs, Shuffle shuffle, MergeManager mergeManager, FetchedInputAllocatorOrderedGrouped allocator, long startTime, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, String srcNameTrimmed) throws IOException {
        this.inputContext = inputContext;
        this.conf = conf;
        this.shuffle = shuffle;
        this.allocator = allocator;
        this.mergeManager = mergeManager;
        this.numInputs = numberOfInputs;
        // A single attempt may fail at least 30 times (or 10% of the input count, if larger)
        // before copyFailed() reports an exception for it.
        this.abortFailureLimit = Math.max(30, numberOfInputs / 10);
        this.remainingMaps = new AtomicInteger(numberOfInputs);
        this.finishedMaps = new BitSet(numberOfInputs);
        this.ifileReadAhead = ifileReadAhead;
        this.ifileReadAheadLength = ifileReadAheadLength;
        this.srcNameTrimmed = srcNameTrimmed;
        this.codec = codec;
        // Never use more fetchers than there are inputs.
        int configuredNumFetchers = conf.getInt("tez.runtime.shuffle.parallel.copies", 20);
        this.numFetchers = Math.min(configuredNumFetchers, this.numInputs);
        LOG.info("Num fetchers determined to be: " + this.numFetchers);
        this.localDiskFetchEnabled = conf.getBoolean("tez.runtime.optimize.local.fetch", true);
        this.localHostname = inputContext.getExecutionContext().getHostName();
        ByteBuffer shuffleMetadata = inputContext.getServiceProviderMetaData("mapreduce_shuffle");
        this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
        this.referee = new Referee();
        // Task-level shuffle counters.
        this.shuffledInputsCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_SHUFFLED_INPUTS);
        this.reduceShuffleBytes = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES);
        this.reduceBytesDecompressed = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
        this.failedShuffleCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
        this.bytesShuffledToDisk = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_TO_DISK);
        this.bytesShuffledToDiskDirect = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
        this.bytesShuffledToMem = inputContext.getCounters().findCounter((Enum)TaskCounter.SHUFFLE_BYTES_TO_MEM);
        // Error counters, one per ShuffleErrors category, all in the "Shuffle Errors" group.
        this.ioErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.wrongLengthErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_LENGTH.toString());
        this.badIdErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString());
        this.wrongMapErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_MAP.toString());
        this.connectionErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.CONNECTION.toString());
        this.wrongReduceErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString());
        this.startTime = startTime;
        this.lastProgressTime = startTime;
        this.asyncHttp = conf.getBoolean("tez.runtime.shuffle.use.async.http", false);
        this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
        this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(), inputContext.getTaskVertexName(), inputContext.getTaskIndex(), this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
        SecretKey jobTokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(inputContext.getServiceConsumerMetaData("mapreduce_shuffle"));
        this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
        // Fixed-size pool of daemon threads named "Fetcher [<source>] #<n>".
        ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(this.numFetchers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
        this.fetcherExecutor = MoreExecutors.listeningDecorator((ExecutorService)fetcherRawExecutor);
        this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
        // NOTE(review): the referee is started before the remaining fields below are assigned,
        // so it could observe them in their default state — confirm Referee does not read them.
        this.referee.start();
        this.maxFetchFailuresBeforeReporting = conf.getInt("tez.runtime.shuffle.fetch.failures.limit", 5);
        this.reportReadErrorImmediately = conf.getBoolean("tez.runtime.shuffle.notify.readerror", true);
        // Clamp the configured value into the range [1, 75].
        this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt("tez.runtime.shuffle.fetch.max.task.output.at.once", 20)));
        this.skippedInputCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_SKIPPED_INPUTS);
        this.firstEventReceived = inputContext.getCounters().findCounter((Enum)TaskCounter.FIRST_EVENT_RECEIVED);
        this.lastEventReceived = inputContext.getCounters().findCounter((Enum)TaskCounter.LAST_EVENT_RECEIVED);
        this.shuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
        LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + this.maxFetchFailuresBeforeReporting + ", reportReadErrorImmediately=" + this.reportReadErrorImmediately + ", maxFailedUniqueFetches=" + this.maxFailedUniqueFetches + ", abortFailureLimit=" + this.abortFailureLimit + ", maxMapRuntime=" + this.maxMapRuntime + ", maxTaskOutputAtOnce=" + this.maxTaskOutputAtOnce);
    }

    /**
     * Runs the scheduling loop on the calling thread by invoking a fresh
     * {@code ShuffleSchedulerCallable}; returns once that call returns.
     *
     * @throws Exception whatever the callable's {@code call()} throws
     */
    public void start() throws Exception {
        new ShuffleSchedulerCallable().call();
    }

    /**
     * Shuts the scheduler down. Only the first call does any work; later calls return
     * immediately because the shutdown flag is already set.
     *
     * <p>The first call wakes every thread waiting on this scheduler's monitor, asks each
     * running fetcher to shut down, then interrupts the referee thread and waits for it
     * to finish.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining the referee
     */
    public void close() throws InterruptedException {
        if (isShutdown.getAndSet(true)) {
            // Another caller already ran the shutdown sequence.
            return;
        }
        synchronized (this) {
            notifyAll();
        }
        for (FetcherOrderedGrouped activeFetcher : runningFetchers) {
            activeFetcher.shutDown();
        }
        referee.interrupt();
        referee.join();
    }

    /**
     * Stamps the event-received counters with the time elapsed since {@code startTime}.
     * The first-event counter is written only while its value is still zero; the
     * last-event counter is overwritten on every call.
     */
    protected synchronized void updateEventReceivedTime() {
        long elapsedSinceStart = System.currentTimeMillis() - startTime;
        if (firstEventReceived.getValue() == 0L) {
            firstEventReceived.setValue(elapsedSinceStart);
        }
        lastEventReceived.setValue(elapsedSinceStart);
    }

    /**
     * Records the successful fetch of one input attempt (or of one spill of it).
     *
     * <p>If the input index is already marked finished the call is treated as a duplicate:
     * a warning is logged and a non-null {@code output} is aborted. Otherwise the output is
     * committed (or the input counted as skipped when {@code output} is null), the byte and
     * input counters are updated, and the input is marked finished once all of its data has
     * been processed.
     *
     * @param srcAttemptIdentifier the attempt that was fetched
     * @param host host the data came from; when non-null its failure count is cleared
     * @param bytesCompressed added to the shuffle-bytes counters
     * @param bytesDecompressed added to the decompressed-bytes counter
     * @param millis passed to the per-fetch completion log
     * @param output fetched output to commit, or {@code null} when nothing was fetched
     * @throws IOException declared by the method; presumably from {@code output.commit()} — confirm
     */
    public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, MapHost host, long bytesCompressed, long bytesDecompressed, long millis, MapOutput output) throws IOException {
        if (!this.isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
            if (output != null) {
                // A success clears the failure history of this attempt and of its host.
                this.failureCounts.remove(srcAttemptIdentifier);
                if (host != null) {
                    this.hostFailures.remove(host.getHostIdentifier());
                }
                output.commit();
                ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed, bytesDecompressed, output.getType().toString(), srcAttemptIdentifier);
                // Attribute the compressed bytes to the counter matching the output type.
                if (output.getType() == MapOutput.Type.DISK) {
                    this.bytesShuffledToDisk.increment(bytesCompressed);
                } else if (output.getType() == MapOutput.Type.DISK_DIRECT) {
                    this.bytesShuffledToDiskDirect.increment(bytesCompressed);
                } else {
                    this.bytesShuffledToMem.increment(bytesCompressed);
                }
                this.shuffledInputsCounter.increment(1L);
            } else {
                this.skippedInputCounter.increment(1L);
            }
            if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
                // Non-chunked input: a single fetch completes the input.
                this.remainingMaps.decrementAndGet();
                this.setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
                ++this.numFetchedSpills;
            } else {
                // Chunked input: track each spill until the event info reports it is done.
                InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
                // NOTE: on validation failure the method returns after the output above has
                // already been committed and counted.
                if (!this.validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
                    return;
                }
                ShuffleEventInfo eventInfo = this.shuffleInfoEventsMap.get(inputIdentifier);
                // The validation call above registers an entry when none exists, so this
                // branch looks unreachable after a successful validation — confirm.
                if (eventInfo == null && output == null) {
                    eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
                    this.shuffleInfoEventsMap.put(inputIdentifier, eventInfo);
                }
                assert (eventInfo != null);
                eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
                ++this.numFetchedSpills;
                if (srcAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) {
                    eventInfo.setFinalEventId(srcAttemptIdentifier.getSpillEventId());
                }
                if (eventInfo.isDone()) {
                    this.remainingMaps.decrementAndGet();
                    this.setInputFinished(inputIdentifier.getInputIndex());
                    this.shuffleInfoEventsMap.remove(inputIdentifier);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " + this.shuffleInfoEventsMap);
                    }
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace("eventInfo " + eventInfo.toString());
                }
            }
            if (this.remainingMaps.get() == 0) {
                // Wake threads waiting on this scheduler now that nothing remains to fetch.
                this.notifyAll();
                LOG.info("All inputs fetched for input vertex : " + this.inputContext.getSourceVertexName());
            }
            this.lastProgressTime = System.currentTimeMillis();
            this.totalBytesShuffledTillNow += bytesCompressed;
            this.logProgress();
            this.reduceShuffleBytes.increment(bytesCompressed);
            this.reduceBytesDecompressed.increment(bytesDecompressed);
            if (LOG.isDebugEnabled()) {
                LOG.debug("src task: " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber()) + " done");
            }
        } else {
            // The input was already completed by an earlier fetch; discard this copy.
            LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
            if (output != null) {
                output.abort();
            }
        }
    }

    /**
     * Checks that a chunk-retrievable input attempt does not conflict with an attempt
     * already tracked for the same input.
     *
     * <p>Inputs that cannot be retrieved in chunks always pass. For chunked inputs, the
     * first attempt seen for an input is registered in {@code shuffleInfoEventsMap}; a
     * later attempt with a different attempt number is reported as an exception.
     *
     * @param input the attempt to validate
     * @return {@code false} only when a different attempt number is already tracked for
     *         the same input; {@code true} otherwise
     */
    private boolean validateInputAttemptForPipelinedShuffle(InputAttemptIdentifier input) {
        if (!input.canRetrieveInputInChunks()) {
            return true;
        }
        ShuffleEventInfo tracked = shuffleInfoEventsMap.get(input.getInputIdentifier());
        if (tracked == null) {
            // First event for this input: start tracking it.
            shuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input));
            return true;
        }
        if (input.getAttemptNumber() == tracked.attemptNum) {
            return true;
        }
        reportExceptionForInput(new IOException("Previous event already got scheduled for " + input + ". Previous attempt's data could have been already merged " + "to memory/disk outputs.  Failing the fetch early. currentAttemptNum=" + tracked.attemptNum + ", eventsProcessed=" + tracked.eventsProcessed + ", newAttemptNum=" + input.getAttemptNumber()));
        return false;
    }

    /**
     * Logs the given exception at error level and forwards it to the owning
     * {@code Shuffle} via {@code reportException}.
     *
     * @param exception the failure to log and report
     */
    @VisibleForTesting
    void reportExceptionForInput(Exception exception) {
        LOG.error("Reporting exception for input", exception);
        shuffle.reportException(exception);
    }

    /**
     * Logs how many inputs and spills have been fetched so far together with the
     * cumulative transfer rate (total compressed bytes shuffled, in MB, divided by the
     * whole seconds elapsed since {@code startTime} plus one).
     */
    private void logProgress() {
        double megabytesFetched = (double) totalBytesShuffledTillNow / 1048576.0;
        int completedInputs = numInputs - remainingMaps.get();
        // The "+ 1" keeps the divisor non-zero during the first second.
        long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000L + 1L;
        double megabytesPerSecond = megabytesFetched / (double) elapsedSeconds;

        StringBuilder message = new StringBuilder();
        message.append("copy(").append(completedInputs);
        message.append(" (spillsFetched=").append(numFetchedSpills);
        message.append(") of ").append(numInputs);
        message.append(". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) ");
        message.append(mbpsFormat.format(megabytesPerSecond));
        message.append(" MB/s)");
        LOG.info(message.toString());
    }

    /**
     * Records a failed fetch of {@code srcAttempt} from {@code host}.
     *
     * <p>The host is penalised, the per-attempt and per-host failure counts are bumped, and
     * the failure counter is incremented. Once the attempt has failed
     * {@code abortFailureLimit} times an exception is reported to the shuffle. The AM
     * notification and reducer health checks are then run, and finally the host is queued
     * in {@code penalties} with a delay of
     * {@code INITIAL_PENALTY * PENALTY_GROWTH_RATE ^ failures} milliseconds.
     *
     * @param srcAttempt the attempt whose fetch failed
     * @param host the host the fetch was attempted from; must not be {@code null}
     * @param readError forwarded to {@code checkAndInformAM}
     * @param connectError forwarded to {@code checkAndInformAM}
     */
    public synchronized void copyFailed(InputAttemptIdentifier srcAttempt, MapHost host, boolean readError, boolean connectError) {
        host.penalize();

        // Single lookup instead of containsKey + get; values in these maps are never null.
        IntWritable attemptFailures = failureCounts.get(srcAttempt);
        if (attemptFailures == null) {
            attemptFailures = new IntWritable(1);
            failureCounts.put(srcAttempt, attemptFailures);
        } else {
            attemptFailures.set(attemptFailures.get() + 1);
        }
        int failures = attemptFailures.get();

        String hostPort = host.getHostIdentifier();
        IntWritable failuresOnHost = hostFailures.get(hostPort);
        if (failuresOnHost == null) {
            hostFailures.put(hostPort, new IntWritable(1));
        } else {
            failuresOnHost.set(failuresOnHost.get() + 1);
        }

        if (failures >= abortFailureLimit) {
            IOException ioe = new IOException(failures + " failures downloading " + TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()));
            ioe.fillInStackTrace();
            shuffle.reportException(ioe);
        }

        failedShuffleCounter.increment(1L);
        checkAndInformAM(failures, srcAttempt, readError, connectError);
        checkReducerHealth();

        // Exponential back-off driven by the class constants rather than repeated literals.
        long delay = (long) ((double) INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures));
        penalties.add(new Penalty(host, delay));
    }

    /**
     * Logs a locally caused shuffle failure at error level and forwards it to the owning
     * {@code Shuffle} via {@code reportException}.
     *
     * @param ioe the local error to log and report
     */
    public void reportLocalError(IOException ioe) {
        LOG.error("Shuffle failed : caused by local error", ioe);
        shuffle.reportException(ioe);
    }

    /**
     * Sends an {@code InputReadErrorEvent} for {@code srcAttempt} through the input context
     * when a report is due: either immediate reporting is enabled and the failure was a
     * read or connect error, or {@code failures} is a multiple of
     * {@code maxFetchFailuresBeforeReporting}.
     *
     * @param failures number of failures recorded so far for the attempt
     * @param srcAttempt the attempt whose fetch failed
     * @param readError whether the failure was a read error
     * @param connectError whether the failure was a connect error
     */
    private void checkAndInformAM(int failures, InputAttemptIdentifier srcAttempt, boolean readError, boolean connectError) {
        boolean reportImmediately = reportReadErrorImmediately && (readError || connectError);
        // The modulo is evaluated only when no immediate report is due, mirroring a short-circuit OR.
        if (!reportImmediately && failures % maxFetchFailuresBeforeReporting != 0) {
            return;
        }

        String logMessage = "Reporting fetch failure for InputIdentifier: " + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()) + " to AM.";
        LOG.info(logMessage);

        String diagnostics = "Fetch failure for " + TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()) + " to jobtracker.";
        ArrayList readErrorEvents = Lists.newArrayListWithCapacity(1);
        readErrorEvents.add(InputReadErrorEvent.create(diagnostics, srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()));
        inputContext.sendEvents((List) readErrorEvents);
    }

    /**
     * Reports an exception to the shuffle when fetch failures are widespread and progress
     * is insufficient. All three conditions must hold:
     * <ul>
     *   <li>the number of distinct failing attempts has reached
     *       {@code maxFailedUniqueFetches}, or equals the number of pending inputs;</li>
     *   <li>failures make up at least half of (failures + completed inputs);</li>
     *   <li>fewer than half of the inputs are done, or the time since the last progress is
     *       at least half of the larger of the progress duration and {@code maxMapRuntime}.</li>
     * </ul>
     */
    private void checkReducerHealth() {
        final float maxAllowedFailedFetchAttemptPercent = 0.5f;
        final float minRequiredProgressPercent = 0.5f;
        final float maxAllowedStallTimePercent = 0.5f;

        long totalFailures = failedShuffleCounter.getValue();
        int doneMaps = numInputs - remainingMaps.get();
        int pendingInputs = numInputs - doneMaps;

        boolean reducerHealthy = (float) totalFailures / (float) (totalFailures + (long) doneMaps) < maxAllowedFailedFetchAttemptPercent;
        boolean reducerProgressedEnough = (float) doneMaps / (float) numInputs >= minRequiredProgressPercent;

        int stallDuration = (int) (System.currentTimeMillis() - lastProgressTime);
        int shuffleProgressDuration = (int) (lastProgressTime - startTime);
        int minShuffleRunDuration = Math.max(shuffleProgressDuration, maxMapRuntime);
        boolean reducerStalled = (float) stallDuration / (float) minShuffleRunDuration >= maxAllowedStallTimePercent;

        boolean failuresWidespread = failureCounts.size() >= maxFailedUniqueFetches || failureCounts.size() == pendingInputs;
        if (failuresWidespread && !reducerHealthy && (!reducerProgressedEnough || reducerStalled)) {
            LOG.error("Shuffle failed with too many fetch failures and insufficient progress!failureCounts=" + failureCounts.size() + ", pendingInputs=" + pendingInputs + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough=" + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
            shuffle.reportException(new IOException("Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out."));
        }
    }

    /**
     * Registers an available output of {@code srcAttempt} on the given host and partition.
     *
     * <p>The {@code MapHost} for the host/partition pair is looked up, or created and stored
     * in {@code mapLocations}. If the attempt fails pipelined-shuffle validation nothing else
     * happens. Otherwise the attempt is added to the host, indexed by its path component and
     * partition, and — when the host is in the {@code PENDING} state — the host is added to
     * the pending set and waiting threads are notified.
     *
     * @param inputHostName host name of the source
     * @param port port of the source
     * @param partitionId partition the output belongs to
     * @param hostUrl URL passed to a newly created {@code MapHost}
     * @param srcAttempt the attempt whose output is now known
     */
    public synchronized void addKnownMapOutput(String inputHostName, int port, int partitionId, String hostUrl, InputAttemptIdentifier srcAttempt) {
        String hostAndPort = inputHostName + ":" + port;
        String locationKey = MapHost.createIdentifier(hostAndPort, partitionId);

        MapHost location = mapLocations.get(locationKey);
        if (location == null) {
            location = new MapHost(partitionId, hostAndPort, hostUrl);
            assert locationKey.equals(location.getIdentifier());
            mapLocations.put(locationKey, location);
        }

        if (!validateInputAttemptForPipelinedShuffle(srcAttempt)) {
            return;
        }

        location.addKnownMap(srcAttempt);
        String pathKey = getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId);
        pathToIdentifierMap.put(pathKey, srcAttempt);

        if (location.getState() == MapHost.State.PENDING) {
            pendingHosts.add(location);
            notifyAll();
        }
    }

    /**
     * Marks an input attempt as obsolete so that it is no longer consumed.
     *
     * <p>If the attempt's input is already tracked in {@code shuffleInfoEventsMap}, an
     * exception is reported to the shuffle instead and the attempt is not added to the
     * obsolete set.
     *
     * @param srcAttempt the attempt to mark as obsolete
     */
    public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
        LOG.info("Adding obsolete input: " + srcAttempt);
        boolean alreadyTracked = shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier());
        if (!alreadyTracked) {
            obsoleteInputs.add(srcAttempt);
            return;
        }
        IOException failure = new IOException(srcAttempt + " is marked as obsoleteInput, but it " + "exists in shuffleInfoEventMap. Some data could have been already merged " + "to memory/disk outputs.  Failing the fetch early.");
        shuffle.reportException(failure);
    }

    /**
     * Returns an input attempt to the given host's list of known maps.
     *
     * @param host the host to hand the attempt back to
     * @param srcAttempt the attempt to put back
     */
    public synchronized void putBackKnownMapOutput(MapHost host, InputAttemptIdentifier srcAttempt) {
        host.addKnownMap(srcAttempt);
    }

    /**
     * Waits until a host is pending or no inputs remain, then hands out a randomly chosen
     * pending host. The chosen host is removed from the pending set and marked busy, and
     * the hand-out time is stored in {@code shuffleStart}.
     *
     * @return the chosen host, or {@code null} when no host is pending after the wait
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized MapHost getHost() throws InterruptedException {
        while (pendingHosts.isEmpty() && remainingMaps.get() > 0) {
            LOG.info("PendingHosts=" + pendingHosts);
            wait();
        }
        if (pendingHosts.isEmpty()) {
            return null;
        }

        // Advance to a uniformly random position in the set's iteration order.
        Iterator<MapHost> cursor = pendingHosts.iterator();
        int stepsToSkip = random.nextInt(pendingHosts.size());
        MapHost chosen = cursor.next();
        for (int step = 0; step < stepsToSkip; step++) {
            chosen = cursor.next();
        }

        pendingHosts.remove(chosen);
        chosen.markBusy();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Assigning " + chosen + " with " + chosen.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName());
        }
        shuffleStart.set(System.currentTimeMillis());
        return chosen;
    }

    /**
     * Looks up the input attempt registered for a path component and reduce id.
     *
     * @param path path component of the fetched output
     * @param reduceId reduce (partition) id of the fetched output
     * @return the registered attempt, or {@code null} when none is mapped
     */
    public InputAttemptIdentifier getIdentifierForFetchedOutput(String path, int reduceId) {
        String lookupKey = getIdentifierFromPathAndReduceId(path, reduceId);
        return pathToIdentifierMap.get(lookupKey);
    }

    /**
     * Tells whether an input attempt still needs to be fetched.
     *
     * @param id the attempt to check
     * @return {@code true} when the attempt is not obsolete and its input is not finished
     */
    private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
        if (obsoleteInputs.contains(id)) {
            return false;
        }
        return !isInputFinished(id.getInputIdentifier().getInputIndex());
    }

    /**
     * Takes all known input attempts from {@code host}, drops the finished, obsolete and
     * duplicate ones, and returns at most {@code maxTaskOutputAtOnce} of the remainder.
     * Attempts beyond that limit are handed back to the host.
     *
     * <p>De-duplication per input index:
     * <ul>
     *   <li>for chunk-retrievable attempts, an attempt whose spill id matches an already
     *       kept one is not added again, while a different spill of the same attempt
     *       number is kept alongside;</li>
     *   <li>otherwise an attempt with a higher attempt number replaces an older one.</li>
     * </ul>
     *
     * @param host the host whose known maps are drained
     * @return the attempts to fetch from the host now, possibly empty
     */
    public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
        List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
        LinkedListMultimap<Integer, InputAttemptIdentifier> dedupedList = LinkedListMultimap.create();

        for (InputAttemptIdentifier id : origList) {
            if (!inputShouldBeConsumed(id)) {
                LOG.info("Ignoring finished or obsolete source: " + id);
                continue;
            }
            Integer inputNumber = id.getInputIdentifier().getInputIndex();
            // Live view of the multimap: removals through its iterator affect dedupedList.
            List<InputAttemptIdentifier> oldIdList = dedupedList.get(inputNumber);
            if (oldIdList == null || oldIdList.isEmpty()) {
                dedupedList.put(inputNumber, id);
                continue;
            }

            boolean addIdentifierToList = false;
            Iterator<InputAttemptIdentifier> oldIdIterator = oldIdList.iterator();
            while (oldIdIterator.hasNext()) {
                InputAttemptIdentifier oldId = oldIdIterator.next();
                if (id.canRetrieveInputInChunks()) {
                    if (oldId.getSpillEventId() == id.getSpillEventId()) {
                        // Same spill already queued; keep scanning the remaining entries.
                        addIdentifierToList = false;
                        continue;
                    }
                    if (oldId.getAttemptNumber() == id.getAttemptNumber()) {
                        // Same attempt, different spill: keep both.
                        addIdentifierToList = true;
                        break;
                    }
                }
                if (oldId.getAttemptNumber() < id.getAttemptNumber()) {
                    // A newer attempt supersedes the older one.
                    oldIdIterator.remove();
                    LOG.warn("Old Src for InputIndex: " + inputNumber + " with attemptNumber: " + oldId.getAttemptNumber() + " was not determined to be invalid. Ignoring it for now in favour of " + id.getAttemptNumber());
                    addIdentifierToList = true;
                    break;
                }
            }
            if (addIdentifierToList) {
                dedupedList.put(inputNumber, id);
            }
        }

        List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
        int consideredMaps = 0;
        int totalSize = dedupedList.size();
        for (Integer inputIndex : dedupedList.keySet()) {
            for (InputAttemptIdentifier attempt : dedupedList.get(inputIndex)) {
                if (consideredMaps++ >= maxTaskOutputAtOnce) {
                    // Over the per-request limit: return the attempt to the host for later.
                    host.addKnownMap(attempt);
                } else {
                    result.add(attempt);
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            // Log the number actually assigned, not the number of entries examined.
            LOG.debug("assigned " + result.size() + " of " + totalSize + " to " + host + " to " + Thread.currentThread().getName());
        }
        return result;
    }

    /**
     * Releases a host after a fetcher is done with it. Unless the host is penalised, it is
     * marked available; if that leaves it in the {@code PENDING} state it is added back to
     * the pending set and waiting threads are notified. The time elapsed since the last
     * host hand-out is logged in every case.
     *
     * @param host the host being released
     */
    public synchronized void freeHost(MapHost host) {
        boolean penalized = host.getState() == MapHost.State.PENALIZED;
        // markAvailable() is only invoked for hosts that are not penalised.
        if (!penalized && host.markAvailable() == MapHost.State.PENDING) {
            pendingHosts.add(host);
            notifyAll();
        }
        long heldForMillis = System.currentTimeMillis() - shuffleStart.get();
        LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + heldForMillis + "ms");
    }

    /**
     * Forgets all known host locations, obsolete inputs, pending hosts and the
     * path-to-attempt index.
     */
    public synchronized void resetKnownMaps() {
        mapLocations.clear();
        obsoleteInputs.clear();
        pendingHosts.clear();
        pathToIdentifierMap.clear();
    }

    /**
     * @return {@code true} when the count of remaining inputs is zero
     */
    public synchronized boolean isDone() {
        int stillRemaining = remainingMaps.get();
        return stillRemaining == 0;
    }

    /**
     * Builds the key used by {@code pathToIdentifierMap}: the path, an underscore, and the
     * reduce id.
     *
     * @param path path component of an output
     * @param reduceId reduce (partition) id
     * @return {@code path + "_" + reduceId}
     */
    private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
        return new StringBuilder().append(path).append("_").append(reduceId).toString();
    }

    /**
     * Raises {@code maxMapRuntime} to {@code duration} when the latter is larger.
     *
     * @param duration a reported run time
     */
    public synchronized void informMaxMapRunTime(int duration) {
        maxMapRuntime = Math.max(maxMapRuntime, duration);
    }

    /**
     * Marks the input at {@code inputIndex} as finished. Access to the bit set is guarded
     * by the bit set's own monitor.
     *
     * @param inputIndex index of the input to mark
     */
    void setInputFinished(int inputIndex) {
        synchronized (finishedMaps) {
            finishedMaps.set(inputIndex);
        }
    }

    /**
     * Tells whether the input at {@code inputIndex} has been marked finished. Access to the
     * bit set is guarded by the bit set's own monitor.
     *
     * @param inputIndex index of the input to query
     * @return {@code true} when the input's bit is set
     */
    boolean isInputFinished(int inputIndex) {
        synchronized (finishedMaps) {
            boolean finished = finishedMaps.get(inputIndex);
            return finished;
        }
    }

    /**
     * Creates a fetcher for the given host, wired with this scheduler and the connection,
     * security, codec, local-fetch and error-counter settings held by it.
     *
     * @param mapHost the host the fetcher will fetch from
     * @return a new {@code FetcherOrderedGrouped}
     */
    @VisibleForTesting
    FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
        return new FetcherOrderedGrouped(
                httpConnectionParams,
                this,
                allocator,
                shuffleMetrics,
                shuffle,
                jobTokenSecretManager,
                ifileReadAhead,
                ifileReadAheadLength,
                codec,
                conf,
                localDiskFetchEnabled,
                localHostname,
                shufflePort,
                srcNameTrimmed,
                mapHost,
                ioErrsCounter,
                wrongLengthErrsCounter,
                badIdErrsCounter,
                wrongMapErrsCounter,
                connectionErrsCounter,
                wrongReduceErrsCounter,
                asyncHttp);
    }

    private class FetchFutureCallback
    implements FutureCallback<Void> {
        private final FetcherOrderedGrouped fetcherOrderedGrouped;

        public FetchFutureCallback(FetcherOrderedGrouped fetcherOrderedGrouped) {
            this.fetcherOrderedGrouped = fetcherOrderedGrouped;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doBookKeepingForFetcherComplete() {
            ShuffleScheduler shuffleScheduler = ShuffleScheduler.this;
            synchronized (shuffleScheduler) {
                ShuffleScheduler.this.runningFetchers.remove((Object)this.fetcherOrderedGrouped);
                ShuffleScheduler.this.notifyAll();
            }
        }

        public void onSuccess(Void result) {
            this.fetcherOrderedGrouped.shutDown();
            if (ShuffleScheduler.this.isShutdown.get()) {
                LOG.info("Already shutdown. Ignoring fetch complete");
            } else {
                this.doBookKeepingForFetcherComplete();
            }
        }

        public void onFailure(Throwable t) {
            this.fetcherOrderedGrouped.shutDown();
            if (ShuffleScheduler.this.isShutdown.get()) {
                LOG.info("Already shutdown. Ignoring fetch complete");
            } else {
                LOG.error("Fetcher failed with error", t);
                ShuffleScheduler.this.shuffle.reportException(t);
                this.doBookKeepingForFetcherComplete();
            }
        }
    }

    private class ShuffleSchedulerCallable
    extends CallableWithNdc<Void> {
        private ShuffleSchedulerCallable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected Void callInternal() throws InterruptedException {
            while (!ShuffleScheduler.this.isShutdown.get() && ShuffleScheduler.this.remainingMaps.get() > 0) {
                ShuffleScheduler shuffleScheduler = ShuffleScheduler.this;
                synchronized (shuffleScheduler) {
                    if ((ShuffleScheduler.this.runningFetchers.size() >= ShuffleScheduler.this.numFetchers || ShuffleScheduler.this.pendingHosts.isEmpty()) && ShuffleScheduler.this.remainingMaps.get() > 0) {
                        try {
                            ShuffleScheduler.this.wait();
                        }
                        catch (InterruptedException e) {
                            if (ShuffleScheduler.this.isShutdown.get()) {
                                LOG.info("Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                                Thread.currentThread().interrupt();
                                break;
                            }
                            throw e;
                        }
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("NumCompletedInputs: {}" + (ShuffleScheduler.this.numInputs - ShuffleScheduler.this.remainingMaps.get()));
                }
                try {
                    ShuffleScheduler.this.mergeManager.waitForInMemoryMerge();
                    ShuffleScheduler.this.mergeManager.waitForShuffleToMergeMemory();
                }
                catch (InterruptedException e) {
                    if (ShuffleScheduler.this.isShutdown.get()) {
                        LOG.info("Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                        Thread.currentThread().interrupt();
                        break;
                    }
                    throw e;
                }
                if (ShuffleScheduler.this.isShutdown.get() || ShuffleScheduler.this.remainingMaps.get() <= 0) continue;
                shuffleScheduler = ShuffleScheduler.this;
                synchronized (shuffleScheduler) {
                    int numFetchersToRun = ShuffleScheduler.this.numFetchers - ShuffleScheduler.this.runningFetchers.size();
                    int count = 0;
                    while (count < numFetchersToRun && !ShuffleScheduler.this.isShutdown.get() && ShuffleScheduler.this.remainingMaps.get() > 0) {
                        MapHost mapHost;
                        try {
                            mapHost = ShuffleScheduler.this.getHost();
                        }
                        catch (InterruptedException e) {
                            if (ShuffleScheduler.this.isShutdown.get()) {
                                LOG.info("Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
                                Thread.currentThread().interrupt();
                                break;
                            }
                            throw e;
                        }
                        if (mapHost == null) break;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Processing pending host: " + mapHost.toString());
                        }
                        if (ShuffleScheduler.this.isShutdown.get()) continue;
                        ++count;
                        LOG.info("Scheduling fetch for inputHost: {}", (Object)mapHost.getIdentifier());
                        FetcherOrderedGrouped fetcherOrderedGrouped = ShuffleScheduler.this.constructFetcherForHost(mapHost);
                        ShuffleScheduler.this.runningFetchers.add(fetcherOrderedGrouped);
                        ListenableFuture future = ShuffleScheduler.this.fetcherExecutor.submit((Callable)((Object)fetcherOrderedGrouped));
                        Futures.addCallback((ListenableFuture)future, (FutureCallback)new FetchFutureCallback(fetcherOrderedGrouped));
                    }
                }
            }
            LOG.info("Shutting down FetchScheduler for input: {}, wasInterrupted={}", (Object)ShuffleScheduler.this.srcNameTrimmed, (Object)Thread.currentThread().isInterrupted());
            if (!ShuffleScheduler.this.fetcherExecutor.isShutdown()) {
                ShuffleScheduler.this.fetcherExecutor.shutdownNow();
            }
            return null;
        }
    }

    /**
     * Daemon thread that takes entries from the scheduler's {@code penalties} queue as they
     * become available and, when the host reports {@code PENDING} after being marked
     * available, adds it back to the pending hosts and notifies waiters.
     */
    private class Referee
    extends Thread {
        public Referee() {
            String vertexName = TezUtilsInternal.cleanVertexName(ShuffleScheduler.this.inputContext.getSourceVertexName());
            this.setName("ShufflePenaltyReferee [" + vertexName + "]");
            this.setDaemon(true);
        }

        /**
         * Loops until the scheduler is shut down. An interrupt restores the interrupt flag
         * and ends the thread; any other throwable is reported to the shuffle.
         */
        @Override
        public void run() {
            try {
                while (!ShuffleScheduler.this.isShutdown.get()) {
                    // Blocks until a queued penalty can be taken.
                    Penalty expired = (Penalty)ShuffleScheduler.this.penalties.take();
                    MapHost released = expired.host;
                    synchronized (ShuffleScheduler.this) {
                        if (released.markAvailable() != MapHost.State.PENDING) {
                            continue;
                        }
                        ShuffleScheduler.this.pendingHosts.add(released);
                        ShuffleScheduler.this.notifyAll();
                    }
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            catch (Throwable t) {
                ShuffleScheduler.this.shuffle.reportException(t);
            }
        }
    }

    /**
     * A {@link Delayed} entry pairing a host with the wall-clock time (milliseconds) at
     * which its delay ends. Entries are ordered by that end time.
     */
    private static class Penalty
    implements Delayed {
        MapHost host;
        private long endTime;

        /**
         * @param host the host this entry refers to
         * @param delay delay in milliseconds, counted from the moment of construction
         */
        Penalty(MapHost host, long delay) {
            this.endTime = System.currentTimeMillis() + delay;
            this.host = host;
        }

        /** Returns the time left until {@code endTime}, converted to the requested unit. */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /** Orders by end time; the argument must be another {@code Penalty}. */
        @Override
        public int compareTo(Delayed o) {
            Penalty that = (Penalty)o;
            return Long.compare(this.endTime, that.endTime);
        }
    }

    /**
     * Tracks which spill events have been processed for one input attempt and whether
     * every spill up to the final event id has been seen.
     */
    static class ShuffleEventInfo {
        BitSet eventsProcessed;
        int finalEventId = -1;
        int attemptNum;
        String id;

        /**
         * @param input the attempt whose input index and attempt number form {@code id}
         */
        ShuffleEventInfo(InputAttemptIdentifier input) {
            this.eventsProcessed = new BitSet();
            this.attemptNum = input.getAttemptNumber();
            this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
        }

        /**
         * Records a spill as processed. Once a final event id is known, first verifies that
         * the number of processed events does not exceed {@code finalEventId + 1}.
         *
         * @param spillId the spill to mark as processed
         */
        void spillProcessed(int spillId) {
            if (this.finalEventId != -1) {
                boolean withinBounds = this.eventsProcessed.cardinality() <= this.finalEventId + 1;
                String problem = "Wrong state. eventsProcessed cardinality=" + this.eventsProcessed.cardinality() + " finalEventId=" + this.finalEventId + ", spillId=" + spillId + ", " + this.toString();
                Preconditions.checkState(withinBounds, (Object)problem);
            }
            this.eventsProcessed.set(spillId);
        }

        /** Stores the id of the final spill event. */
        void setFinalEventId(int spillId) {
            this.finalEventId = spillId;
        }

        /**
         * @return {@code true} once a final event id is set and exactly
         *         {@code finalEventId + 1} events have been processed
         */
        boolean isDone() {
            if (this.finalEventId == -1) {
                return false;
            }
            return this.eventsProcessed.cardinality() == this.finalEventId + 1;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("[eventsProcessed=");
            text.append(this.eventsProcessed);
            text.append(", finalEventId=").append(this.finalEventId);
            text.append(", id=").append(this.id);
            text.append(", attemptNum=").append(this.attemptNum);
            text.append("]");
            return text.toString();
        }
    }

    @VisibleForTesting
    static enum ShuffleErrors {
        IO_ERROR,
        WRONG_LENGTH,
        BAD_ID,
        WRONG_MAP,
        CONNECTION,
        WRONG_REDUCE;

    }
}

