/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.shuffle.common;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchResult;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
import org.apache.tez.runtime.library.shuffle.common.LocalDiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;

public class Fetcher
implements Callable<FetchResult> {
    private static final Log LOG = LogFactory.getLog(Fetcher.class);
    private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
    private final Configuration conf;
    private CompressionCodec codec;
    private boolean ifileReadAhead = true;
    private int ifileReadAheadLength = 0x400000;
    private final SecretKey shuffleSecret;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private final String logIdentifier;
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private final int fetcherIdentifier;
    private List<InputAttemptIdentifier> srcAttempts;
    private String host;
    private int port;
    private int partition;
    private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
    private LinkedHashSet<InputAttemptIdentifier> remaining;
    private URL url;
    private volatile DataInputStream input;
    private HttpConnection httpConnection;
    private HttpConnection.HttpConnectionParams httpConnectionParams;
    private final boolean localDiskFetchEnabled;

    private Fetcher(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
        this.fetcherCallback = fetcherCallback;
        this.inputManager = inputManager;
        this.shuffleSecret = shuffleSecret;
        this.appId = appId;
        this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
        this.httpConnectionParams = params;
        this.conf = conf;
        this.localDiskFetchEnabled = localDiskFetchEnabled;
        this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
        this.logIdentifier = "fetcher [" + srcNameTrimmed + "] " + this.fetcherIdentifier;
    }

    @Override
    public FetchResult call() throws Exception {
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.srcAttempts);
        }
        for (InputAttemptIdentifier in : this.srcAttempts) {
            this.pathToAttemptMap.put(in.getPathComponent(), in);
        }
        this.remaining = new LinkedHashSet<InputAttemptIdentifier>(this.srcAttempts);
        HostFetchResult hostFetchResult = this.localDiskFetchEnabled && this.host.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString())) ? this.setupLocalDiskFetch() : this.doHttpFetch();
        if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
            LOG.warn((Object)("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs)));
            for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
                this.fetcherCallback.fetchFailed(this.host, left, hostFetchResult.connectFailed);
            }
        }
        this.shutdown();
        if (hostFetchResult.failedInputs == null && !this.remaining.isEmpty()) {
            throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
        }
        return hostFetchResult.fetchResult;
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch() {
        try {
            StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(this.host, this.port, this.partition, this.appId.toString(), this.httpConnectionParams.isSSLShuffleEnabled());
            this.url = ShuffleUtils.constructInputURL(baseURI.toString(), this.srcAttempts, this.httpConnectionParams.getKeepAlive());
            this.httpConnection = new HttpConnection(this.url, this.httpConnectionParams, this.logIdentifier, this.shuffleSecret);
            this.httpConnection.connect();
        }
        catch (IOException e) {
            InputAttemptIdentifier[] failedFetches = null;
            if (this.isShutDown.get()) {
                LOG.info((Object)"Not reporting fetch failure, since an Exception was caught after shutdown");
            } else {
                failedFetches = this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), failedFetches, true);
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            LOG.info((Object)"Detected fetcher has been shutdown after connection establishment. Returning");
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
        }
        try {
            this.input = this.httpConnection.getInputStream();
            this.httpConnection.validate();
        }
        catch (IOException e) {
            if (this.isShutDown.get()) {
                LOG.info((Object)"Not reporting fetch failure, since an Exception was caught after shutdown");
            }
            InputAttemptIdentifier firstAttempt = this.srcAttempts.get(0);
            LOG.warn((Object)("Fetch Failure from host while connecting: " + this.host + ", attempt: " + firstAttempt + " Informing ShuffleManager: "), (Throwable)e);
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), new InputAttemptIdentifier[]{firstAttempt}, false);
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            LOG.info((Object)"Detected fetcher has been shutdown after opening stream. Returning");
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
        }
        InputAttemptIdentifier[] failedInputs = null;
        while (!this.remaining.isEmpty() && failedInputs == null) {
            failedInputs = this.fetchInputs(this.input);
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), failedInputs, false);
    }

    @VisibleForTesting
    protected HostFetchResult setupLocalDiskFetch() {
        Iterator iterator = this.remaining.iterator();
        while (iterator.hasNext()) {
            InputAttemptIdentifier srcAttemptId = (InputAttemptIdentifier)iterator.next();
            long startTime = System.currentTimeMillis();
            FetchedInput fetchedInput = null;
            try {
                TezIndexRecord idxRecord = this.getTezIndexRecord(srcAttemptId);
                fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(), idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId, this.getShuffleInputFileName(srcAttemptId.getPathComponent(), null), this.conf, new FetchedInputCallback(){

                    @Override
                    public void fetchComplete(FetchedInput fetchedInput) {
                    }

                    @Override
                    public void fetchFailed(FetchedInput fetchedInput) {
                    }

                    @Override
                    public void freeResources(FetchedInput fetchedInput) {
                    }
                });
                LOG.info((Object)("fetcher about to shuffle output of srcAttempt (direct disk)" + srcAttemptId + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() + " to " + (Object)((Object)fetchedInput.getType())));
                long endTime = System.currentTimeMillis();
                this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, idxRecord.getPartLength(), idxRecord.getRawLength(), endTime - startTime);
                iterator.remove();
            }
            catch (IOException e) {
                LOG.warn((Object)("Failed to shuffle output of " + srcAttemptId + " from " + this.host + "(local fetch)"), (Throwable)e);
                if (fetchedInput == null) continue;
                try {
                    fetchedInput.abort();
                }
                catch (IOException e1) {
                    LOG.info((Object)("Failed to cleanup fetchedInput " + fetchedInput));
                }
            }
        }
        InputAttemptIdentifier[] failedFetches = null;
        if (this.remaining.size() > 0) {
            failedFetches = this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), failedFetches, false);
    }

    @VisibleForTesting
    protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws IOException {
        Path indexFile = this.getShuffleInputFileName(srcAttemptId.getPathComponent(), ".index");
        TezSpillRecord spillRecord = new TezSpillRecord(indexFile, this.conf);
        TezIndexRecord idxRecord = spillRecord.getIndex(this.partition);
        return idxRecord;
    }

    @VisibleForTesting
    protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        suffix = suffix != null ? suffix : "";
        String pathFromLocalDir = "output/" + pathComponent + "/" + "file.out" + suffix;
        return localDirAllocator.getLocalPathToRead(pathFromLocalDir, this.conf);
    }

    public void shutdown() {
        if (!this.isShutDown.getAndSet(true)) {
            this.shutdownInternal();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shutdownInternal() {
        AtomicBoolean atomicBoolean = this.isShutDown;
        synchronized (atomicBoolean) {
            block6: {
                try {
                    if (this.httpConnection != null) {
                        this.httpConnection.cleanup(false);
                    }
                }
                catch (IOException e) {
                    LOG.info((Object)("Exception while shutting down fetcher on " + this.logIdentifier + " : " + e.getMessage()));
                    if (!LOG.isDebugEnabled()) break block6;
                    LOG.debug((Object)e);
                }
            }
        }
    }

    private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
        FetchedInput fetchedInput = null;
        InputAttemptIdentifier srcAttemptId = null;
        long decompressedLength = -1L;
        long compressedLength = -1L;
        try {
            long startTime = System.currentTimeMillis();
            int responsePartition = -1;
            String pathComponent = null;
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                pathComponent = header.getMapId();
                srcAttemptId = this.pathToAttemptMap.get(pathComponent);
                compressedLength = header.getCompressedLength();
                decompressedLength = header.getUncompressedLength();
                responsePartition = header.getPartition();
            }
            catch (IllegalArgumentException e) {
                LOG.warn((Object)"Invalid src id ", (Throwable)e);
                return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
            if (!this.verifySanity(compressedLength, decompressedLength, responsePartition, srcAttemptId, pathComponent)) {
                if (srcAttemptId == null) {
                    LOG.warn((Object)("Was expecting " + this.getNextRemainingAttempt() + " but got null"));
                    srcAttemptId = this.getNextRemainingAttempt();
                }
                assert (srcAttemptId != null);
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("header: " + srcAttemptId + ", len: " + compressedLength + ", decomp len: " + decompressedLength));
            }
            fetchedInput = this.inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
            LOG.info((Object)("fetcher about to shuffle output of srcAttempt " + fetchedInput.getInputAttemptIdentifier() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + (Object)((Object)fetchedInput.getType())));
            if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
                ShuffleUtils.shuffleToMemory(((MemoryFetchedInput)fetchedInput).getBytes(), input, (int)decompressedLength, (int)compressedLength, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, fetchedInput.getInputAttemptIdentifier().toString());
            } else if (fetchedInput.getType() == FetchedInput.Type.DISK) {
                ShuffleUtils.shuffleToDisk(((DiskFetchedInput)fetchedInput).getOutputStream(), this.host + ":" + this.port, input, compressedLength, LOG, fetchedInput.getInputAttemptIdentifier().toString());
            } else {
                throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + fetchedInput);
            }
            long endTime = System.currentTimeMillis();
            this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, compressedLength, decompressedLength, endTime - startTime);
            this.remaining.remove(srcAttemptId);
            return null;
        }
        catch (IOException ioe) {
            if (srcAttemptId == null || fetchedInput == null) {
                LOG.info((Object)("fetcher failed to read map header" + srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength), (Throwable)ioe);
                if (srcAttemptId == null) {
                    return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
                }
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            LOG.warn((Object)("Failed to shuffle output of " + srcAttemptId + " from " + this.host), (Throwable)ioe);
            try {
                fetchedInput.abort();
            }
            catch (IOException e) {
                LOG.info((Object)("Failure to cleanup fetchedInput: " + fetchedInput));
            }
            return new InputAttemptIdentifier[]{srcAttemptId};
        }
    }

    private boolean verifySanity(long compressedLength, long decompressedLength, int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
        if (compressedLength < 0L || decompressedLength < 0L) {
            LOG.warn((Object)(" invalid lengths in input header -> headerPathComponent: " + pathComponent + ", nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + ", decomp len: " + decompressedLength));
            return false;
        }
        if (fetchPartition != this.partition) {
            LOG.warn((Object)(" data for the wrong reduce -> headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for reduce " + fetchPartition));
            return false;
        }
        if (!this.remaining.contains(srcAttemptId)) {
            LOG.warn((Object)("Invalid input. Received output for headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId));
            return false;
        }
        return true;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.remaining.size() > 0) {
            return (InputAttemptIdentifier)this.remaining.iterator().next();
        }
        return null;
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (this.getClass() != obj.getClass()) {
            return false;
        }
        Fetcher other = (Fetcher)obj;
        return this.fetcherIdentifier == other.fetcherIdentifier;
    }

    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
            this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
        }

        public FetcherBuilder setHttpConnectionParameters(HttpConnection.HttpConnectionParams httpParams) {
            this.fetcher.httpConnectionParams = httpParams;
            return this;
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
            this.fetcher.codec = codec;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
            this.fetcher.ifileReadAhead = readAhead;
            this.fetcher.ifileReadAheadLength = readAheadBytes;
            return this;
        }

        public FetcherBuilder assignWork(String host, int port, int partition, List<InputAttemptIdentifier> inputs) {
            this.fetcher.host = host;
            this.fetcher.port = port;
            this.fetcher.partition = partition;
            this.fetcher.srcAttempts = inputs;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState((this.workAssigned ? 1 : 0) != 0, (Object)"Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }

    static class HostFetchResult {
        private final FetchResult fetchResult;
        private final InputAttemptIdentifier[] failedInputs;
        private final boolean connectFailed;

        public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs, boolean connectFailed) {
            this.fetchResult = fetchResult;
            this.failedInputs = failedInputs;
            this.connectFailed = connectFailed;
        }
    }
}

