/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.RecoverableClientException;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class RecordCollectorTest {
    private final LogContext logContext = new LogContext("test ");
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    private final StringSerializer stringSerializer = new StringSerializer();
    private final StreamPartitioner<String, Object> streamPartitioner = (topic, key, value, numPartitions) -> Integer.parseInt(key) % numPartitions;
    private final String topic1TimeoutHint = "Timeout exception caught when sending record to topic topic1. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.";

    @Test
    public void testSpecificPartition() {
        RecordCollectorImpl collector = new RecordCollectorImpl("RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer));
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", (Headers)headers, Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Assert.assertEquals((Object)3L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)1L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void testStreamPartitioner() {
        RecordCollectorImpl collector = new RecordCollectorImpl("RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer));
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"9", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"27", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"81", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"243", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"28", (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"82", (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"244", (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Map offsets = collector.offsets();
        Assert.assertEquals((Object)4L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals((Object)2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals((Object)0L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void shouldNotAllowOffsetsToBeUpdatedExternally() {
        String topic = "topic1";
        TopicPartition topicPartition = new TopicPartition("topic1", 0);
        RecordCollectorImpl collector = new RecordCollectorImpl("RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer));
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        collector.send("topic1", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer);
        Map offsets = collector.offsets();
        MatcherAssert.assertThat(offsets.get(topicPartition), (Matcher)Matchers.equalTo((Object)2L));
        Assert.assertThrows(UnsupportedOperationException.class, () -> offsets.put(new TopicPartition("topic1", 0), 50L));
        MatcherAssert.assertThat(collector.offsets().get(topicPartition), (Matcher)Matchers.equalTo((Object)2L));
    }

    @Test
    public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer<byte[], byte[]>(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                throw new KafkaException();
            }
        });
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.lambda$shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException$2((RecordCollector)collector));
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.instanceOf(KafkaException.class));
    }

    @Test
    public void shouldThrowRecoverableExceptionOnProducerFencedException() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer<byte[], byte[]>(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                throw new KafkaException((Throwable)new ProducerFencedException("asdf"));
            }
        });
        RecoverableClientException thrown = (RecoverableClientException)Assert.assertThrows(RecoverableClientException.class, () -> this.lambda$shouldThrowRecoverableExceptionOnProducerFencedException$3((RecordCollector)collector));
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.instanceOf(KafkaException.class));
        MatcherAssert.assertThat((Object)thrown.getCause().getCause(), (Matcher)Matchers.instanceOf(ProducerFencedException.class));
    }

    @Test
    public void shouldThrowRecoverableExceptionOnUnknownProducerException() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer<byte[], byte[]>(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                throw new KafkaException((Throwable)new UnknownProducerIdException("asdf"));
            }
        });
        RecoverableClientException thrown = (RecoverableClientException)Assert.assertThrows(RecoverableClientException.class, () -> this.lambda$shouldThrowRecoverableExceptionOnUnknownProducerException$4((RecordCollector)collector));
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.instanceOf(KafkaException.class));
        MatcherAssert.assertThat((Object)thrown.getCause().getCause(), (Matcher)Matchers.instanceOf(UnknownProducerIdException.class));
    }

    @Test
    public void shouldThrowRecoverableExceptionWhenProducerFencedInCallback() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer<byte[], byte[]>(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                callback.onCompletion(null, (Exception)new ProducerFencedException("asdf"));
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        RecoverableClientException thrown = (RecoverableClientException)Assert.assertThrows(RecoverableClientException.class, () -> this.lambda$shouldThrowRecoverableExceptionWhenProducerFencedInCallback$5((RecordCollector)collector));
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.instanceOf(ProducerFencedException.class));
    }

    @Test
    public void shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer<byte[], byte[]>(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                callback.onCompletion(null, (Exception)new UnknownProducerIdException("asdf"));
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        RecoverableClientException thrown = (RecoverableClientException)Assert.assertThrows(RecoverableClientException.class, () -> this.lambda$shouldThrowRecoverableExceptionWhenProducerForgottenInCallback$6((RecordCollector)collector));
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.instanceOf(UnknownProducerIdException.class));
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        try {
            collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
            Assert.fail((String)"Should have thrown StreamsException");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test
    public void shouldRecordSkippedMetricAndLogWarningIfSendFailsWithContinueExceptionHandler() {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("dropped-records");
        LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister();
        MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
        sensor.add(metricName, (MeasurableStat)new WindowedSum());
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), metrics.sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        Assert.assertEquals((Object)1.0, (Object)((KafkaMetric)metrics.metrics().get(metricName)).metricValue());
        Assert.assertTrue((boolean)logCaptureAppender.getMessages().contains("test Error sending records topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error. Enable TRACE logging to view failed messages key and value."));
        LogCaptureAppender.unregister(logCaptureAppender);
    }

    @Test
    public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        try {
            collector.flush();
            Assert.fail((String)"Should have thrown StreamsException");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowStreamsExceptionWithTimeoutHintOnProducerTimeoutWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        collector.init((Producer)new MockProducer<byte[], byte[]>(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, (Exception)new TimeoutException());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        StreamsException expected = (StreamsException)Assert.assertThrows(StreamsException.class, () -> RecordCollectorTest.lambda$shouldThrowStreamsExceptionWithTimeoutHintOnProducerTimeoutWithDefaultExceptionHandler$7((RecordCollector)collector));
        Assert.assertTrue((boolean)(expected.getCause() instanceof TimeoutException));
        Assert.assertTrue((boolean)expected.getMessage().endsWith("Timeout exception caught when sending record to topic topic1. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout."));
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.flush();
    }

    @Test
    public void shouldThrowStreamsExceptionOnCloseIfASendFailedWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        try {
            collector.close();
            Assert.fail((String)"Should have thrown StreamsException");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnCloseIfASendFailedWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
                callback.onCompletion(null, new Exception());
                return null;
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
        collector.close();
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.emptyList();
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test(expected=StreamsException.class)
    public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.init((Producer)new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.emptyList();
            }
        });
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    @Test
    public void testRecordHeaderPassThroughSerializer() {
        CustomStringSerializer keySerializer = new CustomStringSerializer();
        CustomStringSerializer valueSerializer = new CustomStringSerializer();
        keySerializer.configure(Collections.emptyMap(), true);
        RecordCollectorImpl collector = new RecordCollectorImpl("test", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        MockProducer mockProducer = new MockProducer(this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)this.byteArraySerializer, (Serializer)this.byteArraySerializer);
        collector.init((Producer)mockProducer);
        collector.send("topic1", (Object)"3", (Object)"0", (Headers)new RecordHeaders(), null, (Serializer)keySerializer, (Serializer)valueSerializer, this.streamPartitioner);
        List recordHistory = mockProducer.history();
        for (ProducerRecord sentRecord : recordHistory) {
            Headers headers = sentRecord.headers();
            Assert.assertEquals((long)2L, (long)headers.toArray().length);
            Assert.assertEquals((Object)new RecordHeader("key", "key".getBytes()), (Object)headers.lastHeader("key"));
            Assert.assertEquals((Object)new RecordHeader("value", "value".getBytes()), (Object)headers.lastHeader("value"));
        }
    }

    @Test
    public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() {
        RecordCollectorImpl collector = new RecordCollectorImpl("NoNPE", this.logContext, (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("dropped-records"));
        collector.close();
    }

    private static /* synthetic */ void lambda$shouldThrowStreamsExceptionWithTimeoutHintOnProducerTimeoutWithDefaultExceptionHandler$7(RecordCollector collector) throws Throwable {
        collector.flush();
    }

    private /* synthetic */ void lambda$shouldThrowRecoverableExceptionWhenProducerForgottenInCallback$6(RecordCollector collector) throws Throwable {
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    private /* synthetic */ void lambda$shouldThrowRecoverableExceptionWhenProducerFencedInCallback$5(RecordCollector collector) throws Throwable {
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    private /* synthetic */ void lambda$shouldThrowRecoverableExceptionOnUnknownProducerException$4(RecordCollector collector) throws Throwable {
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    private /* synthetic */ void lambda$shouldThrowRecoverableExceptionOnProducerFencedException$3(RecordCollector collector) throws Throwable {
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    private /* synthetic */ void lambda$shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException$2(RecordCollector collector) throws Throwable {
        collector.send("topic1", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.streamPartitioner);
    }

    private static class CustomStringSerializer
    extends StringSerializer {
        private boolean isKey;

        private CustomStringSerializer() {
        }

        public void configure(Map<String, ?> configs, boolean isKey) {
            this.isKey = isKey;
            super.configure(configs, isKey);
        }

        public byte[] serialize(String topic, Headers headers, String data) {
            if (this.isKey) {
                headers.add((Header)new RecordHeader("key", "key".getBytes()));
            } else {
                headers.add((Header)new RecordHeader("value", "value".getBytes()));
            }
            return this.serialize(topic, data);
        }
    }
}

