/*
 * Decompiled with CFR 0.152.
 */
package org.apache.htrace.impl;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.commons.logging.Log;
import org.apache.htrace.commons.logging.LogFactory;
import org.apache.htrace.impl.ProcessId;
import org.apache.htrace.jetty.client.HttpClient;
import org.apache.htrace.jetty.client.api.ContentResponse;
import org.apache.htrace.jetty.client.api.Request;
import org.apache.htrace.jetty.client.util.StringContentProvider;
import org.apache.htrace.jetty.http.HttpField;
import org.apache.htrace.jetty.http.HttpHeader;
import org.apache.htrace.jetty.http.HttpMethod;

/**
 * A {@link SpanReceiver} that buffers spans in memory and POSTs them in batches to the
 * {@code /writeSpans} endpoint of an htraced REST server.
 *
 * <p>Spans handed to {@link #receiveSpan(Span)} are appended to a bounded queue. A daemon
 * thread named {@code PostSpans} drains that queue, at most {@code maxToSendAtATime} spans
 * per request, when a full batch is available, when the configured period elapses, when a
 * flush is requested via {@link #startFlushing()}, or when the receiver is closed.
 *
 * <p>Thread-safety: {@code spans}, {@code shutdown} and {@code mustStartFlush} are guarded by
 * {@code lock}; {@code cond} is used to wake the background thread.
 */
public class HTracedRESTReceiver
implements SpanReceiver {
    private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);

    /** Configuration key: connect timeout of the HTTP client, in milliseconds. */
    public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms";
    private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;

    /** Configuration key: idle timeout of the HTTP client, in milliseconds. */
    public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms";
    private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;

    /** Configuration key: base URL of the htraced REST server (only protocol, host and port are used). */
    public static final String HTRACED_REST_URL_KEY = "htraced.rest.url";
    private static final String HTRACED_REST_URL_DEFAULT = "http://localhost:9095/";

    /** Configuration key: maximum number of spans buffered before new spans are dropped. */
    public static final String CLIENT_REST_QUEUE_CAPACITY_KEY = "client.rest.queue.capacity";
    private static final int CLIENT_REST_QUEUE_CAPACITY_DEFAULT = 1000000;

    /** Configuration key: maximum time, in milliseconds, the background thread waits between flushes. */
    public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms";
    private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;

    /**
     * Configuration key: maximum number of spans sent in a single POST.
     * Note that, unlike the other keys, this one carries an {@code htrace.} prefix.
     */
    public static final String CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY = "htrace.client.rest.batch.size";
    private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;

    /** Minimum interval, in milliseconds, between two "queue is full" warnings. */
    private static final long WARN_TIMEOUT_MS = 300000L;

    /** How long {@link #close()} waits for the background thread to finish, in milliseconds. */
    private static final long CLOSE_JOIN_TIMEOUT_MS = 120000L;

    private final HttpClient httpClient;
    private final int capacity;
    private final String url;
    private final int maxToSendAtATime;
    private final PostSpans postSpans;
    private final Thread postSpansThread;
    private final ProcessId processId;

    /** Guards {@link #spans}, {@link #shutdown} and {@link #mustStartFlush}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled to wake the background thread (batch ready, flush requested, or shutdown). */
    private final Condition cond = this.lock.newCondition();

    /** Set once the receiver stops accepting spans. Guarded by {@link #lock}. */
    private boolean shutdown = false;

    /** Spans waiting to be sent. Guarded by {@link #lock}. */
    private final ArrayDeque<Span> spans;

    /** Set by {@link #startFlushing()} to request an immediate flush. Guarded by {@link #lock}. */
    private boolean mustStartFlush;

    /**
     * Monotonic timestamp (ms) of the last "queue is full" warning. System.nanoTime() has an
     * arbitrary origin and may be small or negative, so the value is seeded relative to the
     * current reading rather than 0; this guarantees the first overflow is reported.
     */
    private final AtomicLong lastAtCapacityWarningLog =
        new AtomicLong(monotonicNowMs() - WARN_TIMEOUT_MS - 1L);

    /** Returns the current {@link System#nanoTime()} reading converted to milliseconds. */
    private static long monotonicNowMs() {
        return TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Builds (but does not start) the HTTP client used to post spans.
     *
     * @param connTimeout connect timeout in milliseconds
     * @param idleTimeout idle timeout in milliseconds
     * @return a configured, not yet started client whose User-Agent is this class's simple name
     */
    HttpClient createHttpClient(long connTimeout, long idleTimeout) {
        HttpClient client = new HttpClient();
        client.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, this.getClass().getSimpleName()));
        client.setConnectTimeout(connTimeout);
        client.setIdleTimeout(idleTimeout);
        return client;
    }

    /**
     * Creates the receiver, starts the HTTP client and launches the background posting thread.
     *
     * @param conf configuration supplying the keys declared on this class; missing keys fall
     *             back to the corresponding {@code *_DEFAULT} values
     * @throws Exception if the configured URL is malformed, the process id cannot be built,
     *                   or the HTTP client fails to start
     */
    public HTracedRESTReceiver(HTraceConfiguration conf) throws Exception {
        int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY, CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
        int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY, CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
        this.httpClient = this.createHttpClient(connTimeout, idleTimeout);
        this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
        this.spans = new ArrayDeque<Span>(this.capacity);

        // Only protocol, host and port of the configured URL are kept; the path is fixed.
        URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT));
        URL writeSpansUrl =
            new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans");
        this.url = writeSpansUrl.toString();

        int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT);
        this.maxToSendAtATime =
            conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);

        // Finish every step that can fail without side effects BEFORE starting the client and
        // the thread, so a failure here does not leak a running client or a live daemon thread.
        this.processId = new ProcessId(conf);

        this.httpClient.start();
        this.postSpans = new PostSpans(periodInMs);
        this.postSpansThread = new Thread(this.postSpans);
        this.postSpansThread.setDaemon(true);
        this.postSpansThread.setName("PostSpans");
        this.postSpansThread.start();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new HTracedRESTReceiver with connTimeout=" + connTimeout
                + ", idleTimeout = " + idleTimeout + ", capacity=" + this.capacity
                + ", url=" + this.url + ", periodInMs=" + periodInMs
                + ", maxToSendAtATime=" + this.maxToSendAtATime);
        }
    }

    /**
     * Stops accepting spans, wakes the background thread so it can drain what is buffered,
     * and waits up to {@value #CLOSE_JOIN_TIMEOUT_MS} ms for it to exit.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * thread's interrupt status is restored.
     *
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    public void close() throws IOException {
        LOG.debug("Closing HTracedRESTReceiver(" + this.url + ").");
        this.lock.lock();
        try {
            this.shutdown = true;
            this.cond.signal();
        }
        finally {
            this.lock.unlock();
        }
        try {
            this.postSpansThread.join(CLOSE_JOIN_TIMEOUT_MS);
            if (this.postSpansThread.isAlive()) {
                LOG.error("Timed out without closing HTracedRESTReceiver(" + this.url + ").");
            } else {
                LOG.debug("Closed HTracedRESTReceiver(" + this.url + ").");
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted while joining postSpans", e);
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asks the background thread to flush buffered spans as soon as possible instead of
     * waiting for the period to elapse or a full batch to accumulate.
     */
    void startFlushing() {
        LOG.info("Triggering HTracedRESTReceiver flush.");
        this.lock.lock();
        try {
            this.mustStartFlush = true;
            this.cond.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Queues a span for delivery.
     *
     * <p>The span is silently ignored (trace-logged) after shutdown, and dropped when the queue
     * already holds {@code capacity} spans. Drops are reported with a warning at most once
     * every {@code WARN_TIMEOUT_MS} milliseconds.
     *
     * @param span the span to buffer
     */
    public void receiveSpan(Span span) {
        boolean added = false;
        this.lock.lock();
        try {
            if (this.shutdown) {
                LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver is already shut down.");
                return;
            }
            if (this.spans.size() < this.capacity) {
                this.spans.add(span);
                added = true;
                // Wake the sender only once a full batch is available.
                if (this.spans.size() >= this.maxToSendAtATime) {
                    this.cond.signal();
                }
            } else {
                // Queue is full: nudge the sender so it frees up room.
                this.cond.signal();
            }
        }
        finally {
            this.lock.unlock();
        }
        if (added) {
            return;
        }
        // Rate-limit the warning; compareAndSet ensures only one racing thread logs it.
        long now = monotonicNowMs();
        long last = this.lastAtCapacityWarningLog.get();
        if (now - last > WARN_TIMEOUT_MS && this.lastAtCapacityWarningLog.compareAndSet(last, now)) {
            LOG.warn("There are too many HTrace spans to buffer!  We have already buffered "
                + this.capacity + " spans.  Dropping spans.");
        }
    }

    /**
     * Background task that moves spans from the shared queue into a private batch buffer
     * (under the lock) and posts each batch to the REST endpoint (outside the lock).
     */
    private class PostSpans
    implements Runnable {
        /** Maximum time to wait between flushes, in nanoseconds. */
        private final long periodInNs;

        /** Batch currently being sent; touched only by the background thread. */
        private final ArrayDeque<Span> spanBuf;

        private PostSpans(long periodInMs) {
            this.periodInNs = TimeUnit.NANOSECONDS.convert(periodInMs, TimeUnit.MILLISECONDS);
            this.spanBuf = new ArrayDeque<Span>(HTracedRESTReceiver.this.maxToSendAtATime);
        }

        /**
         * Main loop: wait for work, load a batch, send it; repeat until the receiver is shut
         * down and the queue is empty. On exit (normal or not) the HTTP client is stopped,
         * the receiver is marked shut down and any remaining spans are discarded.
         */
        @Override
        public void run() {
            try {
                long waitNs = this.periodInNs;
                while (true) {
                    HTracedRESTReceiver.this.lock.lock();
                    try {
                        if (HTracedRESTReceiver.this.shutdown) {
                            if (HTracedRESTReceiver.this.spans.isEmpty()) {
                                LOG.debug("Shutting down PostSpans thread...");
                                break;
                            }
                        } else if (!HTracedRESTReceiver.this.mustStartFlush) {
                            // Only wait when no flush is already pending: a flush requested
                            // while this thread was busy sending had its signal lost, and
                            // waiting here would delay it by up to a full period.
                            try {
                                waitNs = HTracedRESTReceiver.this.cond.awaitNanos(waitNs);
                            }
                            catch (InterruptedException e) {
                                // An interrupt is treated as a request to flush right away.
                                LOG.info("Got InterruptedException");
                                waitNs = 0L;
                            }
                        }
                        if (HTracedRESTReceiver.this.mustStartFlush) {
                            waitNs = 0L;
                            HTracedRESTReceiver.this.mustStartFlush = false;
                        }
                        // ">=" matches the condition under which receiveSpan() signals, so the
                        // wake-up for an exactly full batch is not wasted.
                        if (HTracedRESTReceiver.this.spans.size() >= HTracedRESTReceiver.this.maxToSendAtATime
                                || waitNs <= 0L
                                || HTracedRESTReceiver.this.shutdown) {
                            this.loadSpanBuf();
                            waitNs = this.periodInNs;
                        }
                    }
                    finally {
                        HTracedRESTReceiver.this.lock.unlock();
                    }
                    if (this.spanBuf.isEmpty()) {
                        continue;
                    }
                    // Network I/O happens outside the lock so receiveSpan() is never blocked on it.
                    this.sendSpans();
                    // The batch is discarded whether or not the POST succeeded; there is no retry.
                    this.spanBuf.clear();
                }
            }
            finally {
                if (HTracedRESTReceiver.this.httpClient != null) {
                    try {
                        HTracedRESTReceiver.this.httpClient.stop();
                    }
                    catch (Exception e) {
                        LOG.error("Error shutting down httpClient", e);
                    }
                }
                // The queue is guarded by the lock. If this thread is dying from an unchecked
                // exception, shutdown may still be false and receiveSpan() may be running
                // concurrently, so clear under the lock and stop accepting spans that nobody
                // would ever send.
                HTracedRESTReceiver.this.lock.lock();
                try {
                    HTracedRESTReceiver.this.shutdown = true;
                    HTracedRESTReceiver.this.spans.clear();
                }
                finally {
                    HTracedRESTReceiver.this.lock.unlock();
                }
            }
        }

        /**
         * Moves up to {@code maxToSendAtATime} spans from the shared queue into the batch
         * buffer. Must be called with the receiver's lock held.
         */
        private void loadSpanBuf() {
            for (int loaded = 0; loaded < HTracedRESTReceiver.this.maxToSendAtATime; ++loaded) {
                Span span = HTracedRESTReceiver.this.spans.pollFirst();
                if (span == null) {
                    return;
                }
                this.spanBuf.add(span);
            }
        }

        /**
         * POSTs the current batch as concatenated JSON documents. Any failure (non-200 status,
         * timeout, execution error, interruption) is logged and otherwise ignored.
         */
        private void sendSpans() {
            try {
                Request request = HTracedRESTReceiver.this.httpClient
                    .newRequest(HTracedRESTReceiver.this.url)
                    .method(HttpMethod.POST);
                request.header(HttpHeader.CONTENT_TYPE, "application/json");
                request.header("htrace-pid", HTracedRESTReceiver.this.processId.get());
                // Span JSON documents are appended back to back with no separator.
                StringBuilder body = new StringBuilder();
                for (Span span : this.spanBuf) {
                    body.append(span.toJson());
                }
                request.content(new StringContentProvider(body.toString()));
                ContentResponse response = request.send();
                if (response.getStatus() == 200) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("POSTED " + this.spanBuf.size() + " spans");
                    }
                } else {
                    LOG.error("Status: " + response.getStatus());
                    LOG.error(response.getHeaders());
                    LOG.error(response.getContentAsString());
                }
            }
            catch (InterruptedException e) {
                // NOTE(review): the interrupt status is deliberately not restored here. This
                // thread is private to the receiver and exits via the shutdown flag; a
                // lingering interrupt flag would presumably make every remaining send during
                // the shutdown drain fail immediately.
                LOG.error("Interrupted while posting spans to " + HTracedRESTReceiver.this.url, e);
            }
            catch (TimeoutException | ExecutionException e) {
                // Pass the throwable so the stack trace is logged, not just toString().
                LOG.error("Failed to post spans to " + HTracedRESTReceiver.this.url, e);
            }
        }
    }
}

