/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.MockClientSupplier;
import org.easymock.EasyMock;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RecordCollectorTest {
    // Shared collaborators handed to every RecordCollectorImpl built in this class.
    private final LogContext logContext = new LogContext("test ");
    private final TaskId taskId = new TaskId(0, 0);
    private final ProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler();
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
    // Minimal configuration: only application.id and bootstrap.servers are set.
    private final StreamsConfig config = new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234")}));
    // Same as config, plus processing.guarantee=exactly_once.
    private final StreamsConfig eosConfig = new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234"), Utils.mkEntry((Object)"processing.guarantee", (Object)"exactly_once")}));
    private final String topic = "topic";
    // Cluster metadata describing three partitions (0, 1, 2) of "topic".
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Arrays.asList(new PartitionInfo("topic", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic", 2, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet());
    private final StringSerializer stringSerializer = new StringSerializer();
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    // Picks the partition by parsing the key as an integer and taking it modulo numPartitions.
    private final StreamPartitioner<String, Object> streamPartitioner = (topic, key, value, numPartitions) -> Integer.parseInt(key) % numPartitions;
    // The three fields below are (re)assigned in setup() before each test.
    private MockProducer<byte[], byte[]> mockProducer;
    private StreamsProducer streamsProducer;
    private RecordCollectorImpl collector;

    /**
     * Builds the per-test fixtures: a {@link StreamsProducer} created from a
     * {@link MockClientSupplier} that uses the test cluster, the first producer recorded
     * by that supplier, and the {@link RecordCollectorImpl} under test.
     */
    @Before
    public void setup() {
        MockClientSupplier supplier = new MockClientSupplier();
        supplier.setCluster(cluster);

        streamsProducer = new StreamsProducer(config, "threadId", (KafkaClientSupplier)supplier, null, null, logContext);
        // Index 0 of the supplier's producer list is taken as the producer behind streamsProducer.
        mockProducer = supplier.producers.get(0);

        collector = new RecordCollectorImpl(logContext, taskId, streamsProducer, productionExceptionHandler, streamsMetrics);
    }

    /** Closes the collector created in {@code setup()} via {@code closeClean()} after every test. */
    @After
    public void cleanup() {
        this.collector.closeClean();
    }

    /**
     * Sends records to explicitly chosen partitions of "topic" in two batches. After each
     * batch it checks the per-partition offsets reported by the collector and the number
     * of records in the mock producer's history.
     */
    @Test
    public void shouldSendToSpecificPartition() {
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        TopicPartition partition0 = new TopicPartition("topic", 0);
        TopicPartition partition1 = new TopicPartition("topic", 1);
        TopicPartition partition2 = new TopicPartition("topic", 2);

        // First batch: 3 records to partition 0 (no headers), then 2 to partition 1 and 1 to partition 2 (with headers).
        for (int i = 0; i < 3; i++) {
            collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)stringSerializer, (Serializer)stringSerializer);
        }
        for (int i = 0; i < 2; i++) {
            collector.send("topic", (Object)"999", (Object)"0", (Headers)recordHeaders, Integer.valueOf(1), null, (Serializer)stringSerializer, (Serializer)stringSerializer);
        }
        collector.send("topic", (Object)"999", (Object)"0", (Headers)recordHeaders, Integer.valueOf(2), null, (Serializer)stringSerializer, (Serializer)stringSerializer);

        Map offsetsAfterFirstBatch = collector.offsets();
        Assert.assertEquals((long)2L, (long)((Long)offsetsAfterFirstBatch.get(partition0)));
        Assert.assertEquals((long)1L, (long)((Long)offsetsAfterFirstBatch.get(partition1)));
        Assert.assertEquals((long)0L, (long)((Long)offsetsAfterFirstBatch.get(partition2)));
        Assert.assertEquals((long)6L, (long)mockProducer.history().size());

        // Second batch: one more record per partition (only the partition-2 record carries headers).
        collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)stringSerializer, (Serializer)stringSerializer);
        collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(1), null, (Serializer)stringSerializer, (Serializer)stringSerializer);
        collector.send("topic", (Object)"999", (Object)"0", (Headers)recordHeaders, Integer.valueOf(2), null, (Serializer)stringSerializer, (Serializer)stringSerializer);

        Map offsetsAfterSecondBatch = collector.offsets();
        Assert.assertEquals((long)3L, (long)((Long)offsetsAfterSecondBatch.get(partition0)));
        Assert.assertEquals((long)2L, (long)((Long)offsetsAfterSecondBatch.get(partition1)));
        Assert.assertEquals((long)1L, (long)((Long)offsetsAfterSecondBatch.get(partition2)));
        Assert.assertEquals((long)9L, (long)mockProducer.history().size());
    }

    /**
     * Sends nine records whose partition is chosen by {@code streamPartitioner}
     * (integer key modulo partition count), then checks the reported offsets, the
     * producer history size, and that the returned offsets map rejects {@code put}.
     */
    @Test
    public void shouldSendWithPartitioner() {
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        String[] keysWithoutHeaders = {"3", "9", "27", "81", "243"};
        String[] keysWithHeaders = {"28", "82", "244"};

        for (String key : keysWithoutHeaders) {
            collector.send("topic", (Object)key, (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
        }
        for (String key : keysWithHeaders) {
            collector.send("topic", (Object)key, (Object)"0", (Headers)recordHeaders, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
        }
        collector.send("topic", (Object)"245", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        Map reportedOffsets = collector.offsets();
        Assert.assertEquals((long)4L, (long)((Long)reportedOffsets.get(new TopicPartition("topic", 0))));
        Assert.assertEquals((long)2L, (long)((Long)reportedOffsets.get(new TopicPartition("topic", 1))));
        Assert.assertEquals((long)0L, (long)((Long)reportedOffsets.get(new TopicPartition("topic", 2))));
        Assert.assertEquals((long)9L, (long)mockProducer.history().size());

        // Writing into the map returned by offsets() must fail.
        TopicPartition firstPartition = new TopicPartition("topic", 0);
        Assert.assertThrows(UnsupportedOperationException.class, () -> reportedOffsets.put(firstPartition, 50L));
    }

    /**
     * Sends nine records with headers but with neither a partition nor a timestamp,
     * then checks the offsets reported for partitions 0-2 and the producer history size.
     */
    @Test
    public void shouldSendWithNoPartition() {
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        String[] keys = {"3", "9", "27", "81", "243", "28", "82", "244", "245"};

        for (String key : keys) {
            collector.send("topic", (Object)key, (Object)"0", (Headers)recordHeaders, null, null, (Serializer)stringSerializer, (Serializer)stringSerializer);
        }

        Map reportedOffsets = collector.offsets();
        Assert.assertEquals((long)3L, (long)((Long)reportedOffsets.get(new TopicPartition("topic", 0))));
        Assert.assertEquals((long)2L, (long)((Long)reportedOffsets.get(new TopicPartition("topic", 1))));
        Assert.assertEquals((long)1L, (long)((Long)reportedOffsets.get(new TopicPartition("topic", 2))));
        Assert.assertEquals((long)9L, (long)mockProducer.history().size());
    }

    /**
     * Captures the offsets map before sending, sends one record to each of partitions
     * 0-2 and asserts the captured map still equals the empty map. It then flushes and
     * asserts that a fresh {@code offsets()} call reports offset 0 for every partition.
     */
    @Test
    public void shouldUpdateOffsetsUponCompletion() {
        Map offsetsBeforeSend = collector.offsets();

        for (int partition = 0; partition < 3; partition++) {
            collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(partition), null, (Serializer)stringSerializer, (Serializer)stringSerializer);
        }
        // The map obtained before the sends is expected to be unaffected by them.
        Assert.assertEquals(Collections.emptyMap(), (Object)offsetsBeforeSend);

        collector.flush();

        Map offsetsAfterFlush = collector.offsets();
        for (int partition = 0; partition < 3; partition++) {
            Assert.assertEquals((Object)0L, offsetsAfterFlush.get(new TopicPartition("topic", partition)));
        }
    }

    /**
     * Sends a single record with empty headers through two {@code CustomStringSerializer}
     * instances and checks that the record captured by the mock producer carries exactly
     * two headers: "key" -> "key" and "value" -> "value".
     *
     * <p>NOTE(review): {@code CustomStringSerializer} is declared outside this view;
     * presumably it adds those headers while serializing - confirm against its definition.
     */
    @Test
    public void shouldPassThroughRecordHeaderToSerializer() {
        CustomStringSerializer keySerializer = new CustomStringSerializer();
        CustomStringSerializer valueSerializer = new CustomStringSerializer();
        // Only the key serializer is configured, flagged as a key serializer (isKey = true).
        keySerializer.configure(Collections.emptyMap(), true);
        this.collector.send("topic", (Object)"3", (Object)"0", (Headers)new RecordHeaders(), null, (Serializer)keySerializer, (Serializer)valueSerializer, this.streamPartitioner);
        // Typed list: iterating a raw List as ProducerRecord does not compile.
        List<ProducerRecord<byte[], byte[]>> recordHistory = this.mockProducer.history();
        // Guard against the loop below passing vacuously when nothing was sent.
        Assert.assertEquals((long)1L, (long)recordHistory.size());
        for (ProducerRecord<byte[], byte[]> sentRecord : recordHistory) {
            Headers headers = sentRecord.headers();
            Assert.assertEquals((long)2L, (long)headers.toArray().length);
            Assert.assertEquals((Object)new RecordHeader("key", "key".getBytes()), (Object)headers.lastHeader("key"));
            Assert.assertEquals((Object)new RecordHeader("value", "value".getBytes()), (Object)headers.lastHeader("value"));
        }
    }

    /**
     * Records expectations on a mocked {@link StreamsProducer} ({@code eosEnabled()}
     * returning false, plus a {@code flush()} call), flushes a collector built on that
     * mock, and verifies the expectations.
     */
    @Test
    public void shouldForwardFlushToStreamsProducer() {
        StreamsProducer mockedProducer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        Object[] mocks = new Object[]{mockedProducer};

        // Record phase.
        EasyMock.expect((Object)mockedProducer.eosEnabled()).andReturn((Object)false);
        mockedProducer.flush();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])mocks);

        RecordCollectorImpl flushingCollector = new RecordCollectorImpl(logContext, taskId, mockedProducer, productionExceptionHandler, streamsMetrics);
        flushingCollector.flush();

        EasyMock.verify((Object[])mocks);
    }

    /**
     * Same flow as the non-EOS flush test, but the mocked {@link StreamsProducer}
     * reports {@code eosEnabled()} as true. Flushing the collector must still reach
     * {@code flush()} on the producer.
     */
    @Test
    public void shouldForwardFlushToStreamsProducerEosEnabled() {
        StreamsProducer mockedProducer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        Object[] mocks = new Object[]{mockedProducer};

        // Record phase.
        EasyMock.expect((Object)mockedProducer.eosEnabled()).andReturn((Object)true);
        mockedProducer.flush();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])mocks);

        RecordCollectorImpl flushingCollector = new RecordCollectorImpl(logContext, taskId, mockedProducer, productionExceptionHandler, streamsMetrics);
        flushingCollector.flush();

        EasyMock.verify((Object[])mocks);
    }

    /**
     * Builds a collector on a mocked {@link StreamsProducer} whose only recorded
     * expectation is {@code eosEnabled()} returning true (no {@code abortTransaction()}
     * is recorded), calls {@code closeClean()}, and verifies the mock.
     */
    @Test
    public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
        StreamsProducer mockedProducer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        Object[] mocks = new Object[]{mockedProducer};

        EasyMock.expect((Object)mockedProducer.eosEnabled()).andReturn((Object)true);
        EasyMock.replay((Object[])mocks);

        RecordCollectorImpl closingCollector = new RecordCollectorImpl(logContext, taskId, mockedProducer, productionExceptionHandler, streamsMetrics);
        closingCollector.closeClean();

        EasyMock.verify((Object[])mocks);
    }

    /**
     * Records {@code eosEnabled()} returning true and an {@code abortTransaction()} call
     * on a mocked {@link StreamsProducer}, calls {@code closeDirty()} on a collector
     * built on that mock, and verifies the recorded expectations.
     */
    @Test
    public void shouldAbortTxOnCloseDirtyIfEosEnabled() {
        StreamsProducer mockedProducer = (StreamsProducer)EasyMock.mock(StreamsProducer.class);
        Object[] mocks = new Object[]{mockedProducer};

        // Record phase.
        EasyMock.expect((Object)mockedProducer.eosEnabled()).andReturn((Object)true);
        mockedProducer.abortTransaction();
        EasyMock.replay((Object[])mocks);

        RecordCollectorImpl closingCollector = new RecordCollectorImpl(logContext, taskId, mockedProducer, productionExceptionHandler, streamsMetrics);
        closingCollector.closeDirty();

        EasyMock.verify((Object[])mocks);
    }

    /**
     * Sends a String key with a {@link LongSerializer} as the key serializer and expects
     * a {@link StreamsException} caused by a {@link ClassCastException}, carrying the
     * full explanatory message.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnKeyClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";

        StreamsException thrown = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> collector.send("topic", (Object)"key", (Object)"value", (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new LongSerializer(), (Serializer)new StringSerializer()));

        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Same key-serializer mismatch as the key ClassCastException test, but with a null
     * value. The expected message reports the value type as unknown.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnKeyAndNullValueClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: unknown because value is null). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";

        StreamsException thrown = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> collector.send("topic", (Object)"key", null, (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new LongSerializer(), (Serializer)new StringSerializer()));

        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Sends a String value with a {@link LongSerializer} as the value serializer and
     * expects a {@link StreamsException} caused by a {@link ClassCastException},
     * carrying the full explanatory message.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnValueClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";

        StreamsException thrown = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> collector.send("topic", (Object)"key", (Object)"value", (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new StringSerializer(), (Serializer)new LongSerializer()));

        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Same value-serializer mismatch as the value ClassCastException test, but with a
     * null key. The expected message reports the key type as unknown.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnValueAndNullKeyClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";

        StreamsException thrown = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> collector.send("topic", null, (Object)"value", (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new StringSerializer(), (Serializer)new LongSerializer()));

        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Builds a collector on the producer returned by
     * {@code getExceptionalStreamProducerOnPartitionsFor(new KafkaException("Kaboom!"))}
     * and expects the exercised action to raise a {@link StreamsException} whose message
     * names the topic, the task id and the original exception.
     *
     * <p>NOTE(review): the action under test is the decompiler-emitted synthetic method
     * {@code lambda$...$6}, whose body is not visible here; presumably it performs a
     * send through the stream partitioner - confirm.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner() {
        String expectedMessage = "Could not determine the number of partitions for topic 'topic' for task " + this.taskId + " due to org.apache.kafka.common.KafkaException: Kaboom!";

        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamProducerOnPartitionsFor((RuntimeException)new KafkaException("Kaboom!")), productionExceptionHandler, streamsMetrics);
        failingCollector.initialize();

        StreamsException thrown = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> this.lambda$shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner$6((RecordCollector)failingCollector));

        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /** Runs the forward-without-wrapping scenario with a {@link TimeoutException}. */
    @Test
    public void shouldForwardTimeoutExceptionFromStreamPartitionerWithoutWrappingIt() {
        TimeoutException timeout = new TimeoutException("Kaboom!");
        shouldForwardExceptionWithoutWrappingIt(timeout);
    }

    /** Runs the forward-without-wrapping scenario with a plain {@link RuntimeException}. */
    @Test
    public void shouldForwardRuntimeExceptionFromStreamPartitionerWithoutWrappingIt() {
        RuntimeException failure = new RuntimeException("Kaboom!");
        shouldForwardExceptionWithoutWrappingIt(failure);
    }

    /**
     * Builds a collector on the producer returned by
     * {@code getExceptionalStreamProducerOnPartitionsFor(runtimeException)} and asserts
     * that the exercised action throws an exception of exactly the given exception's
     * class with the message "Kaboom!".
     *
     * <p>NOTE(review): the action is the decompiler-emitted synthetic method
     * {@code lambda$...$7}; its body is not visible in this view.
     *
     * @param runtimeException the exception the producer is set up to raise
     */
    private <E extends RuntimeException> void shouldForwardExceptionWithoutWrappingIt(E runtimeException) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamProducerOnPartitionsFor(runtimeException), productionExceptionHandler, streamsMetrics);
        failingCollector.initialize();

        RuntimeException thrown = (RuntimeException)Assert.assertThrows(
            runtimeException.getClass(),
            () -> this.lambda$shouldForwardExceptionWithoutWrappingIt$7((RecordCollector)failingCollector));

        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)"Kaboom!"));
    }

    /** Runs the subsequent-send scenario with a {@link ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenProducerFencedInCallback() {
        RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentSend(fenced);
    }

    /** Runs the subsequent-send scenario with an {@link InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidEpochInCallback() {
        RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentSend(invalidEpoch);
    }

    /**
     * Sends one record through a collector whose producer comes from
     * {@code getExceptionalStreamsProducerOnSend(exception)}, then asserts that the
     * follow-up action throws a {@link TaskMigratedException} whose cause equals the
     * given exception.
     *
     * <p>NOTE(review): the follow-up action is the decompiler-emitted synthetic method
     * {@code lambda$...$8}; presumably a second send - its body is not visible here.
     *
     * @param exception the exception the producer is set up with
     */
    private void testThrowTaskMigratedExceptionOnSubsequentSend(RuntimeException exception) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamsProducerOnSend(exception), productionExceptionHandler, streamsMetrics);
        failingCollector.initialize();
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(
            TaskMigratedException.class,
            () -> this.lambda$testThrowTaskMigratedExceptionOnSubsequentSend$8((RecordCollector)failingCollector));

        Assert.assertEquals((Object)exception, (Object)migrated.getCause());
    }

    /** Runs the subsequent-flush scenario with a {@link ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenProducerFencedInCallback() {
        RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentFlush(fenced);
    }

    /** Runs the subsequent-flush scenario with an {@link InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidEpochInCallback() {
        RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentFlush(invalidEpoch);
    }

    /**
     * Sends one record through a collector whose producer comes from
     * {@code getExceptionalStreamsProducerOnSend(exception)}, then asserts that
     * {@code flush()} throws a {@link TaskMigratedException} whose cause equals the
     * given exception.
     *
     * @param exception the exception the producer is set up with
     */
    private void testThrowTaskMigratedExceptionOnSubsequentFlush(RuntimeException exception) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamsProducerOnSend(exception), productionExceptionHandler, streamsMetrics);
        failingCollector.initialize();
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(
            TaskMigratedException.class,
            () -> ((RecordCollector)failingCollector).flush());

        Assert.assertEquals((Object)exception, (Object)migrated.getCause());
    }

    /** Runs the subsequent-close scenario with a {@link ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenProducerFencedInCallback() {
        RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentClose(fenced);
    }

    /** Runs the subsequent-close scenario with an {@link InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidEpochInCallback() {
        RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentClose(invalidEpoch);
    }

    /**
     * Sends one record through a collector whose producer comes from
     * {@code getExceptionalStreamsProducerOnSend(exception)}, then asserts that
     * {@code closeClean()} throws a {@link TaskMigratedException} whose cause equals
     * the given exception.
     *
     * @param exception the exception the producer is set up with
     */
    private void testThrowTaskMigratedExceptionOnSubsequentClose(RuntimeException exception) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamsProducerOnSend(exception), productionExceptionHandler, streamsMetrics);
        failingCollector.initialize();
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(
            TaskMigratedException.class,
            () -> ((RecordCollector)failingCollector).closeClean());

        Assert.assertEquals((Object)exception, (Object)migrated.getCause());
    }

    /**
     * With the default production exception handler and a producer set up with a
     * {@link KafkaException}, sends one record and asserts that the follow-up action
     * throws a {@link StreamsException} with that cause and the FAIL message.
     *
     * <p>NOTE(review): the follow-up action is the decompiler-emitted synthetic method
     * {@code lambda$...$9}; presumably a second send - its body is not visible here.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent.";
        KafkaException sendFailure = new KafkaException("KABOOM!");

        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamsProducerOnSend((Exception)sendFailure), productionExceptionHandler, streamsMetrics);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        StreamsException streamsFailure = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> this.lambda$shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler$9((RecordCollector)failingCollector));

        Assert.assertEquals((Object)sendFailure, (Object)streamsFailure.getCause());
        MatcherAssert.assertThat((Object)streamsFailure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With the default production exception handler and a producer set up with a
     * {@link KafkaException}, sends one record and asserts that {@code flush()} throws
     * a {@link StreamsException} with that cause and the FAIL message.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsWithDefaultExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent.";
        KafkaException sendFailure = new KafkaException("KABOOM!");

        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamsProducerOnSend((Exception)sendFailure), productionExceptionHandler, streamsMetrics);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        StreamsException streamsFailure = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> ((RecordCollector)failingCollector).flush());

        Assert.assertEquals((Object)sendFailure, (Object)streamsFailure.getCause());
        MatcherAssert.assertThat((Object)streamsFailure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With the default production exception handler and a producer set up with a
     * {@link KafkaException}, sends one record and asserts that {@code closeClean()}
     * throws a {@link StreamsException} with that cause and the FAIL message.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCloseIfASendFailsWithDefaultExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent.";
        KafkaException sendFailure = new KafkaException("KABOOM!");

        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamsProducerOnSend((Exception)sendFailure), productionExceptionHandler, streamsMetrics);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        StreamsException streamsFailure = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> ((RecordCollector)failingCollector).closeClean());

        Assert.assertEquals((Object)sendFailure, (Object)streamsFailure.getCause());
        MatcherAssert.assertThat((Object)streamsFailure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With an {@link AlwaysContinueProductionExceptionHandler} and a producer set up
     * with an {@link AuthenticationException}, sends one record and asserts that the
     * follow-up action still throws a {@link StreamsException} with that cause and the
     * fatal-error message.
     *
     * <p>NOTE(review): the follow-up action is the decompiler-emitted synthetic method
     * {@code lambda$...$10}; presumably a second send - its body is not visible here.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
        AuthenticationException fatalFailure = new AuthenticationException("KABOOM!");

        RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, this.getExceptionalStreamsProducerOnSend((Exception)fatalFailure), (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), streamsMetrics);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);

        StreamsException streamsFailure = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> this.lambda$shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler$10((RecordCollector)failingCollector));

        Assert.assertEquals((Object)fatalFailure, (Object)streamsFailure.getCause());
        MatcherAssert.assertThat((Object)streamsFailure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * A fatal {@code AuthenticationException} delivered to the send callback must surface as a
     * {@code StreamsException} on the following {@code flush}, even though the collector is built
     * with {@code AlwaysContinueProductionExceptionHandler}.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueExceptionHandler() {
        final AuthenticationException fatalError = new AuthenticationException("KABOOM!");
        final String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
        final StreamsProducer failingProducer = getExceptionalStreamsProducerOnSend((Exception)fatalError);
        final RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, failingProducer, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), streamsMetrics);

        // The send itself is not expected to throw; the recorded error must be reported by flush().
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
        final StreamsException rethrown = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> ((RecordCollector)failingCollector).flush());

        Assert.assertEquals((Object)fatalError, (Object)rethrown.getCause());
        MatcherAssert.assertThat((Object)rethrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * A fatal {@code AuthenticationException} delivered to the send callback must surface as a
     * {@code StreamsException} on the following {@code closeClean}, even though the collector is
     * built with {@code AlwaysContinueProductionExceptionHandler}.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueExceptionHandler() {
        final AuthenticationException fatalError = new AuthenticationException("KABOOM!");
        final String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
        final StreamsProducer failingProducer = getExceptionalStreamsProducerOnSend((Exception)fatalError);
        final RecordCollectorImpl failingCollector = new RecordCollectorImpl(logContext, taskId, failingProducer, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), streamsMetrics);

        // The send itself is not expected to throw; the recorded error must be reported by closeClean().
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
        final StreamsException rethrown = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> ((RecordCollector)failingCollector).closeClean());

        Assert.assertEquals((Object)fatalError, (Object)rethrown.getCause());
        MatcherAssert.assertThat((Object)rethrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * A send failure handled by {@code AlwaysContinueProductionExceptionHandler} must be logged,
     * must be reflected as {@code 1.0} in the {@code dropped-records-total} task metric, and must
     * not make later {@code send}, {@code flush} or {@code closeClean} calls throw.
     */
    @Test
    public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
        final RecordCollectorImpl tolerantCollector = new RecordCollectorImpl(logContext, taskId, getExceptionalStreamsProducerOnSend(new Exception()), (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), streamsMetrics);

        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
            tolerantCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
            tolerantCollector.flush();

            // Collect everything that was logged so a failing assertion shows the full picture.
            final List<String> messages = logCaptureAppender.getMessages();
            final StringBuilder report = new StringBuilder("Messages received:");
            for (final String message : messages) {
                report.append("\n - ").append(message);
            }

            // Fail with a readable assertion (not an IndexOutOfBoundsException) if nothing was logged at all.
            Assert.assertFalse(report.toString(), messages.isEmpty());
            Assert.assertTrue(report.toString(), messages.get(messages.size() - 1).endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded."));
        }

        final MetricName droppedRecordsName = new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)taskId.toString())}));
        final Metric droppedRecords = (Metric)streamsMetrics.metrics().get(droppedRecordsName);
        // Fail with a readable assertion (not a NullPointerException) if the metric is not registered.
        Assert.assertNotNull("Metric not registered: " + droppedRecordsName, droppedRecords);
        Assert.assertEquals((Object)1.0, (Object)droppedRecords.metricValue());

        // None of the follow-up calls may throw after the handler chose to continue.
        tolerantCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
        tolerantCollector.flush();
        tolerantCollector.closeClean();
    }

    /**
     * {@code closeDirty()} on a collector whose producer is built from {@code eosConfig} must not
     * call {@code abortTransaction()} on the underlying producer when nothing has been sent.
     */
    @Test
    public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
        final AtomicBoolean abortCalled = new AtomicBoolean(false);

        // Supplier handing out a producer that only records whether abortTransaction() was invoked.
        final KafkaClientSupplier abortTrackingSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    @Override
                    public void abortTransaction() {
                        abortCalled.set(true);
                    }
                };
            }
        };
        final StreamsProducer eosProducer = new StreamsProducer(eosConfig, "threadId", abortTrackingSupplier, taskId, null, logContext);
        final RecordCollectorImpl eosCollector = new RecordCollectorImpl(logContext, taskId, eosProducer, productionExceptionHandler, streamsMetrics);

        eosCollector.closeDirty();

        Assert.assertFalse(abortCalled.get());
    }

    /**
     * When the producer returns an empty list from {@code partitionsFor}, a partitioner-based
     * {@code send} must fail with a {@code StreamsException} stating that partition information
     * could not be obtained for the topic.
     */
    @Test
    public void shouldThrowIfTopicIsUnknownOnSendWithPartitioner() {
        // Supplier handing out a producer that knows no partitions for any topic.
        final KafkaClientSupplier noPartitionsSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    @Override
                    public List<PartitionInfo> partitionsFor(String topic) {
                        return Collections.emptyList();
                    }
                };
            }
        };
        final StreamsProducer producerWithoutPartitions = new StreamsProducer(config, "threadId", noPartitionsSupplier, null, null, logContext);
        final RecordCollectorImpl unknownTopicCollector = new RecordCollectorImpl(logContext, taskId, producerWithoutPartitions, productionExceptionHandler, streamsMetrics);
        final String expectedMessage = "Could not get partition information for topic topic for task 0_0. This can happen if the topic does not exist.";

        unknownTopicCollector.initialize();
        final StreamsException failure = (StreamsException)Assert.assertThrows(
            StreamsException.class,
            () -> ((RecordCollector)unknownTopicCollector).send("topic", (Object)"3", (Object)"0", null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner));

        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Closing a collector whose producer is built from {@code eosConfig} around the shared
     * {@code mockProducer} must leave the test's {@code streamsProducer} usable: the subsequent
     * {@code flush()} is expected to complete without throwing.
     */
    @Test
    public void shouldNotCloseInternalProducerForEOS() {
        // Supplier that always hands out the test's shared mock producer.
        final KafkaClientSupplier sharedProducerSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return RecordCollectorTest.this.mockProducer;
            }
        };
        final StreamsProducer eosProducer = new StreamsProducer(eosConfig, "threadId", sharedProducerSupplier, taskId, null, logContext);
        final RecordCollectorImpl eosCollector = new RecordCollectorImpl(logContext, taskId, eosProducer, productionExceptionHandler, streamsMetrics);

        eosCollector.closeClean();

        // NOTE(review): presumably the streamsProducer fixture wraps the same mockProducer, so this
        // flush would throw had closeClean() closed it - confirm against the setup code.
        streamsProducer.flush();
    }

    /**
     * Closing the default test collector cleanly must leave the test's {@code streamsProducer}
     * usable: the subsequent {@code flush()} is expected to complete without throwing.
     */
    @Test
    public void shouldNotCloseInternalProducerForNonEOS() {
        collector.closeClean();

        // The test passes as long as this call does not throw.
        streamsProducer.flush();
    }

    /**
     * Builds a {@code StreamsProducer} (from {@code config}, thread id {@code "threadId"}) whose
     * client supplier hands out a {@code MockProducer} with an overridden {@code send}: every send
     * immediately completes the callback with the given exception and returns {@code null} instead
     * of a future.
     *
     * @param exception the error passed to each send callback
     * @return the producer wrapper that fails every send through its callback
     */
    private StreamsProducer getExceptionalStreamsProducerOnSend(final Exception exception) {
        final KafkaClientSupplier failingSendSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    @Override
                    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                        // Report the failure right away; no metadata and no future are produced.
                        callback.onCompletion(null, exception);
                        return null;
                    }
                };
            }
        };
        return new StreamsProducer(config, "threadId", failingSendSupplier, null, null, logContext);
    }

    /**
     * Builds a {@code StreamsProducer} (from {@code config}, thread id {@code "threadId"}) whose
     * client supplier hands out a {@code MockProducer} with an overridden {@code partitionsFor}
     * that always throws the given exception.
     *
     * @param exception the error thrown by every {@code partitionsFor} call
     * @return the producer wrapper that fails every partition lookup
     */
    private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final RuntimeException exception) {
        final KafkaClientSupplier failingLookupSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Partitioner)new DefaultPartitioner(), (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    @Override
                    public synchronized List<PartitionInfo> partitionsFor(String topic) {
                        throw exception;
                    }
                };
            }
        };
        return new StreamsProducer(config, "threadId", failingLookupSupplier, null, null, logContext);
    }

    /**
     * Decompiler-emitted lambda body invoked from
     * {@code shouldThrowIfTopicIsUnknownOnSendWithPartitioner}: sends key {@code "3"} / value
     * {@code "0"} to {@code "topic"} through the given collector, using the test's string
     * serializers and stream partitioner, with no headers and no timestamp.
     */
    private /* synthetic */ void lambda$shouldThrowIfTopicIsUnknownOnSendWithPartitioner$11(RecordCollector collector) throws Throwable {
        final String topic = "topic";
        final Object key = "3";
        final Object value = "0";
        collector.send(topic, key, value, null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
    }

    /**
     * Decompiler-emitted lambda body invoked from
     * {@code shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler}:
     * sends key {@code "3"} / value {@code "0"} to {@code "topic"} through the given collector,
     * using the test's string serializers and stream partitioner, with no headers and no timestamp.
     */
    private /* synthetic */ void lambda$shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler$10(RecordCollector collector) throws Throwable {
        final String topic = "topic";
        final Object key = "3";
        final Object value = "0";
        collector.send(topic, key, value, null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
    }

    /**
     * Decompiler-emitted lambda body; presumably invoked from the test named in this method's
     * identifier (the caller is not visible in this part of the file). Sends key {@code "3"} /
     * value {@code "0"} to {@code "topic"} through the given collector, using the test's string
     * serializers and stream partitioner, with no headers and no timestamp.
     */
    private /* synthetic */ void lambda$shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler$9(RecordCollector collector) throws Throwable {
        final String topic = "topic";
        final Object key = "3";
        final Object value = "0";
        collector.send(topic, key, value, null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
    }

    /**
     * Decompiler-emitted lambda body; presumably invoked from the test named in this method's
     * identifier (the caller is not visible in this part of the file). Sends key {@code "3"} /
     * value {@code "0"} to {@code "topic"} through the given collector, using the test's string
     * serializers and stream partitioner, with no headers and no timestamp.
     */
    private /* synthetic */ void lambda$testThrowTaskMigratedExceptionOnSubsequentSend$8(RecordCollector collector) throws Throwable {
        final String topic = "topic";
        final Object key = "3";
        final Object value = "0";
        collector.send(topic, key, value, null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
    }

    /**
     * Decompiler-emitted lambda body; presumably invoked from the test named in this method's
     * identifier (the caller is not visible in this part of the file). Sends key {@code "0"} /
     * value {@code "0"} to {@code "topic"} through the given collector, using the test's string
     * serializers and stream partitioner, with no headers and no timestamp.
     */
    private /* synthetic */ void lambda$shouldForwardExceptionWithoutWrappingIt$7(RecordCollector collector) throws Throwable {
        final String topic = "topic";
        final Object key = "0";
        final Object value = "0";
        collector.send(topic, key, value, null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
    }

    /**
     * Decompiler-emitted lambda body; presumably invoked from the test named in this method's
     * identifier (the caller is not visible in this part of the file). Sends key {@code "0"} /
     * value {@code "0"} to {@code "topic"} through the given collector, using the test's string
     * serializers and stream partitioner, with no headers and no timestamp.
     */
    private /* synthetic */ void lambda$shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner$6(RecordCollector collector) throws Throwable {
        final String topic = "topic";
        final Object key = "0";
        final Object value = "0";
        collector.send(topic, key, value, null, null, (Serializer)stringSerializer, (Serializer)stringSerializer, streamPartitioner);
    }

    private static class CustomStringSerializer
    extends StringSerializer {
        private boolean isKey;

        private CustomStringSerializer() {
        }

        public void configure(Map<String, ?> configs, boolean isKey) {
            this.isKey = isKey;
            super.configure(configs, isKey);
        }

        public byte[] serialize(String topic, Headers headers, String data) {
            if (this.isKey) {
                headers.add((Header)new RecordHeader("key", "key".getBytes()));
            } else {
                headers.add((Header)new RecordHeader("value", "value".getBytes()));
            }
            return this.serialize(topic, data);
        }
    }
}

