/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.junit.Assert;
import org.junit.Test;

public class RecordCollectorTest {
    private final LogContext logContext = new LogContext("test ");
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    private final StringSerializer stringSerializer = new StringSerializer();
    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>(){

        public Integer partition(String key, Object value, int numPartitions) {
            return Integer.parseInt(key) % numPartitions;
        }
    };

    @Test
    public void testSpecificPartition() {
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer), "RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "));
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Assert.assertEquals((Object)3L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void testStreamPartitioner() {
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer), "RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "));
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"9", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"27", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"81", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"243", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"28", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"82", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"244", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"245", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)4L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void shouldRetryWhenTimeoutExceptionOccursOnSend() {
        final AtomicInteger attempt = new AtomicInteger(0);
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                if (attempt.getAndIncrement() == 0) {
                    throw new TimeoutException();
                }
                return super.send(record, callback);
            }
        }, "test", this.logContext);
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Long offset = (Long)collector.offsets().get(new TopicPartition("topic1", 0));
        Assert.assertEquals((Object)0L, (Object)offset);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionAfterMaxAttempts() {
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                throw new TimeoutException();
            }
        }, "test", this.logContext);
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() {
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        }, "test", this.logContext);
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() {
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        }, "test", this.logContext);
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.flush();
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() {
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        }, "test", this.logContext);
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.close();
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowIfTopicIsUnknown() {
        RecordCollectorImpl collector = new RecordCollectorImpl((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.EMPTY_LIST;
            }
        }, "test", this.logContext);
        collector.send("topic1", (Object)"3", (Object)"0", null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }
}

