/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer.async;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.producer.SyncProducer;
import kafka.producer.async.CallbackHandler;
import kafka.producer.async.EventHandler;
import kafka.producer.async.QueueItem;
import kafka.serializer.Encoder;
import kafka.utils.SystemTime$;
import org.apache.log4j.Logger;
import scala.ScalaObject;
import scala.collection.Seq;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.StringBuilder;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005Me!B\u0001\u0003\u0001\tA!A\u0005)s_\u0012,8-\u001a:TK:$G\u000b\u001b:fC\u0012T!a\u0001\u0003\u0002\u000b\u0005\u001c\u0018P\\2\u000b\u0005\u00151\u0011\u0001\u00039s_\u0012,8-\u001a:\u000b\u0003\u001d\tQa[1gW\u0006,\"!\u0003\u001c\u0014\u0007\u0001Q!\u0003\u0005\u0002\f!5\tAB\u0003\u0002\u000e\u001d\u0005!A.\u00198h\u0015\u0005y\u0011\u0001\u00026bm\u0006L!!\u0005\u0007\u0003\rQC'/Z1e!\t\u0019b#D\u0001\u0015\u0015\u0005)\u0012!B:dC2\f\u0017BA\f\u0015\u0005-\u00196-\u00197b\u001f\nTWm\u0019;\t\u0011e\u0001!Q1A\u0005\u0002m\t!\u0002\u001e5sK\u0006$g*Y7f\u0007\u0001)\u0012\u0001\b\t\u0003;\u0001r!a\u0005\u0010\n\u0005}!\u0012A\u0002)sK\u0012,g-\u0003\u0002\"E\t11\u000b\u001e:j]\u001eT!a\b\u000b\t\u0011\u0011\u0002!\u0011!Q\u0001\nq\t1\u0002\u001e5sK\u0006$g*Y7fA!Aa\u0005\u0001BC\u0002\u0013\u0005q%A\u0003rk\u0016,X-F\u0001)!\rIc\u0006M\u0007\u0002U)\u00111\u0006L\u0001\u000bG>t7-\u001e:sK:$(BA\u0017\u000f\u0003\u0011)H/\u001b7\n\u0005=R#!\u0004\"m_\u000e\\\u0017N\\4Rk\u0016,X\rE\u00022eQj\u0011AA\u0005\u0003g\t\u0011\u0011\"U;fk\u0016LE/Z7\u0011\u0005U2D\u0002\u0001\u0003\to\u0001!\t\u0011!b\u0001q\t\tA+\u0005\u0002:yA\u00111CO\u0005\u0003wQ\u0011qAT8uQ&tw\r\u0005\u0002\u0014{%\u0011a\b\u0006\u0002\u0004\u0003:L\b\u0002\u0003!\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\u0002\rE,X-^3!\u0011!\u0011\u0005A!b\u0001\n\u0003\u0019\u0015AC:fe&\fG.\u001b>feV\tA\tE\u0002F\u000fRj\u0011A\u0012\u0006\u0003\u0005\u001aI!\u0001\u0013$\u0003\u000f\u0015s7m\u001c3fe\"A!\n\u0001B\u0001B\u0003%A)A\u0006tKJL\u0017\r\\5{KJ\u0004\u0003\u0002\u0003'\u0001\u0005\u000b\u0007I\u0011A'\u0002%UtG-\u001a:ms&tw\r\u0015:pIV\u001cWM]\u000b\u0002\u001dB\u0011q\nU\u0007\u0002\t%\u0011\u0011\u000b\u0002\u0002\r'ft7\r\u0015:pIV\u001cWM\u001d\u0005\t'\u0002\u0011\t\u0011)A\u0005\u001d\u0006\u0019RO\u001c3fe2L\u0018N\\4Qe>$WoY3sA!AQ\u000b\u0001BC\u0002\u0013\u0005a+A\u0004iC:$G.\u001a:\u0016\u0003]\u00032!\r-5\u0013\tI&A\u0001\u0007Fm\u0016tG\u000fS1oI2,'\u000f\u0003\u0005\\\u0001\t\u0005\t\u0015!\u0003X\u0003!A\u0017M\u001c3mKJ\u0004\u0003\u0002C/\u0001\u0005\u000b\u0007I\u0011\u00010\u0002\u0015\r\u00147\u000eS1oI2,'/F\u0001`!\r\t\u0004\rN\u0005\u0003C\n\u0011qbQ1mY\n\f7m\u001b%b]\u0012dWM\u001d\u0005\tG\u0002\u0011\t\u0011)A\u0005?\u0006Y1MY6IC:$G.\u001a:!\u0011!)\u0007A!b\u0001\n\u00031\u0017!C9vKV,G+[7f+\u00059\u0007CA\ni\u0013\tIGC\u0001\u0003M_:<\u0007\u0002C6\u0001\u0005\u0003\u0005\u000b\u0011B4\u0002\u0015E,X-^3US6,\u0007\u0005\u0003\u0005n\u0001\t\u0015\r\u0011\"\u0001o\u0003%\u0011\u0017\r^2i'&TX-F\u0001p!\t\u0019\u0002/\u0003\u0002r)\t\u0019\u0011J\u001c;\t\u0011M\u0004!\u0011!Q\u0001\n=\f!BY1uG\"\u001c\u0016N_3!\u0011!)\bA!b\u0001\n\u00031\u0018aD:ikR$wn\u001e8D_6l\u0017M\u001c3\u0016\u0003qB\u0001\u0002\u001f\u0001\u0003\u0002\u0003\u0006I\u0001P\u0001\u0011g\",H\u000fZ8x]\u000e{W.\\1oI\u0002BQA\u001f\u0001\u0005\u0002m\fa\u0001P5oSRtD\u0003\u0005?~}~\f\t!a\u0001\u0002\u0006\u0005\u001d\u0011\u0011BA\u0006!\r\t\u0004\u0001\u000e\u0005\u00063e\u0004\r\u0001\b\u0005\u0006Me\u0004\r\u0001\u000b\u0005\u0006\u0005f\u0004\r\u0001\u0012\u0005\u0006\u0019f\u0004\rA\u0014\u0005\u0006+f\u0004\ra\u0016\u0005\u0006;f\u0004\ra\u0018\u0005\u0006Kf\u0004\ra\u001a\u0005\u0006[f\u0004\ra\u001c\u0005\u0006kf\u0004\r\u0001\u0010\u0005\n\u0003\u001f\u0001!\u0019!C\u0005\u0003#\ta\u0001\\8hO\u0016\u0014XCAA\n!\u0011\t)\"a\t\u000e\u0005\u0005]!\u0002BA\r\u00037\tQ\u0001\\8hi)TA!!\b\u0002 \u00051\u0011\r]1dQ\u0016T!!!\t\u0002\u0007=\u0014x-\u0003\u0003\u0002&\u0005]!A\u0002'pO\u001e,'\u000f\u0003\u0005\u0002*\u0001\u0001\u000b\u0011BA\n\u0003\u001dawnZ4fe\u0002B\u0011\"!\f\u0001\u0001\u0004%I!a\f\u0002\u000fI,hN\\5oOV\u0011\u0011\u0011\u0007\t\u0004'\u0005M\u0012bAA\u001b)\t9!i\\8mK\u0006t\u0007\"CA\u001d\u0001\u0001\u0007I\u0011BA\u001e\u0003-\u0011XO\u001c8j]\u001e|F%Z9\u0015\t\u0005u\u00121\t\t\u0004'\u0005}\u0012bAA!)\t!QK\\5u\u0011)\t)%a\u000e\u0002\u0002\u0003\u0007\u0011\u0011G\u0001\u0004q\u0012\n\u0004\u0002CA%\u0001\u0001\u0006K!!\r\u0002\u0011I,hN\\5oO\u0002B\u0011\"!\u0014\u0001\u0005\u0004%I!a\u0014\u0002\u001bMDW\u000f\u001e3po:d\u0015\r^2i+\t\t\t\u0006E\u0002*\u0003'J1!!\u0016+\u00059\u0019u.\u001e8u\t><h\u000eT1uG\"D\u0001\"!\u0017\u0001A\u0003%\u0011\u0011K\u0001\u000fg\",H\u000fZ8x]2\u000bGo\u00195!\u0011\u001d\ti\u0006\u0001C!\u0003?\n1A];o)\t\ti\u0004C\u0004\u0002d\u0001!\t!!\u001a\u0002\u001b\u0005<\u0018-\u001b;TQV$Hm\\<o+\t\ti\u0004C\u0004\u0002j\u0001!\t!!\u001a\u0002\u0011MDW\u000f\u001e3po:Dq!!\u001c\u0001\t\u0013\ty'A\u0007qe>\u001cWm]:Fm\u0016tGo\u001d\u000b\u0003\u0003c\u0002R!a\u001d\u0002\u0004BrA!!\u001e\u0002\u00009!\u0011qOA?\u001b\t\tIHC\u0002\u0002|i\ta\u0001\u0010:p_Rt\u0014\"A\u000b\n\u0007\u0005\u0005E#A\u0004qC\u000e\\\u0017mZ3\n\t\u0005\u0015\u0015q\u0011\u0002\u0004'\u0016\f(bAAA)!9\u00111\u0012\u0001\u0005\u0002\u00055\u0015a\u0003;ssR{\u0007*\u00198eY\u0016$B!!\u0010\u0002\u0010\"A\u0011\u0011SAE\u0001\u0004\t\t(\u0001\u0004fm\u0016tGo\u001d")
public class ProducerSendThread<T>
extends Thread
implements ScalaObject {
    private final String threadName;
    private final BlockingQueue<QueueItem<T>> queue;
    private final Encoder<T> serializer;
    private final SyncProducer underlyingProducer;
    private final EventHandler<T> handler;
    private final CallbackHandler<T> cbkHandler;
    private final long queueTime;
    private final int batchSize;
    private final Object shutdownCommand;
    private final Logger logger;
    private boolean running;
    private final CountDownLatch shutdownLatch;

    public String threadName() {
        return this.threadName;
    }

    public BlockingQueue<QueueItem<T>> queue() {
        return this.queue;
    }

    public Encoder<T> serializer() {
        return this.serializer;
    }

    public SyncProducer underlyingProducer() {
        return this.underlyingProducer;
    }

    public EventHandler<T> handler() {
        return this.handler;
    }

    public CallbackHandler<T> cbkHandler() {
        return this.cbkHandler;
    }

    public long queueTime() {
        return this.queueTime;
    }

    public int batchSize() {
        return this.batchSize;
    }

    public Object shutdownCommand() {
        return this.shutdownCommand;
    }

    private Logger logger() {
        return this.logger;
    }

    private boolean running() {
        return this.running;
    }

    private void running_$eq(boolean bl) {
        this.running = bl;
    }

    private CountDownLatch shutdownLatch() {
        return this.shutdownLatch;
    }

    @Override
    public void run() {
        try {
            Seq<QueueItem<T>> remainingEvents = this.processEvents();
            if (this.logger().isDebugEnabled()) {
                this.logger().debug((Object)new StringBuilder().append((Object)"Remaining events = ").append((Object)BoxesRunTime.boxToInteger((int)remainingEvents.size())).toString());
            }
            if (remainingEvents.size() > 0) {
                this.tryToHandle(remainingEvents);
            }
        }
        catch (Exception exception) {
            this.logger().error((Object)"Error in sending events: ", (Throwable)exception);
        }
        return;
        finally {
            this.shutdownLatch().countDown();
        }
    }

    public void awaitShutdown() {
        this.shutdownLatch().await();
    }

    public void shutdown() {
        this.running_$eq(false);
        this.handler().close();
        if (this.logger().isDebugEnabled()) {
            this.logger().debug((Object)"Shutdown thread complete");
        }
    }

    private Seq<QueueItem<T>> processEvents() {
        long now;
        long lastSend = now = SystemTime$.MODULE$.milliseconds();
        ListBuffer events = new ListBuffer();
        while (this.running()) {
            boolean full;
            BoxedUnit boxedUnit;
            QueueItem<T> current = this.queue().poll(package$.MODULE$.max(0L, this.queueTime() - (lastSend - now)), TimeUnit.MILLISECONDS);
            if (current != null && BoxesRunTime.equals(current.getData(), (Object)this.shutdownCommand())) {
                return events;
            }
            if (current == null || current.getData() == null) {
                boxedUnit = BoxedUnit.UNIT;
            } else if (this.cbkHandler() == null) {
                boxedUnit = events.$plus$eq(current);
            } else {
                events = (ListBuffer)events.$plus$plus(this.cbkHandler().afterDequeuingExistingData(current));
                boxedUnit = BoxedUnit.UNIT;
            }
            now = SystemTime$.MODULE$.milliseconds();
            boolean expired = now - lastSend > this.queueTime();
            boolean bl = full = events.size() >= this.batchSize();
            if (!expired && !full) continue;
            if (this.logger().isDebugEnabled() && full) {
                this.logger().debug((Object)"Batch full. Sending..");
            }
            if (this.logger().isDebugEnabled() && expired) {
                this.logger().debug((Object)"Queue time reached. Sending..");
            }
            this.tryToHandle((Seq<QueueItem<T>>)events);
            lastSend = now;
            events = new ListBuffer();
        }
        return events;
    }

    public void tryToHandle(Seq<QueueItem<T>> events) {
        try {
            if (this.logger().isDebugEnabled()) {
                this.logger().debug((Object)new StringBuilder().append((Object)"Handling ").append((Object)BoxesRunTime.boxToInteger((int)events.size())).append((Object)" events").toString());
            }
            this.handler().handle(events, this.underlyingProducer(), this.serializer());
        }
        catch (Exception exception) {
            this.logger().error((Object)new StringBuilder().append((Object)"Error in handling batch of ").append((Object)BoxesRunTime.boxToInteger((int)events.size())).append((Object)" events").toString(), (Throwable)exception);
        }
    }

    public ProducerSendThread(String threadName, BlockingQueue<QueueItem<T>> queue, Encoder<T> serializer, SyncProducer underlyingProducer, EventHandler<T> handler, CallbackHandler<T> cbkHandler, long queueTime, int batchSize, Object shutdownCommand) {
        this.threadName = threadName;
        this.queue = queue;
        this.serializer = serializer;
        this.underlyingProducer = underlyingProducer;
        this.handler = handler;
        this.cbkHandler = cbkHandler;
        this.queueTime = queueTime;
        this.batchSize = batchSize;
        this.shutdownCommand = shutdownCommand;
        super(threadName);
        this.logger = Logger.getLogger(ProducerSendThread.class);
        this.running = true;
        this.shutdownLatch = new CountDownLatch(1);
    }
}

