/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.broadcast.input;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.broadcast.input.BroadcastInputManager;
import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleInputEventHandler;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.shuffle.common.FetchResult;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.Fetcher;
import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.InputHost;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class BroadcastShuffleManager
implements FetcherCallback {
    private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
    private TezInputContext inputContext;
    private int numInputs;
    private Configuration conf;
    private final BroadcastShuffleInputEventHandler inputEventHandler;
    private final FetchedInputAllocator inputManager;
    private final ExecutorService fetcherRawExecutor;
    private final ListeningExecutorService fetcherExecutor;
    private final ExecutorService schedulerRawExecutor;
    private final ListeningExecutorService schedulerExecutor;
    private final BlockingQueue<FetchedInput> completedInputs;
    private final Set<InputIdentifier> completedInputSet;
    private final ConcurrentMap<String, InputHost> knownSrcHosts;
    private final BlockingQueue<InputHost> pendingHosts;
    private final Set<InputAttemptIdentifier> obsoletedInputs;
    private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
    private final long startTime;
    private long lastProgressTime;
    private ReentrantLock lock = new ReentrantLock();
    private Condition wakeLoop = this.lock.newCondition();
    private final int numFetchers;
    private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
    private final SecretKey shuffleSecret;
    private final int connectionTimeout;
    private final int readTimeout;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    private final int ifileBufferSize;
    private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
    private volatile Throwable shuffleError;

    public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
        this.inputContext = inputContext;
        this.conf = conf;
        this.numInputs = numInputs;
        if (ConfigUtils.isIntermediateInputCompressed(conf)) {
            Class<? extends CompressionCodec> codecClass = ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
            this.codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, (Configuration)conf);
        } else {
            this.codec = null;
        }
        this.ifileReadAhead = conf.getBoolean("tez.runtime.ifile.readahead", true);
        this.ifileReadAheadLength = this.ifileReadAhead ? conf.getInt("tez.runtime.ifile.readahead.bytes", 0x400000) : 0;
        this.ifileBufferSize = conf.getInt("io.file.buffer.size", -1);
        this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf);
        this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this, this.inputManager, this.codec, this.ifileReadAhead, this.ifileReadAheadLength);
        this.completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap(numInputs));
        this.completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
        this.knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
        this.pendingHosts = new LinkedBlockingQueue<InputHost>();
        this.obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap());
        int maxConfiguredFetchers = conf.getInt("tez.runtime.shuffle.parallel.copies", 20);
        this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
        this.fetcherRawExecutor = Executors.newFixedThreadPool(this.numFetchers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d").build());
        this.fetcherExecutor = MoreExecutors.listeningDecorator((ExecutorService)this.fetcherRawExecutor);
        this.schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]").build());
        this.schedulerExecutor = MoreExecutors.listeningDecorator((ExecutorService)this.schedulerRawExecutor);
        this.lastProgressTime = this.startTime = System.currentTimeMillis();
        this.shuffleSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(inputContext.getServiceConsumerMetaData("mapreduce_shuffle"));
        this.connectionTimeout = conf.getInt("tez.runtime.shuffle.connect.timeout", 180000);
        this.readTimeout = conf.getInt("tez.runtime.shuffle.read.timeout", 180000);
        LOG.info((Object)("BroadcastShuffleManager -> numInputs: " + numInputs + " compressionCodec: " + (this.codec == null ? "NoCompressionCodec" : this.codec.getClass().getName()) + ", numFetchers: " + this.numFetchers));
    }

    public void run() {
        RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
        ListenableFuture runShuffleFuture = this.schedulerExecutor.submit((Callable)callable);
        Futures.addCallback((ListenableFuture)runShuffleFuture, (FutureCallback)new SchedulerFutureCallback());
        this.schedulerExecutor.shutdown();
    }

    private Fetcher constructFetcherForHost(InputHost inputHost) {
        Fetcher.FetcherBuilder fetcherBuilder = new Fetcher.FetcherBuilder(this, this.inputManager, this.inputContext.getApplicationId(), this.shuffleSecret, this.conf);
        fetcherBuilder.setConnectionParameters(this.connectionTimeout, this.readTimeout);
        if (this.codec != null) {
            fetcherBuilder.setCompressionParameters(this.codec);
        }
        fetcherBuilder.setIFileParams(this.ifileReadAhead, this.ifileReadAheadLength);
        List<InputAttemptIdentifier> pendingInputsForHost = inputHost.clearAndGetPendingInputs();
        Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost.iterator();
        while (inputIter.hasNext()) {
            InputAttemptIdentifier input = inputIter.next();
            if (this.completedInputSet.contains(input.getInputIdentifier())) {
                inputIter.remove();
            }
            if (!this.obsoletedInputs.contains(input)) continue;
            inputIter.remove();
            this.obsoletedInputs.remove(input);
        }
        fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0, pendingInputsForHost);
        LOG.info((Object)("Created Fetcher for host: " + inputHost.getHost() + ", with inputs: " + pendingInputsForHost));
        return fetcherBuilder.build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addKnownInput(String hostName, int port, InputAttemptIdentifier srcAttemptIdentifier, int partition) {
        InputHost old;
        InputHost host = (InputHost)this.knownSrcHosts.get(hostName);
        if (host == null && (old = this.knownSrcHosts.putIfAbsent(hostName, host = new InputHost(hostName, port, this.inputContext.getApplicationId()))) != null) {
            host = old;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Adding input: " + srcAttemptIdentifier + ", to host: " + host));
        }
        host.addKnownInput(srcAttemptIdentifier);
        this.lock.lock();
        try {
            boolean added = this.pendingHosts.offer(host);
            if (!added) {
                String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue";
                LOG.error((Object)errorMessage);
                throw new TezUncheckedException(errorMessage);
            }
            this.wakeLoop.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addCompletedInputWithNoData(InputAttemptIdentifier srcAttemptIdentifier) {
        InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
        LOG.info((Object)("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete."));
        if (!this.completedInputSet.contains(inputIdentifier)) {
            Set<InputIdentifier> set = this.completedInputSet;
            synchronized (set) {
                if (!this.completedInputSet.contains(inputIdentifier)) {
                    this.registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
                }
            }
        }
        this.lock.lock();
        try {
            this.wakeLoop.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addCompletedInputWithData(InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput) throws IOException {
        InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
        LOG.info((Object)("Received Data via Event: " + srcAttemptIdentifier + " to " + (Object)((Object)fetchedInput.getType())));
        this.lock.lock();
        try {
            this.lastProgressTime = System.currentTimeMillis();
        }
        finally {
            this.lock.unlock();
        }
        boolean committed = false;
        if (!this.completedInputSet.contains(inputIdentifier)) {
            Set<InputIdentifier> set = this.completedInputSet;
            synchronized (set) {
                if (!this.completedInputSet.contains(inputIdentifier)) {
                    fetchedInput.commit();
                    committed = true;
                    this.registerCompletedInput(fetchedInput);
                }
            }
        }
        if (!committed) {
            fetchedInput.abort();
        } else {
            this.lock.lock();
            try {
                this.wakeLoop.signal();
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
        this.obsoletedInputs.add(srcAttemptIdentifier);
    }

    public void handleEvents(List<Event> events) throws IOException {
        this.inputEventHandler.handleEvents(events);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes, long copyDuration) throws IOException {
        InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
        LOG.info((Object)("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + (Object)((Object)fetchedInput.getType())));
        this.lock.lock();
        try {
            this.lastProgressTime = System.currentTimeMillis();
        }
        finally {
            this.lock.unlock();
        }
        boolean committed = false;
        if (!this.completedInputSet.contains(inputIdentifier)) {
            Set<InputIdentifier> set = this.completedInputSet;
            synchronized (set) {
                if (!this.completedInputSet.contains(inputIdentifier)) {
                    fetchedInput.commit();
                    committed = true;
                    this.registerCompletedInput(fetchedInput);
                }
            }
        }
        if (!committed) {
            fetchedInput.abort();
        } else {
            this.lock.lock();
            try {
                this.wakeLoop.signal();
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    @Override
    public void fetchFailed(String host, InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
        LOG.info((Object)("Fetch failed for src: " + srcAttemptIdentifier + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed));
        if (srcAttemptIdentifier == null) {
            String message = "Received fetchFailure for an unknown src (null)";
            LOG.fatal((Object)message);
            this.inputContext.fatalError(null, message);
        } else {
            InputReadErrorEvent readError = new InputReadErrorEvent("Fetch failure while fetching from " + TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), srcAttemptIdentifier.getAttemptNumber()), srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
            ArrayList failedEvents = Lists.newArrayListWithCapacity((int)1);
            failedEvents.add(readError);
            this.inputContext.sendEvents((List)failedEvents);
        }
    }

    public void shutdown() throws InterruptedException {
        if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
            this.fetcherExecutor.shutdown();
            this.fetcherExecutor.awaitTermination(2000L, TimeUnit.MILLISECONDS);
            if (!this.fetcherExecutor.isShutdown()) {
                this.fetcherExecutor.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void registerCompletedInput(FetchedInput fetchedInput) {
        this.lock.lock();
        try {
            this.completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
            this.completedInputs.add(fetchedInput);
            this.numCompletedInputs.incrementAndGet();
        }
        finally {
            this.lock.unlock();
        }
    }

    public boolean newInputAvailable() {
        FetchedInput head = (FetchedInput)this.completedInputs.peek();
        return head != null && !(head instanceof NullFetchedInput);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean allInputsFetched() {
        this.lock.lock();
        try {
            boolean bl = this.numCompletedInputs.get() == this.numInputs;
            return bl;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FetchedInput getNextInput() throws InterruptedException {
        FetchedInput input = null;
        do {
            this.lock.lock();
            try {
                input = (FetchedInput)this.completedInputs.peek();
                if (input != null || !this.allInputsFetched()) continue;
                break;
            }
            finally {
                this.lock.unlock();
            }
        } while ((input = this.completedInputs.take()) instanceof NullFetchedInput);
        return input;
    }

    public BroadcastKVReader createReader() throws IOException {
        return new BroadcastKVReader(this, this.conf, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class FetchFutureCallback
    implements FutureCallback<FetchResult> {
        private FetchFutureCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doBookKeepingForFetcherComplete() {
            BroadcastShuffleManager.this.numRunningFetchers.decrementAndGet();
            BroadcastShuffleManager.this.lock.lock();
            try {
                BroadcastShuffleManager.this.wakeLoop.signal();
            }
            finally {
                BroadcastShuffleManager.this.lock.unlock();
            }
        }

        public void onSuccess(FetchResult result) {
            Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
            if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
                InputHost inputHost = (InputHost)BroadcastShuffleManager.this.knownSrcHosts.get(result.getHost());
                assert (inputHost != null);
                for (InputAttemptIdentifier input : pendingInputs) {
                    inputHost.addKnownInput(input);
                }
                BroadcastShuffleManager.this.pendingHosts.add(inputHost);
            }
            this.doBookKeepingForFetcherComplete();
        }

        public void onFailure(Throwable t) {
            LOG.error((Object)"Fetcher failed with error: ", t);
            BroadcastShuffleManager.this.shuffleError = t;
            BroadcastShuffleManager.this.inputContext.fatalError(t, "Fetch failed");
            this.doBookKeepingForFetcherComplete();
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class SchedulerFutureCallback
    implements FutureCallback<Void> {
        private SchedulerFutureCallback() {
        }

        public void onSuccess(Void result) {
            LOG.info((Object)"Scheduler thread completed");
        }

        public void onFailure(Throwable t) {
            LOG.error((Object)"Scheduler failed with error: ", t);
            BroadcastShuffleManager.this.inputContext.fatalError(t, "Broadcast Scheduler Failed");
        }
    }

    private class NullFetchedInput
    extends FetchedInput {
        public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
            super(FetchedInput.Type.MEMORY, -1L, -1L, inputAttemptIdentifier, null);
        }

        public OutputStream getOutputStream() throws IOException {
            throw new UnsupportedOperationException("Not supported for NullFetchedInput");
        }

        public InputStream getInputStream() throws IOException {
            throw new UnsupportedOperationException("Not supported for NullFetchedInput");
        }

        public void commit() throws IOException {
            throw new UnsupportedOperationException("Not supported for NullFetchedInput");
        }

        public void abort() throws IOException {
            throw new UnsupportedOperationException("Not supported for NullFetchedInput");
        }

        public void free() {
            throw new UnsupportedOperationException("Not supported for NullFetchedInput");
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private class RunBroadcastShuffleCallable
    implements Callable<Void> {
        private RunBroadcastShuffleCallable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            block6: while (BroadcastShuffleManager.this.numCompletedInputs.get() < BroadcastShuffleManager.this.numInputs) {
                BroadcastShuffleManager.this.lock.lock();
                try {
                    if ((BroadcastShuffleManager.this.numRunningFetchers.get() >= BroadcastShuffleManager.this.numFetchers || BroadcastShuffleManager.this.pendingHosts.size() == 0) && BroadcastShuffleManager.this.numCompletedInputs.get() < BroadcastShuffleManager.this.numInputs) {
                        BroadcastShuffleManager.this.wakeLoop.await();
                    }
                }
                finally {
                    BroadcastShuffleManager.this.lock.unlock();
                }
                if (BroadcastShuffleManager.this.shuffleError != null) break;
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("NumCompletedInputs: " + BroadcastShuffleManager.this.numCompletedInputs));
                }
                if (BroadcastShuffleManager.this.numCompletedInputs.get() >= BroadcastShuffleManager.this.numInputs) continue;
                BroadcastShuffleManager.this.lock.lock();
                try {
                    int maxFetchersToRun = BroadcastShuffleManager.this.numFetchers - BroadcastShuffleManager.this.numRunningFetchers.get();
                    int count = 0;
                    while (BroadcastShuffleManager.this.pendingHosts.peek() != null) {
                        InputHost inputHost = (InputHost)BroadcastShuffleManager.this.pendingHosts.take();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Processing pending host: " + inputHost.toDetailedString()));
                        }
                        if (inputHost.getNumPendingInputs() > 0) {
                            LOG.info((Object)("Scheduling fetch for inputHost: " + inputHost.getHost()));
                            Fetcher fetcher = BroadcastShuffleManager.this.constructFetcherForHost(inputHost);
                            BroadcastShuffleManager.this.numRunningFetchers.incrementAndGet();
                            ListenableFuture future = BroadcastShuffleManager.this.fetcherExecutor.submit((Callable)fetcher);
                            Futures.addCallback((ListenableFuture)future, (FutureCallback)BroadcastShuffleManager.this.fetchFutureCallback);
                            if (++count < maxFetchersToRun) continue;
                            continue block6;
                        }
                        if (!LOG.isDebugEnabled()) continue;
                        LOG.debug((Object)("Skipping host: " + inputHost.getHost() + " since it has no inputs to process"));
                    }
                }
                finally {
                    BroadcastShuffleManager.this.lock.unlock();
                }
            }
            LOG.info((Object)"Shutting down FetchScheduler");
            if (!BroadcastShuffleManager.this.fetcherExecutor.isShutdown()) {
                BroadcastShuffleManager.this.fetcherExecutor.shutdownNow();
            }
            return null;
        }
    }
}

