/*
 * Decompiled with CFR 0.152.
 */
package org.apache.htrace.impl;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanReceiver;

public class FlumeSpanReceiver
extends SpanReceiver {
    private static final Log LOG = LogFactory.getLog(FlumeSpanReceiver.class);
    public static final String NUM_THREADS_KEY = "htrace.flume.num-threads";
    public static final int DEFAULT_NUM_THREADS = 1;
    public static final String FLUME_HOSTNAME_KEY = "htrace.flume.hostname";
    public static final String DEFAULT_FLUME_HOSTNAME = "localhost";
    public static final String FLUME_PORT_KEY = "htrace.flume.port";
    public static final String FLUME_BATCHSIZE_KEY = "htrace.flume.batchsize";
    public static final int DEFAULT_FLUME_BATCHSIZE = 100;
    private static final int SHUTDOWN_TIMEOUT = 30;
    private static final int MAX_ERRORS = 10;
    private final BlockingQueue<Span> queue;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ThreadFactory tf;
    private ExecutorService service;
    private int maxSpanBatchSize;
    private String flumeHostName;
    private int flumePort;

    public FlumeSpanReceiver(HTraceConfiguration conf) {
        this.queue = new ArrayBlockingQueue<Span>(1000);
        this.tf = new SimpleThreadFactory();
        this.configure(conf);
    }

    private void configure(HTraceConfiguration conf) {
        int numThreads = conf.getInt(NUM_THREADS_KEY, 1);
        this.flumeHostName = conf.get(FLUME_HOSTNAME_KEY, DEFAULT_FLUME_HOSTNAME);
        this.flumePort = conf.getInt(FLUME_PORT_KEY, 0);
        if (this.flumePort == 0) {
            throw new IllegalArgumentException("htrace.flume.port is required in configuration.");
        }
        this.maxSpanBatchSize = conf.getInt(FLUME_BATCHSIZE_KEY, 100);
        if (this.service != null) {
            this.service.shutdownNow();
            this.service = null;
        }
        this.service = Executors.newFixedThreadPool(numThreads, this.tf);
        for (int i = 0; i < numThreads; ++i) {
            this.service.submit(new WriteSpanRunnable());
        }
    }

    public void close() throws IOException {
        this.running.set(false);
        this.service.shutdown();
        try {
            if (!this.service.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOG.error((Object)("Was not able to process all remaining spans upon closing in: 30 " + (Object)((Object)TimeUnit.SECONDS) + ". Left Spans could be dropped."));
            }
        }
        catch (InterruptedException e1) {
            LOG.warn((Object)"Thread interrupted when terminating executor.", (Throwable)e1);
        }
    }

    public void receiveSpan(Span span) {
        if (this.running.get()) {
            try {
                this.queue.add(span);
            }
            catch (IllegalStateException e) {
                LOG.error((Object)("Error trying to append span (" + span.getDescription() + ") to the queue. Blocking Queue was full."));
            }
        }
    }

    private class WriteSpanRunnable
    implements Runnable {
        private RpcClient flumeClient = null;

        private WriteSpanRunnable() {
        }

        @Override
        public void run() {
            ArrayList<Span> dequeuedSpans = new ArrayList<Span>(FlumeSpanReceiver.this.maxSpanBatchSize);
            long errorCount = 0L;
            while (FlumeSpanReceiver.this.running.get() || FlumeSpanReceiver.this.queue.size() > 0) {
                Span firstSpan = null;
                try {
                    firstSpan = (Span)FlumeSpanReceiver.this.queue.poll(1L, TimeUnit.SECONDS);
                    if (firstSpan != null) {
                        dequeuedSpans.add(firstSpan);
                        FlumeSpanReceiver.this.queue.drainTo(dequeuedSpans, FlumeSpanReceiver.this.maxSpanBatchSize - 1);
                    }
                }
                catch (InterruptedException ie) {
                    // empty catch block
                }
                this.startClient();
                if (dequeuedSpans.isEmpty()) continue;
                try {
                    ArrayList<Event> events = new ArrayList<Event>(dequeuedSpans.size());
                    for (Span span : dequeuedSpans) {
                        HashMap<String, String> headers = new HashMap<String, String>();
                        headers.put("SpanId", span.toString());
                        headers.put("TracerId", span.getTracerId());
                        headers.put("Description", span.getDescription());
                        String body = span.toJson();
                        Event evt = EventBuilder.withBody((String)body, (Charset)Charset.forName("UTF-8"), headers);
                        events.add(evt);
                    }
                    this.flumeClient.appendBatch(events);
                    dequeuedSpans.clear();
                    errorCount = 0L;
                }
                catch (Exception e) {
                    if (++errorCount < 10L) {
                        try {
                            FlumeSpanReceiver.this.queue.addAll(dequeuedSpans);
                        }
                        catch (IllegalStateException ex) {
                            LOG.error((Object)("Drop " + dequeuedSpans.size() + " span(s) because writing to HBase failed."));
                        }
                    }
                    this.closeClient();
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException e1) {}
                }
            }
            this.closeClient();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeClient() {
            if (this.flumeClient != null) {
                try {
                    this.flumeClient.close();
                }
                catch (FlumeException ex) {
                    LOG.warn((Object)"Error while trying to close Flume Rpc Client.", (Throwable)ex);
                }
                finally {
                    this.flumeClient = null;
                }
            }
        }

        private void startClient() {
            if (this.flumeClient != null && !this.flumeClient.isActive()) {
                this.flumeClient.close();
                this.flumeClient = null;
            }
            if (this.flumeClient == null) {
                try {
                    this.flumeClient = RpcClientFactory.getDefaultInstance((String)FlumeSpanReceiver.this.flumeHostName, (Integer)FlumeSpanReceiver.this.flumePort, (Integer)FlumeSpanReceiver.this.maxSpanBatchSize);
                }
                catch (FlumeException e) {
                    LOG.warn((Object)("Failed to create Flume RPC Client. " + e.getMessage()));
                }
            }
        }
    }

    private class SimpleThreadFactory
    implements ThreadFactory {
        final AtomicLong count = new AtomicLong(0L);

        private SimpleThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable arg0) {
            String name = String.format("flumeSpanReceiver-%d", this.count.getAndIncrement());
            Thread t = new Thread(arg0, name);
            t.setDaemon(true);
            return t;
        }
    }
}

