/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer;

import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import junit.framework.Assert;
import kafka.api.PartitionMetadata;
import kafka.api.PartitionMetadata$;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponseStatus;
import kafka.api.TopicMetadata;
import kafka.api.TopicMetadata$;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping$;
import kafka.common.FailedToSendMessageException;
import kafka.common.QueueFullException;
import kafka.common.TopicAndPartition;
import kafka.message.ByteBufferMessageSet;
import kafka.message.CompressionCodec;
import kafka.message.Message;
import kafka.message.NoCompressionCodec$;
import kafka.producer.KeyedMessage;
import kafka.producer.NegativePartitioner;
import kafka.producer.NegativePartitioner$;
import kafka.producer.Partitioner;
import kafka.producer.Producer;
import kafka.producer.ProducerClosedException;
import kafka.producer.ProducerConfig;
import kafka.producer.ProducerPool;
import kafka.producer.SyncProducer;
import kafka.producer.async.DefaultEventHandler;
import kafka.producer.async.EventHandler;
import kafka.producer.async.ProducerSendThread;
import kafka.serializer.DefaultEncoder;
import kafka.serializer.Encoder;
import kafka.serializer.NullEncoder;
import kafka.serializer.NullEncoder$;
import kafka.serializer.StringEncoder;
import kafka.serializer.StringEncoder$;
import kafka.server.KafkaConfig;
import kafka.utils.FixedValuePartitioner;
import kafka.utils.FixedValuePartitioner$;
import kafka.utils.IntEncoder;
import kafka.utils.IntEncoder$;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import org.easymock.EasyMock;
import org.junit.Test;
import org.scalatest.junit.JUnit3Suite;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005\re\u0001B\u0001\u0003\u0001\u001d\u0011\u0011#Q:z]\u000e\u0004&o\u001c3vG\u0016\u0014H+Z:u\u0015\t\u0019A!\u0001\u0005qe>$WoY3s\u0015\u0005)\u0011!B6bM.\f7\u0001A\n\u0003\u0001!\u0001\"!\u0003\t\u000e\u0003)Q!a\u0003\u0007\u0002\u000b),h.\u001b;\u000b\u00055q\u0011!C:dC2\fG/Z:u\u0015\u0005y\u0011aA8sO&\u0011\u0011C\u0003\u0002\f\u0015Vs\u0017\u000e^\u001aTk&$X\rC\u0003\u0014\u0001\u0011\u0005A#\u0001\u0004=S:LGO\u0010\u000b\u0002+A\u0011a\u0003A\u0007\u0002\u0005!9\u0001\u0004\u0001b\u0001\n\u0003I\u0012!\u00029s_B\u001cX#\u0001\u000e\u0011\u0007m)\u0003F\u0004\u0002\u001dE9\u0011Q\u0004I\u0007\u0002=)\u0011qDB\u0001\u0007yI|w\u000e\u001e \n\u0003\u0005\nQa]2bY\u0006L!a\t\u0013\u0002\u000fA\f7m[1hK*\t\u0011%\u0003\u0002'O\t!A*[:u\u0015\t\u0019C\u0005\u0005\u0002*]5\t!F\u0003\u0002,Y\u0005!Q\u000f^5m\u0015\u0005i\u0013\u0001\u00026bm\u0006L!a\f\u0016\u0003\u0015A\u0013x\u000e]3si&,7\u000f\u0003\u00042\u0001\u0001\u0006IAG\u0001\u0007aJ|\u0007o\u001d\u0011\t\u000fM\u0002!\u0019!C\u0001i\u000591m\u001c8gS\u001e\u001cX#A\u001b\u0011\u0007YZD(D\u00018\u0015\tA\u0014(A\u0005j[6,H/\u00192mK*\u0011!\bJ\u0001\u000bG>dG.Z2uS>t\u0017B\u0001\u00148!\ti\u0004)D\u0001?\u0015\tyD!\u0001\u0004tKJ4XM]\u0005\u0003\u0003z\u00121bS1gW\u0006\u001cuN\u001c4jO\"11\t\u0001Q\u0001\nU\n\u0001bY8oM&<7\u000f\t\u0005\u0006\u000b\u0002!\tER\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002\u000fB\u0011\u0001*S\u0007\u0002I%\u0011!\n\n\u0002\u0005+:LG\u000fC\u0003M\u0001\u0011\u0005c)\u0001\u0005uK\u0006\u0014Hi\\<o\u0011\u0015q\u0005\u0001\"\u0001G\u0003U!Xm\u001d;Qe>$WoY3s#V,W/Z*ju\u0016D#!\u0014)\u0011\u0005E\u001bV\"\u0001*\u000b\u0005-q\u0011B\u0001+S\u0005\u0011!Vm\u001d;\t\u000bY\u0003A\u0011\u0001$\u0002-Q,7\u000f\u001e)s_\u0012,8-Z!gi\u0016\u00148\t\\8tK\u0012D#!\u0016)\t\u000be\u0003A\u0011\u0001$\u0002\u001bQ,7\u000f\u001e\"bi\u000eD7+\u001b>fQ\tA\u0006\u000bC\u0003]\u0001\u0011\u0005a)\u0001\u000buKN$\u0018+^3vKRKW.Z#ya&\u0014X\r\u001a\u0015\u00037BCQa\u0018\u0001\u0005\u0002\u0019\u000bQ\u0004^3tiB\u000b'\u000f^5uS>t\u0017I\u001c3D_2d\u0017\r^3Fm\u0016tGo\u001d\u0015\u0003=BCQA\u0019\u0001\u0005\u0002\u0019\u000b1\u0003^3tiN+'/[1mSj,WI^3oiND#!\u0019)\t\u000b\u0015\u0004A\u0011\u0001$\u0002)Q,7\u000f^%om\u0006d\u0017\u000e\u001a)beRLG/[8oQ\t!\u0007\u000bC\u0003i\u0001\u0011\u0005a)\u0001\u0007uKN$hj\u001c\"s_.,'\u000f\u000b\u0002h!\")1\u000e\u0001C\u0001\r\u00069B/Z:u\u0013:\u001cw.\u001c9bi&\u0014G.Z#oG>$WM\u001d\u0015\u0003UBCQA\u001c\u0001\u0005\u0002\u0019\u000bQ\u0003^3tiJ\u000bg\u000eZ8n!\u0006\u0014H/\u001b;j_:,'\u000f\u000b\u0002n!\")\u0011\u000f\u0001C\u0001\r\u0006AB/Z:u\r\u0006LG.\u001a3TK:$'+\u001a;ss2{w-[2)\u0005A\u0004\u0006\"\u0002;\u0001\t\u00031\u0015\u0001\u0005;fgRT\u0015M^1Qe>$WoY3sQ\t\u0019\b\u000bC\u0003x\u0001\u0011\u0005a)\u0001\ruKN$\u0018J\u001c<bY&$7i\u001c8gS\u001e,(/\u0019;j_:D#A\u001e)\t\u000bi\u0004A\u0011A>\u0002\u001d\u001d,G\u000f\u0015:pIV\u001cW\rR1uCR\u0019A0a\u0005\u0011\u0007mix0\u0003\u0002\u007fO\t\u00191+Z9\u0011\u000fY\t\t!!\u0002\u0002\u0006%\u0019\u00111\u0001\u0002\u0003\u0019-+\u00170\u001a3NKN\u001c\u0018mZ3\u0011\t\u0005\u001d\u0011Q\u0002\b\u0004\u0011\u0006%\u0011bAA\u0006I\u00051\u0001K]3eK\u001aLA!a\u0004\u0002\u0012\t11\u000b\u001e:j]\u001eT1!a\u0003%\u0011\u001d\t)\"\u001fa\u0001\u0003/\tqA\\#wK:$8\u000fE\u0002I\u00033I1!a\u0007%\u0005\rIe\u000e\u001e\u0005\b\u0003?\u0001A\u0011BA\u0011\u0003A9W\r\u001e+pa&\u001cW*\u001a;bI\u0006$\u0018\r\u0006\u0007\u0002$\u0005=\u00121GA\u001c\u0003w\ty\u0004\u0005\u0003\u0002&\u0005-RBAA\u0014\u0015\r\tI\u0003B\u0001\u0004CBL\u0017\u0002BA\u0017\u0003O\u0011Q\u0002V8qS\u000elU\r^1eCR\f\u0007\u0002CA\u0019\u0003;\u0001\r!!\u0002\u0002\u000bQ|\u0007/[2\t\u0011\u0005U\u0012Q\u0004a\u0001\u0003/\t\u0011\u0002]1si&$\u0018n\u001c8\t\u0011\u0005e\u0012Q\u0004a\u0001\u0003/\t\u0001B\u0019:pW\u0016\u0014\u0018\n\u001a\u0005\t\u0003{\ti\u00021\u0001\u0002\u0006\u0005Q!M]8lKJDun\u001d;\t\u0011\u0005\u0005\u0013Q\u0004a\u0001\u0003/\t!B\u0019:pW\u0016\u0014\bk\u001c:u\u0011\u001d\ty\u0002\u0001C\u0005\u0003\u000b\"B\"a\t\u0002H\u0005%\u0013QJA(\u0003#B\u0001\"!\r\u0002D\u0001\u0007\u0011Q\u0001\u0005\t\u0003k\t\u0019\u00051\u0001\u0002LA!1$`A\f\u0011!\tI$a\u0011A\u0002\u0005]\u0001\u0002CA\u001f\u0003\u0007\u0002\r!!\u0002\t\u0011\u0005\u0005\u00131\ta\u0001\u0003/Aq!!\u0016\u0001\t\u0003\t9&A\u0007nKN\u001c\u0018mZ3t)>\u001cV\r\u001e\u000b\u0005\u00033\n)\u0007\u0005\u0003\u0002\\\u0005\u0005TBAA/\u0015\r\ty\u0006B\u0001\b[\u0016\u001c8/Y4f\u0013\u0011\t\u0019'!\u0018\u0003)\tKH/\u001a\"vM\u001a,'/T3tg\u0006<WmU3u\u0011!\t9'a\u0015A\u0002\u0005%\u0014\u0001C7fgN\fw-Z:\u0011\tmi\u0018Q\u0001\u0005\b\u0003+\u0002A\u0011AA7)\u0019\tI&a\u001c\u0002\u0000!A\u0011\u0011OA6\u0001\u0004\t\u0019(A\u0002lKf\u0004R\u0001SA;\u0003sJ1!a\u001e%\u0005\u0015\t%O]1z!\rA\u00151P\u0005\u0004\u0003{\"#\u0001\u0002\"zi\u0016D\u0001\"a\u001a\u0002l\u0001\u0007\u0011\u0011\u0011\t\u00057u\f\u0019\b")
public class AsyncProducerTest
extends JUnit3Suite {
    private final List<Properties> props = TestUtils$.MODULE$.createBrokerConfigs(1, TestUtils$.MODULE$.createBrokerConfigs$default$2());
    private final List<KafkaConfig> configs = (List)this.props().map((Function1)new Serializable(this){
        public static final long serialVersionUID = 0L;

        public final KafkaConfig apply(Properties p) {
            return new KafkaConfig(p);
        }
    }, List$.MODULE$.canBuildFrom());

    public List<Properties> props() {
        return this.props;
    }

    public List<KafkaConfig> configs() {
        return this.configs;
    }

    public void setUp() {
        super.setUp();
    }

    public void tearDown() {
        super.tearDown();
    }

    @Test
    public void testProducerQueueSize() {
        EventHandler<String, String> mockEventHandler = new EventHandler<String, String>(this){

            public void handle(Seq<KeyedMessage<String, String>> events) {
                Thread.sleep(500L);
            }

            public void close() {
            }
        };
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()));
        props.put("producer.type", "async");
        props.put("queue.buffering.max.messages", "10");
        props.put("batch.num.messages", "1");
        props.put("queue.enqueue.timeout.ms", "0");
        ProducerConfig config = new ProducerConfig(props);
        Seq<KeyedMessage<String, String>> produceData = this.getProduceData(12);
        Producer producer = new Producer(config, (EventHandler)mockEventHandler);
        try {
            try {
                producer.send(produceData);
                throw this.fail("Queue should be full");
            }
            catch (QueueFullException queueFullException) {
            }
        }
        finally {
            producer.close();
        }
    }

    @Test
    public void testProduceAfterClosed() {
        Seq<KeyedMessage<String, String>> produceData = this.getProduceData(10);
        Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()), StringEncoder.class.getName(), TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5());
        producer.close();
        try {
            producer.send(produceData);
            throw this.fail("should complain that producer is already closed");
        }
        catch (ProducerClosedException producerClosedException) {
            return;
        }
    }

    @Test
    public void testBatchSize() {
        Seq<KeyedMessage<String, String>> producerDataList = this.getProduceData(10);
        DefaultEventHandler mockHandler = (DefaultEventHandler)EasyMock.createStrictMock(DefaultEventHandler.class);
        mockHandler.handle((Seq)producerDataList.take(5));
        EasyMock.expectLastCall();
        mockHandler.handle((Seq)producerDataList.takeRight(5));
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{mockHandler});
        LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
        ProducerSendThread producerSendThread = new ProducerSendThread("thread1", queue, (EventHandler)mockHandler, Integer.MAX_VALUE, 5, "");
        producerSendThread.start();
        producerDataList.foreach((Function1)new Serializable(this, queue){
            public static final long serialVersionUID = 0L;
            private final LinkedBlockingQueue queue$1;

            public final void apply(KeyedMessage<String, String> producerData) {
                this.queue$1.put(producerData);
            }
            {
                this.queue$1 = queue$1;
            }
        });
        producerSendThread.shutdown();
        EasyMock.verify((Object[])new Object[]{mockHandler});
    }

    @Test
    public void testQueueTimeExpired() {
        Seq<KeyedMessage<String, String>> producerDataList = this.getProduceData(2);
        DefaultEventHandler mockHandler = (DefaultEventHandler)EasyMock.createStrictMock(DefaultEventHandler.class);
        mockHandler.handle(producerDataList);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{mockHandler});
        int queueExpirationTime = 200;
        LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
        ProducerSendThread producerSendThread = new ProducerSendThread("thread1", queue, (EventHandler)mockHandler, (long)queueExpirationTime, 5, "");
        producerSendThread.start();
        producerDataList.foreach((Function1)new Serializable(this, queue){
            public static final long serialVersionUID = 0L;
            private final LinkedBlockingQueue queue$2;

            public final void apply(KeyedMessage<String, String> producerData) {
                this.queue$2.put(producerData);
            }
            {
                this.queue$2 = queue$2;
            }
        });
        Thread.sleep(queueExpirationTime + 100);
        EasyMock.verify((Object[])new Object[]{mockHandler});
        producerSendThread.shutdown();
    }

    @Test
    public void testPartitionAndCollateEvents() {
        ArrayBuffer producerDataList = new ArrayBuffer();
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)BoxesRunTime.boxToInteger((int)0), (Object)new Message("msg1".getBytes()))}));
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic2", (Object)BoxesRunTime.boxToInteger((int)-99), (Object)BoxesRunTime.boxToInteger((int)1), (Object)new Message("msg2".getBytes()))}));
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)BoxesRunTime.boxToInteger((int)2), (Object)new Message("msg3".getBytes()))}));
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)BoxesRunTime.boxToInteger((int)-101), (Object)BoxesRunTime.boxToInteger((int)3), (Object)new Message("msg4".getBytes()))}));
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic2", (Object)BoxesRunTime.boxToInteger((int)4), (Object)new Message("msg5".getBytes()))}));
        Properties props = new Properties();
        props.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()));
        Broker broker1 = new Broker(0, "localhost", 9092);
        Broker broker2 = new Broker(1, "localhost", 9093);
        PartitionMetadata partition1Metadata = new PartitionMetadata(0, (Option)new Some((Object)broker1), (Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Broker[]{broker1, broker2})), PartitionMetadata$.MODULE$.$lessinit$greater$default$4(), PartitionMetadata$.MODULE$.$lessinit$greater$default$5());
        PartitionMetadata partition2Metadata = new PartitionMetadata(1, (Option)new Some((Object)broker2), (Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Broker[]{broker1, broker2})), PartitionMetadata$.MODULE$.$lessinit$greater$default$4(), PartitionMetadata$.MODULE$.$lessinit$greater$default$5());
        TopicMetadata topic1Metadata = new TopicMetadata("topic1", (Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new PartitionMetadata[]{partition1Metadata, partition2Metadata})), TopicMetadata$.MODULE$.$lessinit$greater$default$3());
        TopicMetadata topic2Metadata = new TopicMetadata("topic2", (Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new PartitionMetadata[]{partition1Metadata, partition2Metadata})), TopicMetadata$.MODULE$.$lessinit$greater$default$3());
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put((Object)"topic1", (Object)topic1Metadata);
        topicPartitionInfos.put((Object)"topic2", (Object)topic2Metadata);
        Partitioner intPartitioner = new Partitioner(this){

            public int partition(Object key, int numPartitions) {
                return BoxesRunTime.unboxToInt((Object)key) % numPartitions;
            }
        };
        ProducerConfig config = new ProducerConfig(props);
        ProducerPool producerPool = new ProducerPool(config);
        DefaultEventHandler handler = new DefaultEventHandler(config, intPartitioner, null, (Encoder)new IntEncoder(IntEncoder$.MODULE$.$lessinit$greater$default$1()), producerPool, topicPartitionInfos);
        ArrayBuffer topic1Broker1Data = (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)BoxesRunTime.boxToInteger((int)0), (Object)new Message("msg1".getBytes())), new KeyedMessage("topic1", (Object)BoxesRunTime.boxToInteger((int)2), (Object)new Message("msg3".getBytes()))}));
        ArrayBuffer topic1Broker2Data = (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)BoxesRunTime.boxToInteger((int)-101), (Object)BoxesRunTime.boxToInteger((int)3), (Object)new Message("msg4".getBytes()))}));
        ArrayBuffer topic2Broker1Data = (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic2", (Object)BoxesRunTime.boxToInteger((int)4), (Object)new Message("msg5".getBytes()))}));
        ArrayBuffer topic2Broker2Data = (ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic2", (Object)BoxesRunTime.boxToInteger((int)-99), (Object)BoxesRunTime.boxToInteger((int)1), (Object)new Message("msg2".getBytes()))}));
        Some expectedResult = new Some((Object)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition("topic1", 0)), (Object)topic1Broker1Data), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition("topic2", 0)), (Object)topic2Broker1Data)}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)1)), (Object)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition("topic1", 1)), (Object)topic1Broker2Data), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition("topic2", 1)), (Object)topic2Broker2Data)})))})));
        Option actualResult = handler.partitionAndCollate((Seq)producerDataList);
        Assert.assertEquals((Object)expectedResult, (Object)actualResult);
    }

    @Test
    public void testSerializeEvents() {
        Seq produceData = (Seq)TestUtils$.MODULE$.getMsgStrings(5).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KeyedMessage<String, String> apply(String m) {
                return new KeyedMessage("topic1", (Object)m);
            }
        }, Seq$.MODULE$.canBuildFrom());
        Properties props = new Properties();
        props.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()));
        ProducerConfig config = new ProducerConfig(props);
        TopicMetadata topic1Metadata = this.getTopicMetadata("topic1", 0, 0, "localhost", 9092);
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put((Object)"topic1", (Object)topic1Metadata);
        ProducerPool producerPool = new ProducerPool(config);
        DefaultEventHandler handler = new DefaultEventHandler(config, null, (Encoder)new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()), (Encoder)new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()), producerPool, topicPartitionInfos);
        Seq serializedData = handler.serialize(produceData);
        Seq deserializedData = (Seq)serializedData.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KeyedMessage<String, String> apply(KeyedMessage<String, Message> d) {
                return new KeyedMessage(d.topic(), (Object)Utils$.MODULE$.readString(((Message)d.message()).payload(), Utils$.MODULE$.readString$default$2()));
            }
        }, Seq$.MODULE$.canBuildFrom());
        Seq streamedSerializedData = handler.serialize((Seq)package$.MODULE$.Stream().apply(produceData));
        Seq deserializedStreamData = (Seq)streamedSerializedData.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KeyedMessage<String, String> apply(KeyedMessage<String, Message> d) {
                return new KeyedMessage(d.topic(), (Object)Utils$.MODULE$.readString(((Message)d.message()).payload(), Utils$.MODULE$.readString$default$2()));
            }
        }, Seq$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.checkEquals(produceData.iterator(), deserializedData.iterator());
        TestUtils$.MODULE$.checkEquals(produceData.iterator(), deserializedStreamData.iterator());
    }

    @Test
    public void testInvalidPartition() {
        ArrayBuffer producerDataList = new ArrayBuffer();
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)"key1", (Object)new Message("msg1".getBytes()))}));
        Properties props = new Properties();
        props.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()));
        ProducerConfig config = new ProducerConfig(props);
        TopicMetadata topic1Metadata = this.getTopicMetadata("topic1", 0, 0, "localhost", 9092);
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put((Object)"topic1", (Object)topic1Metadata);
        ProducerPool producerPool = new ProducerPool(config);
        DefaultEventHandler handler = new DefaultEventHandler(config, (Partitioner)new NegativePartitioner(NegativePartitioner$.MODULE$.$lessinit$greater$default$1()), null, null, producerPool, topicPartitionInfos);
        try {
            handler.partitionAndCollate((Seq)producerDataList);
            return;
        }
        catch (Throwable throwable) {
            throw this.fail("Should not throw any exception");
        }
    }

    @Test
    public void testNoBroker() {
        Properties props = new Properties();
        props.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()));
        ProducerConfig config = new ProducerConfig(props);
        TopicMetadata topic1Metadata = new TopicMetadata("topic1", (Seq)Seq$.MODULE$.empty(), TopicMetadata$.MODULE$.$lessinit$greater$default$3());
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put((Object)"topic1", (Object)topic1Metadata);
        ProducerPool producerPool = new ProducerPool(config);
        ArrayBuffer producerDataList = new ArrayBuffer();
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)"msg1")}));
        DefaultEventHandler handler = new DefaultEventHandler(config, null, (Encoder)new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()), (Encoder)new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()), producerPool, topicPartitionInfos);
        try {
            handler.handle((Seq)producerDataList);
            throw this.fail("Should fail with FailedToSendMessageException");
        }
        catch (FailedToSendMessageException failedToSendMessageException) {
            return;
        }
    }

    @Test
    public void testIncompatibleEncoder() {
        Properties props = new Properties();
        props.put("message.send.max.retries", "0");
        String x$4 = TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs());
        String x$5 = DefaultEncoder.class.getName();
        String x$6 = DefaultEncoder.class.getName();
        Properties x$7 = props;
        String x$8 = TestUtils$.MODULE$.createProducer$default$4();
        Producer producer = TestUtils$.MODULE$.createProducer(x$4, x$5, x$6, x$8, x$7);
        try {
            try {
                producer.send(this.getProduceData(1));
                throw this.fail("Should fail with ClassCastException due to incompatible Encoder");
            }
            catch (ClassCastException classCastException) {
            }
        }
        finally {
            producer.close();
        }
    }

    @Test
    public void testRandomPartitioner() {
        Option partitionedDataOpt;
        Properties props = new Properties();
        props.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()));
        ProducerConfig config = new ProducerConfig(props);
        TopicMetadata topic1Metadata = this.getTopicMetadata("topic1", 0, 0, "localhost", 9092);
        TopicMetadata topic2Metadata = this.getTopicMetadata("topic2", 0, 0, "localhost", 9092);
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put((Object)"topic1", (Object)topic1Metadata);
        topicPartitionInfos.put((Object)"topic2", (Object)topic2Metadata);
        ProducerPool producerPool = new ProducerPool(config);
        DefaultEventHandler handler = new DefaultEventHandler(config, null, null, null, producerPool, topicPartitionInfos);
        ArrayBuffer producerDataList = new ArrayBuffer();
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)new Message("msg1".getBytes()))}));
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic2", (Object)new Message("msg2".getBytes()))}));
        producerDataList.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", (Object)new Message("msg3".getBytes()))}));
        Option option = partitionedDataOpt = handler.partitionAndCollate((Seq)producerDataList);
        if (option instanceof Some) {
            Some some = (Some)option;
            Map partitionedData = (Map)some.x();
            partitionedData.withFilter((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final boolean apply(Tuple2<Object, scala.collection.mutable.Map<TopicAndPartition, Seq<KeyedMessage<String, Message>>>> check$ifrefutable$1) {
                    Tuple2<Object, scala.collection.mutable.Map<TopicAndPartition, Seq<KeyedMessage<String, Message>>>> tuple2 = check$ifrefutable$1;
                    boolean bl = tuple2 != null;
                    return bl;
                }
            }).foreach((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final void apply(Tuple2<Object, scala.collection.mutable.Map<TopicAndPartition, Seq<KeyedMessage<String, Message>>>> x$2) {
                    Tuple2<Object, scala.collection.mutable.Map<TopicAndPartition, Seq<KeyedMessage<String, Message>>>> tuple2 = x$2;
                    if (tuple2 != null) {
                        scala.collection.mutable.Map dataPerBroker = (scala.collection.mutable.Map)tuple2._2();
                        dataPerBroker.withFilter((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final boolean apply(Tuple2<TopicAndPartition, Seq<KeyedMessage<String, Message>>> check$ifrefutable$2) {
                                TopicAndPartition topicAndPartition;
                                Tuple2<TopicAndPartition, Seq<KeyedMessage<String, Message>>> tuple2 = check$ifrefutable$2;
                                boolean bl = tuple2 != null && (topicAndPartition = (TopicAndPartition)tuple2._1()) != null;
                                return bl;
                            }
                        }).foreach((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final void apply(Tuple2<TopicAndPartition, Seq<KeyedMessage<String, Message>>> x$1) {
                                TopicAndPartition topicAndPartition;
                                Tuple2<TopicAndPartition, Seq<KeyedMessage<String, Message>>> tuple2 = x$1;
                                if (tuple2 != null && (topicAndPartition = (TopicAndPartition)tuple2._1()) != null) {
                                    int partitionId = topicAndPartition.partition();
                                    Assert.assertTrue((partitionId == 0 ? 1 : 0) != 0);
                                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    return;
                                }
                                throw new MatchError(tuple2);
                            }
                        });
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        return;
                    }
                    throw new MatchError(tuple2);
                }
            });
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (None$.MODULE$.equals(option)) {
            throw this.fail("Failed to collate requests by topic, partition");
        }
        throw new MatchError((Object)option);
    }

    @Test
    public void testFailedSendRetryLogic() {
        Properties props = new Properties();
        props.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)this.configs()));
        props.put("request.required.acks", "1");
        props.put("serializer.class", StringEncoder.class.getName().toString());
        props.put("key.serializer.class", NullEncoder.class.getName().toString());
        props.put("producer.num.retries", ((Object)BoxesRunTime.boxToInteger((int)3)).toString());
        ProducerConfig config = new ProducerConfig(props);
        String topic1 = "topic1";
        TopicMetadata topic1Metadata = this.getTopicMetadata(topic1, (Seq<Object>)Predef$.MODULE$.wrapIntArray(new int[]{0, 1}), 0, "localhost", 9092);
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put((Object)"topic1", (Object)topic1Metadata);
        Seq<String> msgs = TestUtils$.MODULE$.getMsgStrings(2);
        List x$9 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic1}));
        List x$10 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1}));
        ByteBufferMessageSet x$11 = this.messagesToSet(msgs);
        int x$12 = 1;
        int x$13 = 11;
        int x$14 = TestUtils$.MODULE$.produceRequestWithAcks$default$5();
        String x$15 = TestUtils$.MODULE$.produceRequestWithAcks$default$7();
        ProducerRequest request1 = TestUtils$.MODULE$.produceRequestWithAcks((Seq<String>)x$9, (Seq<Object>)x$10, x$11, x$12, x$14, x$13, x$15);
        List x$16 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic1}));
        List x$17 = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1}));
        ByteBufferMessageSet x$18 = this.messagesToSet(msgs);
        int x$19 = 1;
        int x$20 = 17;
        int x$21 = TestUtils$.MODULE$.produceRequestWithAcks$default$5();
        String x$22 = TestUtils$.MODULE$.produceRequestWithAcks$default$7();
        ProducerRequest request2 = TestUtils$.MODULE$.produceRequestWithAcks((Seq<String>)x$16, (Seq<Object>)x$17, x$18, x$19, x$21, x$20, x$22);
        ProducerResponse response1 = new ProducerResponse(0, (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)new TopicAndPartition("topic1", 0), (Object)new ProducerResponseStatus(ErrorMapping$.MODULE$.NotLeaderForPartitionCode(), 0L)), new Tuple2((Object)new TopicAndPartition("topic1", 1), (Object)new ProducerResponseStatus(ErrorMapping$.MODULE$.NoError(), 0L))})));
        String x$23 = topic1;
        int x$24 = 0;
        ByteBufferMessageSet x$25 = this.messagesToSet(msgs);
        int x$26 = 1;
        int x$27 = 21;
        int x$28 = TestUtils$.MODULE$.produceRequest$default$5();
        String x$29 = TestUtils$.MODULE$.produceRequest$default$7();
        ProducerRequest request3 = TestUtils$.MODULE$.produceRequest(x$23, x$24, x$25, x$26, x$28, x$27, x$29);
        ProducerResponse response2 = new ProducerResponse(0, (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)new TopicAndPartition("topic1", 0), (Object)new ProducerResponseStatus(ErrorMapping$.MODULE$.NoError(), 0L))})));
        SyncProducer mockSyncProducer = (SyncProducer)EasyMock.createMock(SyncProducer.class);
        EasyMock.expect((Object)mockSyncProducer.config()).andReturn(EasyMock.anyObject()).anyTimes();
        EasyMock.expect((Object)mockSyncProducer.send(request1)).andThrow((Throwable)new RuntimeException());
        EasyMock.expect((Object)mockSyncProducer.send(request2)).andReturn((Object)response1);
        EasyMock.expect((Object)mockSyncProducer.send(request3)).andReturn((Object)response2);
        EasyMock.replay((Object[])new Object[]{mockSyncProducer});
        ProducerPool producerPool = (ProducerPool)EasyMock.createMock(ProducerPool.class);
        EasyMock.expect((Object)producerPool.getProducer(0)).andReturn((Object)mockSyncProducer).times(4);
        producerPool.close();
        EasyMock.expect((Object)BoxedUnit.UNIT);
        EasyMock.replay((Object[])new Object[]{producerPool});
        DefaultEventHandler handler = new DefaultEventHandler(config, (Partitioner)new FixedValuePartitioner(FixedValuePartitioner$.MODULE$.$lessinit$greater$default$1()), (Encoder)new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()), (Encoder)new NullEncoder(NullEncoder$.MODULE$.$lessinit$greater$default$1()), producerPool, topicPartitionInfos);
        Seq data = (Seq)((TraversableLike)msgs.map((Function1)new Serializable(this, topic1){
            public static final long serialVersionUID = 0L;
            private final String topic1$1;

            public final KeyedMessage<Object, String> apply(String m) {
                return new KeyedMessage(this.topic1$1, (Object)BoxesRunTime.boxToInteger((int)0), (Object)m);
            }
            {
                this.topic1$1 = topic1$1;
            }
        }, Seq$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)msgs.map((Function1)new Serializable(this, topic1){
            public static final long serialVersionUID = 0L;
            private final String topic1$1;

            public final KeyedMessage<Object, String> apply(String m) {
                return new KeyedMessage(this.topic1$1, (Object)BoxesRunTime.boxToInteger((int)1), (Object)m);
            }
            {
                this.topic1$1 = topic1$1;
            }
        }, Seq$.MODULE$.canBuildFrom()), Seq$.MODULE$.canBuildFrom());
        handler.handle(data);
        handler.close();
        EasyMock.verify((Object[])new Object[]{mockSyncProducer});
        EasyMock.verify((Object[])new Object[]{producerPool});
    }

    @Test
    public void testJavaProducer() {
        String topic = "topic1";
        Seq<String> msgs = TestUtils$.MODULE$.getMsgStrings(5);
        Seq scalaProducerData = (Seq)msgs.map((Function1)new Serializable(this, topic){
            public static final long serialVersionUID = 0L;
            private final String topic$1;

            public final KeyedMessage<String, String> apply(String m) {
                return new KeyedMessage(this.topic$1, (Object)m);
            }
            {
                this.topic$1 = topic$1;
            }
        }, Seq$.MODULE$.canBuildFrom());
        java.util.List javaProducerData = JavaConversions$.MODULE$.seqAsJavaList(scalaProducerData);
        Producer mockScalaProducer = (Producer)EasyMock.createMock(Producer.class);
        mockScalaProducer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{(KeyedMessage)scalaProducerData.head()}));
        EasyMock.expectLastCall();
        mockScalaProducer.send(scalaProducerData);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{mockScalaProducer});
        kafka.javaapi.producer.Producer javaProducer = new kafka.javaapi.producer.Producer(mockScalaProducer);
        javaProducer.send((KeyedMessage)javaProducerData.get(0));
        javaProducer.send(javaProducerData);
        EasyMock.verify((Object[])new Object[]{mockScalaProducer});
    }

    @Test
    public void testInvalidConfiguration() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");
        try {
            new ProducerConfig(props);
            throw this.fail("should complain about wrong config");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            return;
        }
    }

    /*
     * WARNING - void declaration
     */
    public Seq<KeyedMessage<String, String>> getProduceData(int nEvents) {
        void var2_2;
        ArrayBuffer producerDataList = new ArrayBuffer();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), nEvents).foreach$mVc$sp((Function1)new Serializable(this, producerDataList){
            public static final long serialVersionUID = 0L;
            private final ArrayBuffer producerDataList$1;

            public final void apply(int i) {
                this.apply$mcVI$sp(i);
            }

            public void apply$mcVI$sp(int i) {
                this.producerDataList$1.append((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage("topic1", null, (Object)new StringBuilder().append((Object)"msg").append((Object)BoxesRunTime.boxToInteger((int)i)).toString())}));
            }
            {
                this.producerDataList$1 = producerDataList$1;
            }
        });
        return var2_2;
    }

    private TopicMetadata getTopicMetadata(String topic, int partition, int brokerId, String brokerHost, int brokerPort) {
        return this.getTopicMetadata(topic, (Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{partition})), brokerId, brokerHost, brokerPort);
    }

    private TopicMetadata getTopicMetadata(String topic, Seq<Object> partition, int brokerId, String brokerHost, int brokerPort) {
        Broker broker1 = new Broker(brokerId, brokerHost, brokerPort);
        return new TopicMetadata(topic, (Seq)partition.map((Function1)new Serializable(this, broker1){
            public static final long serialVersionUID = 0L;
            private final Broker broker1$1;

            public final PartitionMetadata apply(int x$3) {
                return new PartitionMetadata(x$3, (Option)new Some((Object)this.broker1$1), (Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Broker[]{this.broker1$1})), PartitionMetadata$.MODULE$.$lessinit$greater$default$4(), PartitionMetadata$.MODULE$.$lessinit$greater$default$5());
            }
            {
                this.broker1$1 = broker1$1;
            }
        }, Seq$.MODULE$.canBuildFrom()), TopicMetadata$.MODULE$.$lessinit$greater$default$3());
    }

    public ByteBufferMessageSet messagesToSet(Seq<String> messages2) {
        return new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)messages2.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Message apply(String m) {
                return new Message(m.getBytes());
            }
        }, Seq$.MODULE$.canBuildFrom()));
    }

    public ByteBufferMessageSet messagesToSet(byte[] key, Seq<byte[]> messages2) {
        return new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)messages2.map((Function1)new Serializable(this, key){
            public static final long serialVersionUID = 0L;
            private final byte[] key$1;

            public final Message apply(byte[] m) {
                byte[] x$30 = this.key$1;
                byte[] x$31 = m;
                return new Message(x$31, x$30);
            }
            {
                this.key$1 = key$1;
            }
        }, Seq$.MODULE$.canBuildFrom()));
    }
}

