/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.http.async.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.Request;
import com.ning.http.client.RequestBuilder;
import com.ning.http.client.Response;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URL;
import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.http.SSLFactory;
import org.apache.tez.http.async.netty.TezBodyDeferringAsyncHandler;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncHttpConnection
extends BaseHttpConnection {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class);
    private final JobTokenSecretManager jobTokenSecretMgr;
    private String encHash;
    private String msgToEncode;
    private final HttpConnectionParams httpConnParams;
    private final Stopwatch stopWatch;
    private final URL url;
    private static volatile AsyncHttpClient httpAsyncClient;
    private final TezBodyDeferringAsyncHandler handler;
    private final PipedOutputStream pos;
    private final PipedInputStream pis;
    private Response response;
    private ListenableFuture<Response> responseFuture;
    private TezBodyDeferringAsyncHandler.BodyDeferringInputStream dis;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void initClient(HttpConnectionParams httpConnParams) throws IOException {
        if (httpAsyncClient != null) {
            return;
        }
        if (httpAsyncClient != null) return;
        Class<AsyncHttpConnection> clazz = AsyncHttpConnection.class;
        synchronized (AsyncHttpConnection.class) {
            if (httpAsyncClient != null) return;
            LOG.info("Initializing AsyncClient (TezBodyDeferringAsyncHandler)");
            AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
            if (httpConnParams.isSslShuffle()) {
                SSLFactory sslFactory = httpConnParams.getSslFactory();
                Preconditions.checkArgument((sslFactory != null ? 1 : 0) != 0, (Object)"SSLFactory can not be null");
                sslFactory.configure(builder);
            }
            builder.setAllowPoolingConnection(httpConnParams.isKeepAlive()).setAllowSslConnectionPool(httpConnParams.isKeepAlive()).setCompressionEnabled(false).setMaximumConnectionsPerHost(1).setConnectionTimeoutInMs(httpConnParams.getConnectionTimeout()).setRequestTimeoutInMs(httpConnParams.getReadTimeout()).setUseRawUrl(true).build();
            httpAsyncClient = new AsyncHttpClient(builder.build());
            // ** MonitorExit[var2_2] (shouldn't be in output)
            return;
        }
    }

    public AsyncHttpConnection(URL url, HttpConnectionParams connParams, String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
        this.jobTokenSecretMgr = jobTokenSecretManager;
        this.httpConnParams = connParams;
        this.url = url;
        this.stopWatch = new Stopwatch();
        if (LOG.isDebugEnabled()) {
            LOG.debug("MapOutput URL :" + url.toString());
        }
        this.initClient(this.httpConnParams);
        this.pos = new PipedOutputStream();
        this.pis = new PipedInputStream(this.pos, this.httpConnParams.getBufferSize());
        this.handler = new TezBodyDeferringAsyncHandler(this.pos, url, 60000);
    }

    @VisibleForTesting
    public void computeEncHash() throws IOException {
        this.msgToEncode = SecureShuffleUtils.buildMsgFrom(this.url);
        this.encHash = SecureShuffleUtils.hashFromString(this.msgToEncode, this.jobTokenSecretMgr);
    }

    @Override
    public boolean connect() throws IOException, InterruptedException {
        this.computeEncHash();
        RequestBuilder rb = new RequestBuilder();
        rb.setHeader("UrlHash", this.encHash);
        rb.setHeader("name", "mapreduce");
        rb.setHeader("version", "1.0.0");
        Request request = rb.setUrl(this.url.toString()).build();
        LOG.debug("Request url={}, encHash={}, id={}", (Object)this.url, (Object)this.encHash);
        this.responseFuture = httpAsyncClient.executeRequest(request, (AsyncHandler)this.handler);
        this.dis = new TezBodyDeferringAsyncHandler.BodyDeferringInputStream((Future<Response>)this.responseFuture, this.handler, this.pis);
        this.response = this.dis.getAsapResponse();
        if (this.response == null) {
            throw new IOException("Response is null");
        }
        int rc = this.response.getStatusCode();
        if (rc != 200) {
            LOG.debug("Request url={}, id={}", (Object)this.response.getUri());
            throw new IOException("Got invalid response code " + rc + " from " + this.url + ": " + this.response.getStatusText());
        }
        return true;
    }

    @Override
    public void validate() throws IOException {
        if (!"mapreduce".equals(this.response.getHeader("name")) || !"1.0.0".equals(this.response.getHeader("version"))) {
            throw new IOException("Incompatible shuffle response version");
        }
        String replyHash = this.response.getHeader("ReplyHash");
        if (replyHash == null) {
            throw new IOException("security validation of TT Map output failed");
        }
        LOG.debug("url={};encHash={};replyHash={}", new Object[]{this.msgToEncode, this.encHash, replyHash});
        SecureShuffleUtils.verifyReply(replyHash, this.encHash, this.jobTokenSecretMgr);
        LOG.info("for url={} sent hash and receievd reply {} ms", (Object)this.url, (Object)this.stopWatch.elapsedMillis());
    }

    @Override
    public DataInputStream getInputStream() throws IOException, InterruptedException {
        Preconditions.checkState((this.response != null ? 1 : 0) != 0, (Object)"Response can not be null");
        return new DataInputStream(this.dis);
    }

    @VisibleForTesting
    public void close() {
        httpAsyncClient.close();
        httpAsyncClient = null;
    }

    @Override
    public void cleanup(boolean disconnect) throws IOException {
        if (this.response != null) {
            this.dis.close();
        }
        IOUtils.closeQuietly((OutputStream)this.pos);
        IOUtils.closeQuietly((InputStream)this.pis);
        this.response = null;
    }
}

