/*
 * Decompiled with CFR 0.152.
 */
package com.metamx.emitter.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.core.HttpEmitterConfig;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.handler.codec.http.HttpMethod;

public class HttpPostEmitter
implements Flushable,
Closeable,
Emitter {
    private static final int MAX_EVENT_SIZE = 1047552;
    private static final long BUFFER_FULL_WARNING_THROTTLE = 30000L;
    private static final Logger log = new Logger(HttpPostEmitter.class);
    private static final AtomicInteger instanceCounter = new AtomicInteger();
    private final HttpEmitterConfig config;
    private final HttpClient client;
    private final ObjectMapper jsonMapper;
    private final URL url;
    private final AtomicReference<List<byte[]>> eventsList = new AtomicReference<LinkedList>(Lists.newLinkedList());
    private final AtomicInteger count = new AtomicInteger(0);
    private final AtomicLong bufferedSize = new AtomicLong(0L);
    private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat(String.format("HttpPostEmitter-%s-%%s", instanceCounter.incrementAndGet())).build());
    private final AtomicLong version = new AtomicLong(0L);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private long lastBufferFullWarning = 0L;
    private long messagesDroppedSinceLastBufferFullWarning = 0L;

    public HttpPostEmitter(HttpEmitterConfig config, HttpClient client) {
        this(config, client, new ObjectMapper());
    }

    public HttpPostEmitter(HttpEmitterConfig config, HttpClient client, ObjectMapper jsonMapper) {
        int batchOverhead = config.getBatchingStrategy().batchStart().length + config.getBatchingStrategy().batchEnd().length;
        Preconditions.checkArgument((config.getMaxBatchSize() >= 1047552 + batchOverhead ? 1 : 0) != 0, (Object)String.format("maxBatchSize must be greater than MAX_EVENT_SIZE[%,d] + overhead[%,d].", 1047552, batchOverhead));
        Preconditions.checkArgument((config.getMaxBufferSize() >= 1047552L ? 1 : 0) != 0, (Object)String.format("maxBufferSize must be greater than MAX_EVENT_SIZE[%,d].", 1047552));
        this.config = config;
        this.client = client;
        this.jsonMapper = jsonMapper;
        try {
            this.url = new URL(config.getRecipientBaseUrl());
        }
        catch (MalformedURLException e) {
            throw new ISE((Throwable)e, "Bad URL: %s", new Object[]{config.getRecipientBaseUrl()});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @LifecycleStart
    public void start() {
        AtomicBoolean atomicBoolean = this.started;
        synchronized (atomicBoolean) {
            if (!this.started.getAndSet(true)) {
                this.exec.schedule(new EmittingRunnable(this.version.get()), this.config.getFlushMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void emit(Event event) {
        byte[] eventBytes;
        AtomicBoolean atomicBoolean = this.started;
        synchronized (atomicBoolean) {
            if (!this.started.get()) {
                throw new RejectedExecutionException("Service is closed.");
            }
        }
        try {
            eventBytes = this.jsonMapper.writeValueAsBytes((Object)event);
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
        if (eventBytes.length > 1047552) {
            log.error("Event too large to emit (%,d > %,d): %s ...", new Object[]{eventBytes.length, 1047552, StringUtils.fromUtf8((ByteBuffer)ByteBuffer.wrap(eventBytes), (int)1024)});
            return;
        }
        AtomicReference<List<byte[]>> atomicReference = this.eventsList;
        synchronized (atomicReference) {
            if (this.bufferedSize.get() + (long)eventBytes.length <= this.config.getMaxBufferSize()) {
                this.eventsList.get().add(eventBytes);
                this.bufferedSize.addAndGet(eventBytes.length);
                if (!event.isSafeToBuffer() || this.count.incrementAndGet() >= this.config.getFlushCount()) {
                    this.exec.execute(new EmittingRunnable(this.version.get()));
                }
            } else {
                ++this.messagesDroppedSinceLastBufferFullWarning;
            }
            long now = System.currentTimeMillis();
            if (this.messagesDroppedSinceLastBufferFullWarning > 0L && this.lastBufferFullWarning + 30000L < now) {
                log.error("Buffer full: dropped %,d events!", new Object[]{this.messagesDroppedSinceLastBufferFullWarning});
                this.lastBufferFullWarning = now;
                this.messagesDroppedSinceLastBufferFullWarning = 0L;
            }
        }
    }

    @Override
    public void flush() throws IOException {
        if (this.started.get()) {
            Future<?> future = this.exec.submit(new EmittingRunnable(this.version.get()));
            try {
                future.get(this.config.getFlushTimeOut(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                log.debug("Thread Interrupted", new Object[0]);
                Thread.currentThread().interrupt();
                throw new IOException("Thread Interrupted while flushing", e);
            }
            catch (ExecutionException e) {
                throw new IOException("Exception while flushing", e);
            }
            catch (TimeoutException e) {
                throw new IOException(String.format("Timed out after [%d] millis during flushing", this.config.getFlushTimeOut()), e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @LifecycleStop
    public void close() throws IOException {
        AtomicBoolean atomicBoolean = this.started;
        synchronized (atomicBoolean) {
            this.flush();
            this.started.set(false);
            this.exec.shutdown();
        }
    }

    long getBufferedSize() {
        return this.bufferedSize.get();
    }

    ScheduledExecutorService getExec() {
        return this.exec;
    }

    private class EmittingRunnable
    implements Runnable {
        private final long instantiatedVersion;

        public EmittingRunnable(long instantiatedVersion) {
            this.instantiatedVersion = instantiatedVersion;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            long currVersion = HttpPostEmitter.this.version.get();
            try {
                List events;
                if (!HttpPostEmitter.this.started.get()) {
                    log.info("Not started, skipping...", new Object[0]);
                    return;
                }
                if (this.instantiatedVersion != currVersion) {
                    log.debug("Skipping because instantiatedVersion[%s] != currVersion[%s]", new Object[]{this.instantiatedVersion, currVersion});
                    return;
                }
                HttpPostEmitter.this.count.set(0);
                currVersion = HttpPostEmitter.this.version.incrementAndGet();
                AtomicReference atomicReference = HttpPostEmitter.this.eventsList;
                synchronized (atomicReference) {
                    events = HttpPostEmitter.this.eventsList.getAndSet(Lists.newLinkedList());
                }
                long eventsBytesCount = 0L;
                for (byte[] message : events) {
                    eventsBytesCount += (long)message.length;
                }
                boolean requeue = false;
                try {
                    List<List<byte[]>> batches = this.splitIntoBatches(events);
                    log.debug("Running export with version[%s], eventsList count[%s], bytes[%s], batches[%s]", new Object[]{this.instantiatedVersion, events.size(), eventsBytesCount, batches.size()});
                    Iterator<List<byte[]>> iterator = batches.iterator();
                    while (iterator.hasNext()) {
                        StatusResponseHolder response;
                        List<byte[]> batch = iterator.next();
                        log.debug("Sending batch to url[%s], batch.size[%,d]", new Object[]{HttpPostEmitter.this.url, batch.size()});
                        Request request = new Request(HttpMethod.POST, HttpPostEmitter.this.url).setContent("application/json", this.serializeBatch(batch));
                        if (HttpPostEmitter.this.config.getBasicAuthentication() != null) {
                            String[] parts = HttpPostEmitter.this.config.getBasicAuthentication().split(":", 2);
                            String user = parts[0];
                            String password = parts.length > 0 ? parts[1] : "";
                            request.setBasicAuthentication(user, password);
                        }
                        if ((response = (StatusResponseHolder)HttpPostEmitter.this.client.go(request, (HttpResponseHandler)new StatusResponseHandler(Charsets.UTF_8)).get()).getStatus().getCode() == 413) {
                            throw new ISE("Received HTTP status 413 from [%s]. Batch size of [%d] may be too large, try adjusting com.metamx.emitter.http.maxBatchSizeBatch", new Object[]{HttpPostEmitter.this.config.getRecipientBaseUrl(), HttpPostEmitter.this.config.getMaxBatchSize()});
                        }
                        if (response.getStatus().getCode() / 100 == 2) continue;
                        throw new ISE("Emissions of events not successful[%s], with message[%s].", new Object[]{response.getStatus(), response.getContent().trim()});
                    }
                }
                catch (Exception e) {
                    log.warn((Throwable)e, "Got exception when posting events to urlString[%s]. Resubmitting.", new Object[]{HttpPostEmitter.this.config.getRecipientBaseUrl()});
                    requeue = true;
                }
                catch (Throwable e) {
                    log.warn(e, "Got unrecoverable error when posting events to urlString[%s]. Dropping.", new Object[]{HttpPostEmitter.this.config.getRecipientBaseUrl()});
                    throw e;
                }
                finally {
                    if (requeue) {
                        AtomicReference e = HttpPostEmitter.this.eventsList;
                        synchronized (e) {
                            ((List)HttpPostEmitter.this.eventsList.get()).addAll(events);
                        }
                    } else {
                        HttpPostEmitter.this.bufferedSize.addAndGet(-eventsBytesCount);
                    }
                }
            }
            catch (Throwable e) {
                log.error(e, "Uncaught exception in EmittingRunnable.run()", new Object[0]);
            }
            HttpPostEmitter.this.exec.schedule(new EmittingRunnable(currVersion), HttpPostEmitter.this.config.getFlushMillis(), TimeUnit.MILLISECONDS);
        }

        private byte[] serializeBatch(List<byte[]> messages) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try {
                boolean first = true;
                baos.write(HttpPostEmitter.this.config.getBatchingStrategy().batchStart());
                for (byte[] message : messages) {
                    if (first) {
                        first = false;
                    } else {
                        baos.write(HttpPostEmitter.this.config.getBatchingStrategy().messageSeparator());
                    }
                    baos.write(message);
                }
                baos.write(HttpPostEmitter.this.config.getBatchingStrategy().batchEnd());
                return baos.toByteArray();
            }
            catch (IOException e) {
                throw Throwables.propagate((Throwable)e);
            }
        }

        private List<List<byte[]>> splitIntoBatches(List<byte[]> messages) {
            LinkedList batches = Lists.newLinkedList();
            ArrayList currentBatch = Lists.newArrayList();
            int currentBatchBytes = 0;
            for (byte[] message : messages) {
                int batchSizeAfterAddingMessage = HttpPostEmitter.this.config.getBatchingStrategy().batchStart().length + currentBatchBytes + HttpPostEmitter.this.config.getBatchingStrategy().messageSeparator().length + message.length + HttpPostEmitter.this.config.getBatchingStrategy().batchEnd().length;
                if (!currentBatch.isEmpty() && batchSizeAfterAddingMessage > HttpPostEmitter.this.config.getMaxBatchSize()) {
                    batches.add(currentBatch);
                    currentBatch = Lists.newArrayList();
                    currentBatchBytes = 0;
                }
                currentBatch.add(message);
                currentBatchBytes += message.length;
            }
            if (!currentBatch.isEmpty()) {
                batches.add(currentBatch);
            }
            return batches;
        }
    }
}

