/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.junit.Assert;
import org.junit.Test;

public class RecordCollectorTest {
    private final LogContext logContext = new LogContext("test ");
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    private final StringSerializer stringSerializer = new StringSerializer();
    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>(){

        public Integer partition(String topic, String key, Object value, int numPartitions) {
            return Integer.parseInt(key) % numPartitions;
        }
    };

    @Test
    public void testSpecificPartition() {
        RecordCollectorImpl collector = new RecordCollectorImpl("RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer));
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Assert.assertEquals((Object)3L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void testStreamPartitioner() {
        RecordCollectorImpl collector = new RecordCollectorImpl("RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer));
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"9", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"27", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"81", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"243", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"28", (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"82", (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"244", (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)4L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                throw new KafkaException();
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        try {
            collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
            Assert.fail((String)"Should have thrown StreamsException");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test
    public void shouldRecordSkippedMetricAndLogWarningIfSendFailsWithContinueExceptionHandler() {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("skipped-records");
        LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister();
        MetricName metricName = new MetricName("name", "group", "description", Collections.EMPTY_MAP);
        sensor.add(metricName, (MeasurableStat)new Sum());
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), sensor);
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Assert.assertEquals((Object)1.0, (Object)((KafkaMetric)metrics.metrics().get(metricName)).metricValue());
        Assert.assertTrue((boolean)logCaptureAppender.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error."));
        LogCaptureAppender.unregister(logCaptureAppender);
    }

    @Test
    public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        try {
            collector.flush();
            Assert.fail((String)"Should have thrown StreamsException");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.flush();
    }

    @Test
    public void shouldThrowStreamsExceptionOnCloseIfASendFailedWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        try {
            collector.close();
            Assert.fail((String)"Should have thrown StreamsException");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnCloseIfASendFailedWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.close();
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.EMPTY_LIST;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.EMPTY_LIST;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }
}

