/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.MapHost;
import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.impl.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleClientMetrics;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleScheduler;
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
class Fetcher
extends Thread {
    private static final Log LOG = LogFactory.getLog(Fetcher.class);
    private static final int UNIT_CONNECT_TIMEOUT = 60000;
    private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
    private final TezCounter connectionErrs;
    private final TezCounter ioErrs;
    private final TezCounter wrongLengthErrs;
    private final TezCounter badIdErrs;
    private final TezCounter wrongMapErrs;
    private final TezCounter wrongReduceErrs;
    private final MergeManager merger;
    private final ShuffleScheduler scheduler;
    private final ShuffleClientMetrics metrics;
    private final Shuffle shuffle;
    private final int id;
    private static int nextId = 0;
    private final int connectionTimeout;
    private final int readTimeout;
    private final CompressionCodec codec;
    private final Decompressor decompressor;
    private final SecretKey jobTokenSecret;
    private volatile boolean stopped = false;
    private Configuration job;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    private static boolean sslShuffle;
    private static SSLFactory sslFactory;
    private LinkedHashSet<InputAttemptIdentifier> remaining;
    private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Fetcher(Configuration job, ShuffleScheduler scheduler, MergeManager merger, ShuffleClientMetrics metrics, Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException {
        this.job = job;
        this.scheduler = scheduler;
        this.merger = merger;
        this.metrics = metrics;
        this.shuffle = shuffle;
        this.id = ++nextId;
        this.jobTokenSecret = jobTokenSecret;
        this.ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_LENGTH.toString());
        this.badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString());
        this.wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_MAP.toString());
        this.connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.CONNECTION.toString());
        this.wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString());
        this.ifileReadAhead = ifileReadAhead;
        this.ifileReadAheadLength = ifileReadAheadLength;
        if (codec != null) {
            this.codec = codec;
            this.decompressor = CodecPool.getDecompressor((CompressionCodec)codec);
        } else {
            this.codec = null;
            this.decompressor = null;
        }
        this.connectionTimeout = job.getInt("tez.runtime.shuffle.connect.timeout", 180000);
        this.readTimeout = job.getInt("tez.runtime.shuffle.read.timeout", 180000);
        this.setName("fetcher#" + this.id);
        this.setDaemon(true);
        Class<Fetcher> clazz = Fetcher.class;
        synchronized (Fetcher.class) {
            sslShuffle = job.getBoolean("tez.runtime.shuffle.ssl.enable", false);
            if (sslShuffle && sslFactory == null) {
                sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
                try {
                    sslFactory.init();
                }
                catch (Exception ex) {
                    sslFactory.destroy();
                    throw new RuntimeException(ex);
                }
            }
            // ** MonitorExit[var11_11] (shouldn't be in output)
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        try {
            while (!this.stopped && !Thread.currentThread().isInterrupted()) {
                this.remaining = null;
                MapHost host = null;
                try {
                    this.merger.waitForInMemoryMerge();
                    host = this.scheduler.getHost();
                    this.metrics.threadBusy();
                    this.copyFromHost(host);
                }
                finally {
                    if (host == null) continue;
                    this.scheduler.freeHost(host);
                    this.metrics.threadFree();
                }
            }
            return;
        }
        catch (InterruptedException ie) {
            return;
        }
        catch (Throwable t) {
            this.shuffle.reportException(t);
        }
    }

    public void shutDown() throws InterruptedException {
        this.stopped = true;
        this.interrupt();
        try {
            this.join(5000L);
        }
        catch (InterruptedException ie) {
            LOG.warn((Object)("Got interrupt while joining " + this.getName()), (Throwable)ie);
        }
        if (sslFactory != null) {
            sslFactory.destroy();
        }
    }

    @VisibleForTesting
    protected HttpURLConnection openConnection(URL url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        if (sslShuffle) {
            HttpsURLConnection httpsConn = (HttpsURLConnection)conn;
            try {
                httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
            }
            catch (GeneralSecurityException ex) {
                throw new IOException(ex);
            }
            httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
        }
        return conn;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    protected void copyFromHost(MapHost host) throws IOException {
        DataInputStream input;
        List<InputAttemptIdentifier> srcAttempts = this.scheduler.getMapsForHost(host);
        if (srcAttempts.size() == 0) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Fetcher " + this.id + " going to fetch from " + host + " for: " + srcAttempts));
        }
        this.remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
        boolean connectSucceeded = false;
        try {
            URL url = this.getMapOutputURL(host, srcAttempts);
            HttpURLConnection connection = this.openConnection(url);
            String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
            String encHash = SecureShuffleUtils.hashFromString(msgToEncode, this.jobTokenSecret);
            connection.addRequestProperty("UrlHash", encHash);
            connection.setReadTimeout(this.readTimeout);
            connection.addRequestProperty("name", "mapreduce");
            connection.addRequestProperty("version", "1.0.0");
            this.connect(connection, this.connectionTimeout);
            connectSucceeded = true;
            input = new DataInputStream(connection.getInputStream());
            int rc = connection.getResponseCode();
            if (rc != 200) {
                throw new IOException("Got invalid response code " + rc + " from " + url + ": " + connection.getResponseMessage());
            }
            if (!"mapreduce".equals(connection.getHeaderField("name")) || !"1.0.0".equals(connection.getHeaderField("version"))) {
                throw new IOException("Incompatible shuffle response version");
            }
            String replyHash = connection.getHeaderField("ReplyHash");
            if (replyHash == null) {
                throw new IOException("security validation of TT Map output failed");
            }
            LOG.debug((Object)("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash=" + replyHash));
            SecureShuffleUtils.verifyReply(replyHash, encHash, this.jobTokenSecret);
            LOG.info((Object)("for url=" + msgToEncode + " sent hash and receievd reply"));
        }
        catch (IOException ie) {
            this.ioErrs.increment(1L);
            LOG.warn((Object)("Failed to connect to " + host + " with " + this.remaining.size() + " map outputs"), (Throwable)ie);
            if (!connectSucceeded) {
                for (InputAttemptIdentifier left : this.remaining) {
                    this.scheduler.copyFailed(left, host, connectSucceeded);
                }
            } else {
                InputAttemptIdentifier firstMap = srcAttempts.get(0);
                this.scheduler.copyFailed(firstMap, host, connectSucceeded);
            }
            for (InputAttemptIdentifier left : this.remaining) {
                this.scheduler.putBackKnownMapOutput(host, left);
            }
            return;
        }
        try {
            Object[] failedTasks = null;
            while (!this.remaining.isEmpty() && failedTasks == null) {
                failedTasks = this.copyMapOutput(host, input);
            }
            if (failedTasks != null && failedTasks.length > 0) {
                LOG.warn((Object)("copyMapOutput failed for tasks " + Arrays.toString(failedTasks)));
                for (Object left : failedTasks) {
                    this.scheduler.copyFailed((InputAttemptIdentifier)left, host, true);
                }
            }
            IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{input});
            if (failedTasks == null && !this.remaining.isEmpty()) {
                throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
            }
        }
        finally {
            for (InputAttemptIdentifier left : this.remaining) {
                this.scheduler.putBackKnownMapOutput(host, left);
            }
        }
    }

    private InputAttemptIdentifier[] copyMapOutput(MapHost host, DataInputStream input) {
        MapOutput mapOutput = null;
        InputAttemptIdentifier srcAttemptId = null;
        long decompressedLength = -1L;
        long compressedLength = -1L;
        try {
            long startTime = System.currentTimeMillis();
            int forReduce = -1;
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                srcAttemptId = this.scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
                compressedLength = header.compressedLength;
                decompressedLength = header.uncompressedLength;
                forReduce = header.forReduce;
            }
            catch (IllegalArgumentException e) {
                this.badIdErrs.increment(1L);
                LOG.warn((Object)"Invalid map id ", (Throwable)e);
                return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
            if (!this.verifySanity(compressedLength, decompressedLength, forReduce, this.remaining, srcAttemptId)) {
                if (srcAttemptId == null) {
                    LOG.warn((Object)("Was expecting " + this.getNextRemainingAttempt() + " but got null"));
                    srcAttemptId = this.getNextRemainingAttempt();
                }
                assert (srcAttemptId != null);
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("header: " + srcAttemptId + ", len: " + compressedLength + ", decomp len: " + decompressedLength));
            }
            if ((mapOutput = this.merger.reserve(srcAttemptId, decompressedLength, this.id)).getType() == MapOutput.Type.WAIT) {
                LOG.info((Object)("fetcher#" + this.id + " - MergerManager returned Status.WAIT ..."));
                return EMPTY_ATTEMPT_ID_ARRAY;
            }
            LOG.info((Object)("fetcher#" + this.id + " about to shuffle output of map " + mapOutput.getAttemptIdentifier() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + (Object)((Object)mapOutput.getType())));
            if (mapOutput.getType() == MapOutput.Type.MEMORY) {
                this.shuffleToMemory(host, mapOutput, input, (int)decompressedLength, (int)compressedLength);
            } else {
                this.shuffleToDisk(host, mapOutput, input, compressedLength);
            }
            long endTime = System.currentTimeMillis();
            this.scheduler.copySucceeded(srcAttemptId, host, compressedLength, endTime - startTime, mapOutput);
            this.remaining.remove(srcAttemptId);
            this.metrics.successFetch();
            return null;
        }
        catch (IOException ioe) {
            this.ioErrs.increment(1L);
            if (srcAttemptId == null || mapOutput == null) {
                LOG.info((Object)("fetcher#" + this.id + " failed to read map header" + srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength), (Throwable)ioe);
                if (srcAttemptId == null) {
                    return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
                }
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            LOG.warn((Object)("Failed to shuffle output of " + srcAttemptId + " from " + host.getHostName()), (Throwable)ioe);
            mapOutput.abort();
            this.metrics.failedFetch();
            return new InputAttemptIdentifier[]{srcAttemptId};
        }
    }

    private boolean verifySanity(long compressedLength, long decompressedLength, int forReduce, Set<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
        if (compressedLength < 0L || decompressedLength < 0L) {
            this.wrongLengthErrs.increment(1L);
            LOG.warn((Object)(this.getName() + " invalid lengths in map output header: id: " + srcAttemptId + " len: " + compressedLength + ", decomp len: " + decompressedLength));
            return false;
        }
        if (!remaining.contains(srcAttemptId)) {
            this.wrongMapErrs.increment(1L);
            LOG.warn((Object)("Invalid map-output! Received output for " + srcAttemptId));
            return false;
        }
        return true;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.remaining.size() > 0) {
            return (InputAttemptIdentifier)this.remaining.iterator().next();
        }
        return null;
    }

    private URL getMapOutputURL(MapHost host, List<InputAttemptIdentifier> srcAttempts) throws MalformedURLException {
        StringBuffer url = new StringBuffer(host.getBaseUrl());
        boolean first = true;
        for (InputAttemptIdentifier mapId : srcAttempts) {
            if (!first) {
                url.append(",");
            }
            url.append(mapId.getPathComponent());
            first = false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("MapOutput URL for " + host + " -> " + url.toString()));
        }
        return new URL(url.toString());
    }

    private void connect(URLConnection connection, int connectionTimeout) throws IOException {
        int unit = 0;
        if (connectionTimeout < 0) {
            throw new IOException("Invalid timeout [timeout = " + connectionTimeout + " ms]");
        }
        if (connectionTimeout > 0) {
            unit = Math.min(60000, connectionTimeout);
        }
        connection.setConnectTimeout(unit);
        while (true) {
            try {
                connection.connect();
            }
            catch (IOException ioe) {
                if ((connectionTimeout -= unit) == 0) {
                    throw ioe;
                }
                if (connectionTimeout >= unit) continue;
                unit = connectionTimeout;
                connection.setConnectTimeout(unit);
                continue;
            }
            break;
        }
    }

    private void shuffleToMemory(MapHost host, MapOutput mapOutput, InputStream input, int decompressedLength, int compressedLength) throws IOException {
        IFileInputStream checksumIn = new IFileInputStream(input, compressedLength, this.ifileReadAhead, this.ifileReadAheadLength);
        input = checksumIn;
        if (this.codec != null) {
            this.decompressor.reset();
            input = this.codec.createInputStream(input, this.decompressor);
        }
        byte[] shuffleData = mapOutput.getMemory();
        try {
            IOUtils.readFully((InputStream)input, (byte[])shuffleData, (int)0, (int)shuffleData.length);
            this.metrics.inputBytes(shuffleData.length);
            LOG.info((Object)("Read " + shuffleData.length + " bytes from map-output for " + mapOutput.getAttemptIdentifier()));
        }
        catch (IOException ioe) {
            IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{input});
            throw ioe;
        }
    }

    private void shuffleToDisk(MapHost host, MapOutput mapOutput, InputStream input, long compressedLength) throws IOException {
        long bytesLeft;
        OutputStream output = mapOutput.getDisk();
        try {
            int n;
            int BYTES_TO_READ = 65536;
            byte[] buf = new byte[65536];
            for (bytesLeft = compressedLength; bytesLeft > 0L; bytesLeft -= (long)n) {
                n = input.read(buf, 0, (int)Math.min(bytesLeft, 65536L));
                if (n < 0) {
                    throw new IOException("read past end of stream reading " + mapOutput.getAttemptIdentifier());
                }
                output.write(buf, 0, n);
                this.metrics.inputBytes(n);
            }
            LOG.info((Object)("Read " + (compressedLength - bytesLeft) + " bytes from map-output for " + mapOutput.getAttemptIdentifier()));
            output.close();
        }
        catch (IOException ioe) {
            IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{input, output});
            throw ioe;
        }
        if (bytesLeft != 0L) {
            throw new IOException("Incomplete map output received for " + mapOutput.getAttemptIdentifier() + " from " + host.getHostName() + " (" + bytesLeft + " bytes missing of " + compressedLength + ")");
        }
    }

    static {
        EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static enum ShuffleErrors {
        IO_ERROR,
        WRONG_LENGTH,
        BAD_ID,
        WRONG_MAP,
        CONNECTION,
        WRONG_REDUCE;

    }
}

