/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.MapHost;
import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;

class ShuffleScheduler {
    static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>(){

        @Override
        protected Long initialValue() {
            return 0L;
        }
    };
    private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
    private static final long INITIAL_PENALTY = 2000L;
    private static final float PENALTY_GROWTH_RATE = 1.3f;
    private boolean[] finishedMaps;
    private final int numInputs;
    private int remainingMaps;
    private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
    private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
    private Set<MapHost> pendingHosts = new HashSet<MapHost>();
    private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
    private final Random random = new Random(System.currentTimeMillis());
    private final DelayQueue<Penalty> penalties = new DelayQueue();
    private final Referee referee;
    private final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier, IntWritable>();
    private final Map<String, IntWritable> hostFailures = new HashMap<String, IntWritable>();
    private final TezInputContext inputContext;
    private final Shuffle shuffle;
    private final TezCounter shuffledInputsCounter;
    private final TezCounter skippedInputCounter;
    private final TezCounter reduceShuffleBytes;
    private final TezCounter reduceBytesDecompressed;
    private final TezCounter failedShuffleCounter;
    private final TezCounter bytesShuffledToDisk;
    private final TezCounter bytesShuffledToMem;
    private final long startTime;
    private long lastProgressTime;
    private int maxTaskOutputAtOnce;
    private int maxFetchFailuresBeforeReporting;
    private boolean reportReadErrorImmediately = true;
    private int maxFailedUniqueFetches = 5;
    private final int abortFailureLimit;
    private int maxMapRuntime = 0;
    private long totalBytesShuffledTillNow = 0L;
    private DecimalFormat mbpsFormat = new DecimalFormat("0.00");

    public ShuffleScheduler(TezInputContext inputContext, Configuration conf, int numberOfInputs, Shuffle shuffle, TezCounter shuffledInputsCounter, TezCounter reduceShuffleBytes, TezCounter reduceBytesDecompressed, TezCounter failedShuffleCounter, TezCounter bytesShuffledToDisk, TezCounter bytesShuffledToMem) {
        this.inputContext = inputContext;
        this.numInputs = numberOfInputs;
        this.abortFailureLimit = Math.max(30, numberOfInputs / 10);
        this.remainingMaps = numberOfInputs;
        this.finishedMaps = new boolean[this.remainingMaps];
        this.referee = new Referee();
        this.shuffle = shuffle;
        this.shuffledInputsCounter = shuffledInputsCounter;
        this.reduceShuffleBytes = reduceShuffleBytes;
        this.reduceBytesDecompressed = reduceBytesDecompressed;
        this.failedShuffleCounter = failedShuffleCounter;
        this.bytesShuffledToDisk = bytesShuffledToDisk;
        this.bytesShuffledToMem = bytesShuffledToMem;
        this.lastProgressTime = this.startTime = System.currentTimeMillis();
        this.maxFailedUniqueFetches = Math.min(numberOfInputs, this.maxFailedUniqueFetches);
        this.referee.start();
        this.maxFetchFailuresBeforeReporting = conf.getInt("tez.runtime.shuffle.fetch.failures.limit", 5);
        this.reportReadErrorImmediately = conf.getBoolean("tez.runtime.shuffle.notify.readerror", true);
        this.maxTaskOutputAtOnce = Math.max(1, conf.getInt("tez.runtime.shuffle.fetch.max.task.output.at.once", 20));
        this.skippedInputCounter = inputContext.getCounters().findCounter((Enum)TaskCounter.NUM_SKIPPED_INPUTS);
        LOG.info((Object)("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + this.maxFetchFailuresBeforeReporting + ", reportReadErrorImmediately=" + this.reportReadErrorImmediately + ", maxFailedUniqueFetches=" + this.maxFailedUniqueFetches + ", abortFailureLimit=" + this.abortFailureLimit + ", maxMapRuntime=" + this.maxMapRuntime));
    }

    public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, MapHost host, long bytesCompressed, long bytesDecompressed, long milis, MapOutput output) throws IOException {
        this.failureCounts.remove(srcAttemptIdentifier);
        if (host != null) {
            this.hostFailures.remove(host.getHostIdentifier());
        }
        if (!this.isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
            if (output != null) {
                output.commit();
                if (output.getType() == MapOutput.Type.DISK) {
                    this.bytesShuffledToDisk.increment(bytesCompressed);
                } else {
                    this.bytesShuffledToMem.increment(bytesCompressed);
                }
                this.shuffledInputsCounter.increment(1L);
            } else {
                this.skippedInputCounter.increment(1L);
            }
            this.setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
            if (--this.remainingMaps == 0) {
                LOG.info((Object)("All inputs fetched for input vertex : " + this.inputContext.getSourceVertexName()));
                this.notifyAll();
            }
            this.lastProgressTime = System.currentTimeMillis();
            this.totalBytesShuffledTillNow += bytesCompressed;
            this.logProgress();
            this.reduceShuffleBytes.increment(bytesCompressed);
            this.reduceBytesDecompressed.increment(bytesDecompressed);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("src task: " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber()) + " done"));
            }
        } else {
            LOG.warn((Object)("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier));
            if (output != null) {
                output.abort();
            }
        }
    }

    private void logProgress() {
        float mbs = (float)this.totalBytesShuffledTillNow / 1048576.0f;
        int mapsDone = this.numInputs - this.remainingMaps;
        long secsSinceStart = (System.currentTimeMillis() - this.startTime) / 1000L + 1L;
        float transferRate = mbs / (float)secsSinceStart;
        LOG.info((Object)("copy(" + mapsDone + " of " + this.numInputs + " at " + this.mbpsFormat.format(transferRate) + " MB/s)"));
    }

    public synchronized void copyFailed(InputAttemptIdentifier srcAttempt, MapHost host, boolean readError, boolean connectError) {
        host.penalize();
        int failures = 1;
        if (this.failureCounts.containsKey(srcAttempt)) {
            IntWritable x = this.failureCounts.get(srcAttempt);
            x.set(x.get() + 1);
            failures = x.get();
        } else {
            this.failureCounts.put(srcAttempt, new IntWritable(1));
        }
        String hostPort = host.getHostIdentifier();
        if (this.hostFailures.containsKey(hostPort)) {
            IntWritable x = this.hostFailures.get(hostPort);
            x.set(x.get() + 1);
        } else {
            this.hostFailures.put(hostPort, new IntWritable(1));
        }
        if (failures >= this.abortFailureLimit) {
            IOException ioe = new IOException(failures + " failures downloading " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()));
            ioe.fillInStackTrace();
            this.shuffle.reportException(ioe);
        }
        this.failedShuffleCounter.increment(1L);
        this.checkAndInformAM(failures, srcAttempt, readError, connectError);
        this.checkReducerHealth();
        long delay = (long)(2000.0 * Math.pow(1.3f, failures));
        this.penalties.add(new Penalty(host, delay));
    }

    public void reportLocalError(IOException ioe) {
        LOG.error((Object)"Shuffle failed : caused by local error", (Throwable)ioe);
        this.shuffle.reportException(ioe);
    }

    private void checkAndInformAM(int failures, InputAttemptIdentifier srcAttempt, boolean readError, boolean connectError) {
        if (this.reportReadErrorImmediately && (readError || connectError) || failures % this.maxFetchFailuresBeforeReporting == 0) {
            LOG.info((Object)("Reporting fetch failure for InputIdentifier: " + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()) + " to AM."));
            ArrayList failedEvents = Lists.newArrayListWithCapacity((int)1);
            failedEvents.add(new InputReadErrorEvent("Fetch failure for " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier().getInputIndex(), srcAttempt.getAttemptNumber()));
            this.inputContext.sendEvents((List)failedEvents);
        }
    }

    private void checkReducerHealth() {
        boolean reducerStalled;
        int doneMaps;
        float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
        float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
        float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
        long totalFailures = this.failedShuffleCounter.getValue();
        boolean reducerHealthy = (float)totalFailures / (float)(totalFailures + (long)(doneMaps = this.numInputs - this.remainingMaps)) < 0.5f;
        boolean reducerProgressedEnough = (float)doneMaps / (float)this.numInputs >= 0.5f;
        int stallDuration = (int)(System.currentTimeMillis() - this.lastProgressTime);
        int shuffleProgressDuration = (int)(this.lastProgressTime - this.startTime);
        int minShuffleRunDuration = shuffleProgressDuration > this.maxMapRuntime ? shuffleProgressDuration : this.maxMapRuntime;
        boolean bl = reducerStalled = (float)stallDuration / (float)minShuffleRunDuration >= 0.5f;
        if (!(this.failureCounts.size() < this.maxFailedUniqueFetches && this.failureCounts.size() != this.numInputs - doneMaps || reducerHealthy || reducerProgressedEnough && !reducerStalled)) {
            LOG.fatal((Object)("Shuffle failed with too many fetch failures and insufficient progress!failureCounts=" + this.failureCounts.size() + ", pendingInputs=" + (this.numInputs - doneMaps) + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough=" + reducerProgressedEnough + ", reducerStalled=" + reducerStalled));
            String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
            this.shuffle.reportException(new IOException(errorMsg));
        }
    }

    public synchronized void addKnownMapOutput(String inputHostName, int port, int partitionId, String hostUrl, InputAttemptIdentifier srcAttempt) {
        String hostPort = inputHostName + ":" + String.valueOf(port);
        String identifier = MapHost.createIdentifier(hostPort, partitionId);
        MapHost host = this.mapLocations.get(identifier);
        if (host == null) {
            host = new MapHost(partitionId, hostPort, hostUrl);
            assert (identifier.equals(host.getIdentifier()));
            this.mapLocations.put(identifier, host);
        }
        host.addKnownMap(srcAttempt);
        this.pathToIdentifierMap.put(this.getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt);
        if (host.getState() == MapHost.State.PENDING) {
            this.pendingHosts.add(host);
            this.notifyAll();
        }
    }

    public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
        LOG.info((Object)("Adding obsolete input: " + srcAttempt));
        this.obsoleteInputs.add(srcAttempt);
    }

    public synchronized void putBackKnownMapOutput(MapHost host, InputAttemptIdentifier srcAttempt) {
        host.addKnownMap(srcAttempt);
    }

    public synchronized MapHost getHost() throws InterruptedException {
        while (this.pendingHosts.isEmpty()) {
            this.wait();
        }
        MapHost host = null;
        Iterator<MapHost> iter = this.pendingHosts.iterator();
        int numToPick = this.random.nextInt(this.pendingHosts.size());
        for (int i = 0; i <= numToPick; ++i) {
            host = iter.next();
        }
        this.pendingHosts.remove(host);
        host.markBusy();
        LOG.info((Object)("Assigning " + host + " with " + host.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName()));
        shuffleStart.set(System.currentTimeMillis());
        return host;
    }

    public InputAttemptIdentifier getIdentifierForFetchedOutput(String path, int reduceId) {
        return (InputAttemptIdentifier)this.pathToIdentifierMap.get(this.getIdentifierFromPathAndReduceId(path, reduceId));
    }

    private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
        return !this.obsoleteInputs.contains(id) && !this.isInputFinished(id.getInputIdentifier().getInputIndex());
    }

    public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
        InputAttemptIdentifier id;
        List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
        LinkedHashMap<Integer, InputAttemptIdentifier> dedupedList = new LinkedHashMap<Integer, InputAttemptIdentifier>();
        for (InputAttemptIdentifier id2 : origList) {
            if (this.inputShouldBeConsumed(id2)) {
                Integer inputNumber = new Integer(id2.getInputIdentifier().getInputIndex());
                InputAttemptIdentifier oldId = (InputAttemptIdentifier)dedupedList.get(inputNumber);
                if (oldId != null && oldId.getAttemptNumber() >= id2.getAttemptNumber()) continue;
                dedupedList.put(inputNumber, id2);
                if (oldId == null) continue;
                LOG.warn((Object)("Old Src for InputIndex: " + inputNumber + " with attemptNumber: " + oldId.getAttemptNumber() + " was not determined to be invalid. Ignoring it for now in favour of " + id2.getAttemptNumber()));
                continue;
            }
            LOG.info((Object)("Ignoring finished or obsolete source: " + id2));
        }
        ArrayList<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
        int includedMaps = 0;
        int totalSize = dedupedList.size();
        Iterator dedupedItr = dedupedList.entrySet().iterator();
        while (dedupedItr.hasNext()) {
            id = (InputAttemptIdentifier)dedupedItr.next().getValue();
            result.add(id);
            if (++includedMaps < this.maxTaskOutputAtOnce) continue;
            break;
        }
        while (dedupedItr.hasNext()) {
            id = (InputAttemptIdentifier)dedupedItr.next().getValue();
            host.addKnownMap(id);
        }
        LOG.info((Object)("assigned " + includedMaps + " of " + totalSize + " to " + host + " to " + Thread.currentThread().getName()));
        return result;
    }

    public synchronized void freeHost(MapHost host) {
        if (host.getState() != MapHost.State.PENALIZED && host.markAvailable() == MapHost.State.PENDING) {
            this.pendingHosts.add(host);
            this.notifyAll();
        }
        LOG.info((Object)(host + " freed by " + Thread.currentThread().getName() + " in " + (System.currentTimeMillis() - shuffleStart.get()) + "ms"));
    }

    public synchronized void resetKnownMaps() {
        this.mapLocations.clear();
        this.obsoleteInputs.clear();
        this.pendingHosts.clear();
        this.pathToIdentifierMap.clear();
    }

    public synchronized boolean isDone() {
        return this.remainingMaps == 0;
    }

    public synchronized boolean waitUntilDone(int millis) throws InterruptedException {
        if (this.remainingMaps > 0) {
            this.wait(millis);
            return this.remainingMaps == 0;
        }
        return true;
    }

    private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
        return path + "_" + reduceId;
    }

    public void close() throws InterruptedException {
        this.referee.interrupt();
        this.referee.join();
    }

    public synchronized void informMaxMapRunTime(int duration) {
        if (duration > this.maxMapRuntime) {
            this.maxMapRuntime = duration;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void setInputFinished(int inputIndex) {
        boolean[] blArray = this.finishedMaps;
        synchronized (this.finishedMaps) {
            this.finishedMaps[inputIndex] = true;
            // ** MonitorExit[var2_2] (shouldn't be in output)
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean isInputFinished(int inputIndex) {
        boolean[] blArray = this.finishedMaps;
        synchronized (this.finishedMaps) {
            // ** MonitorExit[var2_2] (shouldn't be in output)
            return this.finishedMaps[inputIndex];
        }
    }

    private class Referee
    extends Thread {
        public Referee() {
            this.setName("ShufflePenaltyReferee [" + TezUtils.cleanVertexName((String)ShuffleScheduler.this.inputContext.getSourceVertexName()) + "]");
            this.setDaemon(true);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            try {
                while (true) {
                    MapHost host = ((Penalty)((ShuffleScheduler)ShuffleScheduler.this).penalties.take()).host;
                    ShuffleScheduler shuffleScheduler = ShuffleScheduler.this;
                    synchronized (shuffleScheduler) {
                        if (host.markAvailable() == MapHost.State.PENDING) {
                            ShuffleScheduler.this.pendingHosts.add(host);
                            ShuffleScheduler.this.notifyAll();
                        }
                    }
                }
            }
            catch (InterruptedException ie) {
                return;
            }
            catch (Throwable t) {
                ShuffleScheduler.this.shuffle.reportException(t);
                return;
            }
        }
    }

    private static class Penalty
    implements Delayed {
        MapHost host;
        private long endTime;

        Penalty(MapHost host, long delay) {
            this.host = host;
            this.endTime = System.currentTimeMillis() + delay;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remainingTime = this.endTime - System.currentTimeMillis();
            return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            long other = ((Penalty)o).endTime;
            return this.endTime == other ? 0 : (this.endTime < other ? -1 : 1);
        }
    }
}

