/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.test.MockClientSupplier;
import org.easymock.EasyMock;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamsProducerTest {
    // NOTE: these fields are initialised in declaration order and several initialisers read
    // earlier fields (configs, clientSupplier, logContext), so the order must be preserved.

    // LogContext handed to every StreamsProducer built in this test; created from the string "test ".
    private final LogContext logContext = new LogContext("test ");
    private final String topic = "topic";
    // Cluster with a single "no node" broker and one partition (0) of topic "topic".
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.singletonList(new PartitionInfo("topic", 0, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet());
    // Config without a "processing.guarantee" entry.
    private final StreamsConfig nonEosConfig = new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234")}));
    // Same as above, with "processing.guarantee" = "exactly_once".
    private final StreamsConfig eosAlphaConfig = new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234"), Utils.mkEntry((Object)"processing.guarantee", (Object)"exactly_once")}));
    // Same as above, with "processing.guarantee" = "exactly_once_beta".
    private final StreamsConfig eosBetaConfig = new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234"), Utils.mkEntry((Object)"processing.guarantee", (Object)"exactly_once_beta")}));
    // EasyMock producer used by the tests that record and verify exact producer calls.
    final Producer<byte[], byte[]> mockedProducer = (Producer)EasyMock.mock(Producer.class);
    // Client supplier whose getProducer(...) always returns the EasyMock producer above.
    final KafkaClientSupplier clientSupplier = new MockClientSupplier(){

        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            return StreamsProducerTest.this.mockedProducer;
        }
    };
    // StreamsProducer instances wired to the EasyMock producer (non-EOS config and EOS-alpha config).
    final StreamsProducer streamsProducerWithMock = new StreamsProducer(this.nonEosConfig, "threadId", this.clientSupplier, null, null, this.logContext);
    final StreamsProducer eosAlphaStreamsProducerWithMock = new StreamsProducer(this.eosAlphaConfig, "threadId", this.clientSupplier, new TaskId(0, 0), null, this.logContext);
    // One MockClientSupplier / StreamsProducer / MockProducer triple per processing mode;
    // the non-final members are assigned in before().
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private StreamsProducer nonEosStreamsProducer;
    private MockProducer<byte[], byte[]> nonEosMockProducer;
    private final MockClientSupplier eosAlphaMockClientSupplier = new MockClientSupplier();
    private StreamsProducer eosAlphaStreamsProducer;
    private MockProducer<byte[], byte[]> eosAlphaMockProducer;
    private final MockClientSupplier eosBetaMockClientSupplier = new MockClientSupplier();
    private StreamsProducer eosBetaStreamsProducer;
    private MockProducer<byte[], byte[]> eosBetaMockProducer;
    // Record sent by the send tests: topic "topic", partition 0, timestamp 0, empty key and value.
    private final ProducerRecord<byte[], byte[]> record = new ProducerRecord("topic", Integer.valueOf(0), Long.valueOf(0L), (Object)new byte[0], (Object)new byte[0], (Iterable)new RecordHeaders());
    // Offsets passed to commitTransaction(...): offset 0 with null metadata for partition 0 of "topic".
    private final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("topic", 0), (Object)new OffsetAndMetadata(0L, null))});

    /**
     * Builds one {@link StreamsProducer} per processing mode (non-EOS, EOS-alpha, EOS-beta), each on
     * its own {@link MockClientSupplier}, and keeps the first {@link MockProducer} every supplier
     * recorded. Both EOS producers get {@code initTransaction()} called here.
     */
    @Before
    public void before() {
        final String threadId = "threadId-StreamThread-0";

        // non-EOS: neither a task id nor a process id is passed
        mockClientSupplier.setCluster(cluster);
        nonEosStreamsProducer = new StreamsProducer(nonEosConfig, threadId, mockClientSupplier, null, null, logContext);
        nonEosMockProducer = mockClientSupplier.producers.get(0);

        // EOS-alpha: constructed with task id 0_0
        eosAlphaMockClientSupplier.setCluster(cluster);
        eosAlphaMockClientSupplier.setApplicationIdForProducer("appId");
        eosAlphaStreamsProducer = new StreamsProducer(eosAlphaConfig, threadId, eosAlphaMockClientSupplier, new TaskId(0, 0), null, logContext);
        eosAlphaStreamsProducer.initTransaction();
        eosAlphaMockProducer = eosAlphaMockClientSupplier.producers.get(0);

        // EOS-beta: constructed with a random process id instead of a task id
        eosBetaMockClientSupplier.setCluster(cluster);
        eosBetaMockClientSupplier.setApplicationIdForProducer("appId");
        eosBetaStreamsProducer = new StreamsProducer(eosBetaConfig, threadId, eosBetaMockClientSupplier, null, UUID.randomUUID(), logContext);
        eosBetaStreamsProducer.initTransaction();
        eosBetaMockProducer = eosBetaMockClientSupplier.producers.get(0);
    }

    /**
     * Verifies that each {@link StreamsProducer} built in {@link #before()} obtained exactly one
     * producer from its {@link MockClientSupplier}: the non-EOS, the EOS-alpha and the EOS-beta one.
     */
    @Test
    public void shouldCreateProducer() {
        MatcherAssert.assertThat(mockClientSupplier.producers.size(), Matchers.is(1));
        MatcherAssert.assertThat(eosAlphaMockClientSupplier.producers.size(), Matchers.is(1));
        // before() builds an EOS-beta producer as well; its supplier was previously left unchecked
        MatcherAssert.assertThat(eosBetaMockClientSupplier.producers.size(), Matchers.is(1));
    }

    /**
     * Verifies that {@code partitionsFor("topic")} is delegated to the wrapped producer and that
     * the very same list instance is handed back to the caller.
     */
    @Test
    public void shouldForwardCallToPartitionsFor() {
        final List<PartitionInfo> expected = Collections.emptyList();
        EasyMock.expect(mockedProducer.partitionsFor("topic")).andReturn(expected);
        EasyMock.replay(mockedProducer);

        final List<PartitionInfo> actual = streamsProducerWithMock.partitionsFor("topic");

        MatcherAssert.assertThat(actual, Matchers.sameInstance(expected));
        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies that {@code flush()} on the {@link StreamsProducer} results in a {@code flush()}
     * call on the wrapped producer.
     */
    @Test
    public void shouldForwardCallToFlush() {
        // record the expected call on the mock
        mockedProducer.flush();
        EasyMock.expectLastCall();
        EasyMock.replay(mockedProducer);

        streamsProducerWithMock.flush();

        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies that {@code metrics()} is delegated to the wrapped producer and returns the same
     * map instance the producer supplied.
     */
    @Test
    public void shouldForwardCallToMetrics() {
        final Map<Object, Object> expectedMetrics = new HashMap<>();
        EasyMock.expect((Object) mockedProducer.metrics()).andReturn(expectedMetrics);
        EasyMock.replay(mockedProducer);

        final Object actualMetrics = streamsProducerWithMock.metrics();

        Assert.assertSame(expectedMetrics, actualMetrics);
        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies that {@code close()} on the {@link StreamsProducer} results in a {@code close()}
     * call on the wrapped producer.
     */
    @Test
    public void shouldForwardCallToClose() {
        // record the expected call on the mock
        mockedProducer.close();
        EasyMock.expectLastCall();
        EasyMock.replay(mockedProducer);

        streamsProducerWithMock.close();

        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies that the constructor rejects a {@code null} config with a
     * {@link NullPointerException} carrying the message "config cannot be null".
     */
    @Test
    public void shouldFailIfStreamsConfigIsNull() {
        final NullPointerException thrown = Assert.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(null, "threadId", mockClientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("config cannot be null"));
    }

    /**
     * Verifies that the constructor rejects a {@code null} thread id with a
     * {@link NullPointerException} carrying the message "threadId cannot be null".
     */
    @Test
    public void shouldFailIfThreadIdIsNull() {
        final NullPointerException thrown = Assert.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(nonEosConfig, null, mockClientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("threadId cannot be null"));
    }

    /**
     * Verifies that the constructor rejects a {@code null} client supplier with a
     * {@link NullPointerException} carrying the message "clientSupplier cannot be null".
     */
    @Test
    public void shouldFailIfClientSupplierIsNull() {
        final NullPointerException thrown = Assert.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(nonEosConfig, "threadId", null, new TaskId(0, 0), UUID.randomUUID(), logContext)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("clientSupplier cannot be null"));
    }

    /**
     * Verifies that the constructor rejects a {@code null} log context with a
     * {@link NullPointerException} carrying the message "logContext cannot be null".
     */
    @Test
    public void shouldFailIfLogContextIsNull() {
        final NullPointerException thrown = Assert.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(nonEosConfig, "threadId", mockClientSupplier, new TaskId(0, 0), UUID.randomUUID(), null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("logContext cannot be null"));
    }

    /**
     * Verifies that {@code resetProducer()} on the non-EOS producer throws an
     * {@link IllegalStateException} saying exactly-once beta is not enabled.
     */
    @Test
    public void shouldFailOnResetProducerForAtLeastOnce() {
        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> nonEosStreamsProducer.resetProducer()
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once beta is not enabled [test]"));
    }

    /**
     * Verifies that {@code resetProducer()} on the EOS-alpha producer throws an
     * {@link IllegalStateException} saying exactly-once beta is not enabled.
     */
    @Test
    public void shouldFailOnResetProducerForExactlyOnceAlpha() {
        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> eosAlphaStreamsProducer.resetProducer()
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once beta is not enabled [test]"));
    }

    /**
     * Constructs a {@link StreamsProducer} from a mocked config whose processing guarantee is
     * "at_least_once" and whose producer configs are an unprogrammed mock map.
     * The test has no explicit assertion; it passes when construction completes without throwing.
     */
    @Test
    public void shouldNotSetTransactionIdIfEosDisable() {
        final StreamsConfig mockConfig = EasyMock.mock(StreamsConfig.class);
        EasyMock.expect(mockConfig.getProducerConfigs("threadId-producer")).andReturn(EasyMock.mock(Map.class));
        EasyMock.expect(mockConfig.getString("processing.guarantee")).andReturn("at_least_once").anyTimes();
        EasyMock.replay(mockConfig);

        new StreamsProducer(mockConfig, "threadId", mockClientSupplier, null, null, logContext);
    }

    /**
     * Verifies that {@code eosEnabled()} is {@code false} for the non-EOS producer.
     */
    @Test
    public void shouldNotHaveEosEnabledIfEosDisabled() {
        final boolean eosEnabled = nonEosStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(false));
    }

    /**
     * Verifies that the mock producer behind the non-EOS {@link StreamsProducer} never had its
     * transactions initialized.
     */
    @Test
    public void shouldNotInitTxIfEosDisable() {
        final boolean initialized = nonEosMockProducer.transactionInitialized();

        MatcherAssert.assertThat(initialized, Matchers.is(false));
    }

    /**
     * Verifies that sending a record through the non-EOS producer does not leave a transaction
     * in flight on the underlying mock producer.
     */
    @Test
    public void shouldNotBeginTxOnSendIfEosDisable() {
        nonEosStreamsProducer.send(record, null);

        final boolean inFlight = nonEosMockProducer.transactionInFlight();
        MatcherAssert.assertThat(inFlight, Matchers.is(false));
    }

    /**
     * Verifies that a record sent through the non-EOS producer shows up as the only entry in the
     * mock producer's history.
     */
    @Test
    public void shouldForwardRecordOnSend() {
        nonEosStreamsProducer.send(record, null);

        MatcherAssert.assertThat(nonEosMockProducer.history().size(), Matchers.is(1));
        final ProducerRecord<byte[], byte[]> forwarded = nonEosMockProducer.history().get(0);
        MatcherAssert.assertThat(forwarded, Matchers.is(record));
    }

    /**
     * Verifies that {@code initTransaction()} on the non-EOS producer throws an
     * {@link IllegalStateException} saying exactly-once is not enabled.
     */
    @Test
    public void shouldFailOnInitTxIfEosDisabled() {
        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            nonEosStreamsProducer::initTransaction
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once is not enabled [test]"));
    }

    /**
     * Verifies that a {@link KafkaException} raised by the underlying producer on send is wrapped
     * in a {@link StreamsException} that keeps the original exception as its cause.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSendError() {
        nonEosMockProducer.sendException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> nonEosStreamsProducer.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(nonEosMockProducer.sendException));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to send record to topic topic [test]")
        );
    }

    /**
     * Verifies that a plain {@link RuntimeException} raised by the underlying producer on send
     * surfaces as a {@link RuntimeException} with the original message.
     */
    @Test
    public void shouldFailOnSendFatal() {
        nonEosMockProducer.sendException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assert.assertThrows(
            RuntimeException.class,
            () -> nonEosStreamsProducer.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /**
     * Verifies that {@code commitTransaction(...)} on the non-EOS producer throws an
     * {@link IllegalStateException} saying exactly-once is not enabled.
     */
    @Test
    public void shouldFailOnCommitIfEosDisabled() {
        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> nonEosStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId"))
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once is not enabled [test]"));
    }

    /**
     * Verifies that {@code abortTransaction()} on the non-EOS producer throws an
     * {@link IllegalStateException} saying exactly-once is not enabled.
     */
    @Test
    public void shouldFailOnAbortIfEosDisabled() {
        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            nonEosStreamsProducer::abortTransaction
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once is not enabled [test]"));
    }

    /**
     * Verifies that {@code eosEnabled()} is {@code true} for the EOS-alpha producer.
     */
    @Test
    public void shouldEnableEosIfEosAlphaEnabled() {
        final boolean eosEnabled = eosAlphaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /**
     * Verifies that {@code eosEnabled()} is {@code true} for the EOS-beta producer.
     */
    @Test
    public void shouldEnableEosIfEosBetaEnabled() {
        final boolean eosEnabled = eosBetaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /**
     * Verifies that under "exactly_once" the constructor writes the transactional id
     * "appId-0_0" (application id plus task id) into the producer configs, which it requests
     * under the client id "threadId-0_0-producer".
     */
    @Test
    public void shouldSetTransactionIdUsingTaskIdIfEosAlphaEnabled() {
        final Map<String, Object> mockMap = EasyMock.mock(Map.class);
        EasyMock.expect(mockMap.put("transactional.id", "appId-0_0")).andReturn(null);
        EasyMock.expect(mockMap.get("transactional.id")).andReturn("appId-0_0");

        final StreamsConfig mockConfig = EasyMock.mock(StreamsConfig.class);
        EasyMock.expect(mockConfig.getProducerConfigs("threadId-0_0-producer")).andReturn(mockMap);
        EasyMock.expect(mockConfig.getString("application.id")).andReturn("appId");
        EasyMock.expect(mockConfig.getString("processing.guarantee")).andReturn("exactly_once");

        EasyMock.replay(mockMap, mockConfig);

        new StreamsProducer(mockConfig, "threadId", eosAlphaMockClientSupplier, new TaskId(0, 0), null, logContext);

        // only the config map is verified, not the mocked StreamsConfig
        EasyMock.verify(mockMap);
    }

    /**
     * Verifies that under "exactly_once_beta" the constructor writes the transactional id
     * "appId-&lt;processId&gt;-0" into the producer configs, which it requests under the client id
     * "threadId-StreamThread-0-producer".
     * Note that the stubbed {@code get(...)} returns the id without the trailing "-0".
     */
    @Test
    public void shouldSetTransactionIdUsingProcessIdIfEosBetaEnable() {
        final UUID processId = UUID.randomUUID();

        final Map<String, Object> mockMap = EasyMock.mock(Map.class);
        EasyMock.expect(mockMap.put("transactional.id", "appId-" + processId + "-0")).andReturn(null);
        EasyMock.expect(mockMap.get("transactional.id")).andReturn("appId-" + processId);

        final StreamsConfig mockConfig = EasyMock.mock(StreamsConfig.class);
        EasyMock.expect(mockConfig.getProducerConfigs("threadId-StreamThread-0-producer")).andReturn(mockMap);
        EasyMock.expect(mockConfig.getString("application.id")).andReturn("appId");
        EasyMock.expect(mockConfig.getString("processing.guarantee")).andReturn("exactly_once_beta").anyTimes();

        EasyMock.replay(mockMap, mockConfig);

        new StreamsProducer(mockConfig, "threadId-StreamThread-0", eosAlphaMockClientSupplier, null, processId, logContext);

        // only the config map is verified, not the mocked StreamsConfig
        EasyMock.verify(mockMap);
    }

    /**
     * Asserts that {@code eosEnabled()} is {@code true} for the EOS-alpha producer.
     * NOTE(review): the "ShouldNot" in the method name contradicts the assertion; the name is
     * kept as-is so test reports stay stable.
     */
    @Test
    public void shouldNotHaveEosEnabledIfEosAlphaEnable() {
        final boolean eosEnabled = eosAlphaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /**
     * Asserts that {@code eosEnabled()} is {@code true} for the EOS-beta producer.
     */
    @Test
    public void shouldHaveEosEnabledIfEosBetaEnabled() {
        final boolean eosEnabled = eosBetaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /**
     * Verifies that after {@link #before()} the mock producer behind the EOS-alpha
     * {@link StreamsProducer} reports its transactions as initialized.
     */
    @Test
    public void shouldInitTxOnEos() {
        final boolean initialized = eosAlphaMockProducer.transactionInitialized();

        MatcherAssert.assertThat(initialized, Matchers.is(true));
    }

    /**
     * Verifies that sending a record through the EOS-alpha producer leaves a transaction in
     * flight on the underlying mock producer.
     */
    @Test
    public void shouldBeginTxOnEosSend() {
        eosAlphaStreamsProducer.send(record, null);

        final boolean inFlight = eosAlphaMockProducer.transactionInFlight();
        MatcherAssert.assertThat(inFlight, Matchers.is(true));
    }

    /**
     * Verifies that two consecutive sends through the EOS-alpha producer end up as two
     * uncommitted records while a transaction is in flight.
     */
    @Test
    public void shouldContinueTxnSecondEosSend() {
        for (int i = 0; i < 2; i++) {
            eosAlphaStreamsProducer.send(record, null);
        }

        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().size(), Matchers.is(2));
    }

    /**
     * Verifies that a record sent through the EOS-alpha producer is held as the single
     * uncommitted record of an in-flight transaction and does not yet appear in the history.
     */
    @Test
    public void shouldForwardRecordButNotCommitOnEosSend() {
        eosAlphaStreamsProducer.send(record, null);

        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));
        // nothing is committed yet, so the history stays empty
        MatcherAssert.assertThat(eosAlphaMockProducer.history().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().size(), Matchers.is(1));
        final ProducerRecord<byte[], byte[]> pending = eosAlphaMockProducer.uncommittedRecords().get(0);
        MatcherAssert.assertThat(pending, Matchers.is(record));
    }

    /**
     * Verifies that committing on the EOS-alpha producer without a prior send still drives the
     * wrapped producer through begin, send-offsets and commit after initialization.
     */
    @Test
    public void shouldBeginTxOnEosCommit() {
        // record the calls expected on the mocked producer
        mockedProducer.initTransactions();
        mockedProducer.beginTransaction();
        mockedProducer.sendOffsetsToTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"));
        mockedProducer.commitTransaction();
        EasyMock.expectLastCall();
        EasyMock.replay(mockedProducer);

        eosAlphaStreamsProducerWithMock.initTransaction();
        eosAlphaStreamsProducerWithMock.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"));

        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies that committing on the EOS-alpha producer makes the underlying mock producer
     * report that offsets were sent.
     */
    @Test
    public void shouldSendOffsetToTxOnEosCommit() {
        eosAlphaStreamsProducer.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"));

        final boolean offsetsSent = eosAlphaMockProducer.sentOffsets();
        MatcherAssert.assertThat(offsetsSent, Matchers.is(true));
    }

    /**
     * Verifies the full send-then-commit cycle on the EOS-alpha producer: after the commit no
     * transaction is in flight, nothing is left uncommitted, and both the record and the offsets
     * (keyed by group "appId") appear in the mock producer's histories.
     */
    @Test
    public void shouldCommitTxOnEosCommit() {
        eosAlphaStreamsProducer.send(record, null);
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));

        eosAlphaStreamsProducer.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"));

        // transaction is closed and nothing is pending any more
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(false));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedOffsets().isEmpty(), Matchers.is(true));

        // the record was committed
        MatcherAssert.assertThat(eosAlphaMockProducer.history().size(), Matchers.is(1));
        MatcherAssert.assertThat(eosAlphaMockProducer.history().get(0), Matchers.is(record));

        // the offsets were committed for group "appId"
        MatcherAssert.assertThat(eosAlphaMockProducer.consumerGroupOffsetsHistory().size(), Matchers.is(1));
        MatcherAssert.assertThat(
            eosAlphaMockProducer.consumerGroupOffsetsHistory().get(0).get("appId"),
            Matchers.is(offsetsAndMetadata)
        );
    }

    /**
     * Verifies the calls the EOS-alpha producer makes on the wrapped producer for
     * init, send and commit: initTransactions, beginTransaction, send,
     * sendOffsetsToTransaction (with group metadata for "appId") and commitTransaction.
     */
    @Test
    public void shouldCommitTxWithApplicationIdOnEosAlphaCommit() {
        // record the calls expected on the mocked producer
        mockedProducer.initTransactions();
        EasyMock.expectLastCall();
        mockedProducer.beginTransaction();
        EasyMock.expectLastCall();
        EasyMock.expect(mockedProducer.send(record, null)).andReturn(null);
        mockedProducer.sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();
        mockedProducer.commitTransaction();
        EasyMock.expectLastCall();
        EasyMock.replay(mockedProducer);

        eosAlphaStreamsProducerWithMock.initTransaction();
        eosAlphaStreamsProducerWithMock.send(record, null);
        eosAlphaStreamsProducerWithMock.commitTransaction(null, new ConsumerGroupMetadata("appId"));

        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies the calls an EOS-beta producer makes on the wrapped producer for
     * init, send and commit: initTransactions, beginTransaction, send,
     * sendOffsetsToTransaction (with group metadata for "appId") and commitTransaction.
     */
    @Test
    public void shouldCommitTxWithConsumerGroupMetadataOnEosBetaCommit() {
        // record the calls expected on the mocked producer
        mockedProducer.initTransactions();
        EasyMock.expectLastCall();
        mockedProducer.beginTransaction();
        EasyMock.expectLastCall();
        EasyMock.expect(mockedProducer.send(record, null)).andReturn(null);
        mockedProducer.sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId"));
        EasyMock.expectLastCall();
        mockedProducer.commitTransaction();
        EasyMock.expectLastCall();
        EasyMock.replay(mockedProducer);

        // EOS-beta producer wired to the EasyMock producer through clientSupplier
        final StreamsProducer betaProducer =
            new StreamsProducer(eosBetaConfig, "threadId-StreamThread-0", clientSupplier, null, UUID.randomUUID(), logContext);
        betaProducer.initTransaction();
        betaProducer.send(record, null);
        betaProducer.commitTransaction(null, new ConsumerGroupMetadata("appId"));

        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies that aborting on the EOS-alpha producer after a send closes the transaction and
     * discards the pending record: nothing stays uncommitted and both histories remain empty.
     */
    @Test
    public void shouldAbortTxOnEosAbort() {
        // a send opens a transaction holding exactly this record
        eosAlphaStreamsProducer.send(record, null);
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().size(), Matchers.is(1));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().get(0), Matchers.is(record));

        eosAlphaStreamsProducer.abortTransaction();

        // after the abort nothing is pending and nothing was committed
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(false));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedOffsets().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.history().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.consumerGroupOffsetsHistory().isEmpty(), Matchers.is(true));
    }

    /**
     * Verifies that {@code abortTransaction()} makes no call on the wrapped producer when nothing
     * was sent: the only call recorded as expected on the mock is {@code initTransactions()}.
     */
    @Test
    public void shouldSkipAbortTxOnEosAbortIfNotTxInFlight() {
        mockedProducer.initTransactions();
        EasyMock.expectLastCall();
        EasyMock.replay(mockedProducer);

        eosAlphaStreamsProducerWithMock.initTransaction();
        eosAlphaStreamsProducerWithMock.abortTransaction();

        EasyMock.verify(mockedProducer);
    }

    /**
     * Verifies that constructing an EOS-alpha producer without a task id throws a
     * {@link NullPointerException} with the message "taskId cannot be null for exactly-once alpha".
     */
    @Test
    public void shouldFailIfTaskIdIsNullForEosAlpha() {
        final NullPointerException thrown = Assert.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(eosAlphaConfig, "threadId", mockClientSupplier, null, UUID.randomUUID(), logContext)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("taskId cannot be null for exactly-once alpha"));
    }

    /**
     * Verifies that constructing an EOS-beta producer without a process id throws a
     * {@link NullPointerException} with the message "processId cannot be null for exactly-once beta".
     */
    @Test
    public void shouldFailIfProcessIdNullForEosBeta() {
        final NullPointerException thrown = Assert.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(eosBetaConfig, "threadId", mockClientSupplier, new TaskId(0, 0), null, logContext)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("processId cannot be null for exactly-once beta"));
    }

    /**
     * Verifies that a {@link TimeoutException} raised while initializing transactions is
     * propagated as a {@link TimeoutException} with its original message.
     * The EOS-alpha producer under test is wired to {@code nonEosMockProducer} so the failure
     * can be injected through that mock.
     */
    @Test
    public void shouldThrowTimeoutExceptionOnEosInitTxTimeout() {
        nonEosMockProducer.initTransactionException = new TimeoutException("KABOOM!");

        // supplier that always hands out the pre-configured mock producer
        final KafkaClientSupplier supplier = new MockClientSupplier() {
            @Override
            public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
                return StreamsProducerTest.this.nonEosMockProducer;
            }
        };
        final StreamsProducer streamsProducer =
            new StreamsProducer(eosAlphaConfig, "threadId", supplier, new TaskId(0, 0), null, logContext);

        final TimeoutException thrown = Assert.assertThrows(TimeoutException.class, streamsProducer::initTransaction);

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /**
     * Verifies that sending through an EOS-alpha producer whose {@code initTransaction()} was
     * never called fails with the {@link IllegalStateException} raised by the mock producer.
     */
    @Test
    public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceAlpha() {
        final StreamsProducer uninitialized =
            new StreamsProducer(eosAlphaConfig, "threadId", eosAlphaMockClientSupplier, new TaskId(0, 0), null, logContext);

        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> uninitialized.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("MockProducer hasn't been initialized for transactions."));
    }

    /**
     * Verifies that sending through an EOS-beta producer whose {@code initTransaction()} was
     * never called fails with the {@link IllegalStateException} raised by the mock producer.
     */
    @Test
    public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceBeta() {
        final StreamsProducer uninitialized =
            new StreamsProducer(eosBetaConfig, "threadId-StreamThread-0", eosBetaMockClientSupplier, null, UUID.randomUUID(), logContext);

        final IllegalStateException thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> uninitialized.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("MockProducer hasn't been initialized for transactions."));
    }

    /**
     * Verifies that a {@link KafkaException} raised while initializing transactions is wrapped in
     * a {@link StreamsException} that keeps the original exception as its cause.
     * The EOS-alpha producer under test is wired to {@code nonEosMockProducer} so the failure
     * can be injected through that mock.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosInitError() {
        nonEosMockProducer.initTransactionException = new KafkaException("KABOOM!");

        // supplier that always hands out the pre-configured mock producer
        final KafkaClientSupplier supplier = new MockClientSupplier() {
            @Override
            public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
                return StreamsProducerTest.this.nonEosMockProducer;
            }
        };
        final StreamsProducer streamsProducer =
            new StreamsProducer(eosAlphaConfig, "threadId", supplier, new TaskId(0, 0), null, logContext);

        final StreamsException thrown = Assert.assertThrows(StreamsException.class, streamsProducer::initTransaction);

        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(nonEosMockProducer.initTransactionException));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to initialize transactions [test]")
        );
    }

    /**
     * Verifies that a plain {@link RuntimeException} raised while initializing transactions
     * surfaces as a {@link RuntimeException} with the original message.
     * The EOS-alpha producer under test is wired to {@code nonEosMockProducer} so the failure
     * can be injected through that mock.
     */
    @Test
    public void shouldFailOnEosInitFatal() {
        nonEosMockProducer.initTransactionException = new RuntimeException("KABOOM!");

        // supplier that always hands out the pre-configured mock producer
        final KafkaClientSupplier supplier = new MockClientSupplier() {
            @Override
            public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
                return StreamsProducerTest.this.nonEosMockProducer;
            }
        };
        final StreamsProducer streamsProducer =
            new StreamsProducer(eosAlphaConfig, "threadId", supplier, new TaskId(0, 0), null, logContext);

        final RuntimeException thrown = Assert.assertThrows(RuntimeException.class, streamsProducer::initTransaction);

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /**
     * Verifies that a send on the EOS-alpha producer throws a {@link TaskMigratedException}
     * when the underlying mock producer has been fenced.
     */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosBeginTxnFenced() {
        eosAlphaMockProducer.fenceProducer();

        final TaskMigratedException thrown = Assert.assertThrows(
            TaskMigratedException.class,
            () -> eosAlphaStreamsProducer.send(null, null)
        );

        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Producer got fenced trying to begin a new transaction [test]; it means all tasks belonging to this thread should be migrated.")
        );
    }

    /**
     * Verifies that a {@link KafkaException} raised when beginning a transaction is wrapped in a
     * {@link StreamsException} that keeps the original exception as its cause.
     * NOTE(review): despite "TaskMigrate" in the method name, the expected type is
     * {@link StreamsException}.
     */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosBeginTxnError() {
        eosAlphaMockProducer.beginTransactionException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> eosAlphaStreamsProducer.send(null, null)
        );

        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(eosAlphaMockProducer.beginTransactionException));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to begin a new transaction [test]")
        );
    }

    /**
     * Verifies that a plain {@link RuntimeException} raised when beginning a transaction
     * surfaces as a {@link RuntimeException} with the original message.
     */
    @Test
    public void shouldFailOnEosBeginTxnFatal() {
        eosAlphaMockProducer.beginTransactionException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assert.assertThrows(
            RuntimeException.class,
            () -> eosAlphaStreamsProducer.send(null, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /**
     * Runs the shared EOS send failure check with a {@link ProducerFencedException} as the
     * injected error.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosSendProducerFenced() {
        final RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnEosSend(fenced);
    }

    /**
     * Runs the shared EOS send failure check with an {@link InvalidProducerEpochException} as
     * the injected error.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosSendInvalidEpoch() {
        final RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnEosSend(invalidEpoch);
    }

    /**
     * Shared check for EOS send failures: makes the EOS-alpha mock producer fail sends with a
     * {@link KafkaException} wrapping {@code exception}, then asserts that sending throws a
     * {@link TaskMigratedException} whose cause is {@code exception} itself.
     *
     * @param exception the error to wrap and inject as the send failure
     */
    private void testThrowTaskMigratedExceptionOnEosSend(RuntimeException exception) {
        eosAlphaMockProducer.sendException = new KafkaException(exception);

        final TaskMigratedException thrown = Assert.assertThrows(
            TaskMigratedException.class,
            () -> eosAlphaStreamsProducer.send(record, null)
        );

        // the wrapper is stripped: the cause is the injected exception, not the KafkaException
        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(exception));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Producer got fenced trying to send a record [test]; it means all tasks belonging to this thread should be migrated.")
        );
    }

    /**
     * Wraps an {@link UnknownProducerIdException} in a {@link KafkaException}, assigns it to
     * the mock producer's {@code sendException}, and expects {@code send(record, null)} to
     * throw a {@link TaskMigratedException} whose cause is the unwrapped exception.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosSendUnknownPid() {
        final UnknownProducerIdException unknownPid = new UnknownProducerIdException("KABOOM!");
        this.eosAlphaMockProducer.sendException = new KafkaException(unknownPid);

        final TaskMigratedException thrown = Assert.assertThrows(
            TaskMigratedException.class,
            () -> this.eosAlphaStreamsProducer.send(this.record, null));

        final String expectedMessage = "Producer got fenced trying to send a record [test]; it means all tasks belonging to this thread should be migrated.";
        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(unknownPid));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Runs the shared send-offsets failure scenario with a {@link ProducerFencedException}.
     */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosSendOffsetProducerFenced() {
        final ProducerFencedException fenced = new ProducerFencedException("KABOOM!");
        this.testThrowTaskMigrateExceptionOnEosSendOffset(fenced);
    }

    /**
     * Runs the shared send-offsets failure scenario with an
     * {@link InvalidProducerEpochException}.
     */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidEpoch() {
        final InvalidProducerEpochException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        this.testThrowTaskMigrateExceptionOnEosSendOffset(invalidEpoch);
    }

    /**
     * Shared scenario: assigns {@code exception} to the mock producer's
     * {@code sendOffsetsToTransactionException} and expects {@code commitTransaction} to throw
     * a {@link TaskMigratedException} whose cause is that exception.
     *
     * @param exception the exception to inject as the send-offsets failure
     */
    private void testThrowTaskMigrateExceptionOnEosSendOffset(final RuntimeException exception) {
        this.eosAlphaMockProducer.sendOffsetsToTransactionException = exception;

        // Offsets are deliberately null here.
        // NOTE(review): presumably the injected failure surfaces before the offsets are used — confirm.
        final TaskMigratedException thrown = Assert.assertThrows(
            TaskMigratedException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId")));

        final String expectedMessage = "Producer got fenced trying to commit a transaction [test]; it means all tasks belonging to this thread should be migrated.";
        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.sendOffsetsToTransactionException));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Assigns a {@link KafkaException} to the mock producer's
     * {@code sendOffsetsToTransactionException} and expects {@code commitTransaction}
     * (called with null offsets) to throw a {@link StreamsException} wrapping it.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosSendOffsetError() {
        this.eosAlphaMockProducer.sendOffsetsToTransactionException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId")));

        final String expectedMessage = "Error encountered trying to commit a transaction [test]";
        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.sendOffsetsToTransactionException));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Assigns a plain {@link RuntimeException} to the mock producer's
     * {@code sendOffsetsToTransactionException} and expects {@code commitTransaction}
     * (called with null offsets) to throw a {@code RuntimeException} with the original
     * "KABOOM!" message.
     */
    @Test
    public void shouldFailOnEosSendOffsetFatal() {
        this.eosAlphaMockProducer.sendOffsetsToTransactionException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assert.assertThrows(
            RuntimeException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId")));

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /**
     * Runs the shared commit-failure scenario with a {@link ProducerFencedException}.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosCommitWithProducerFenced() {
        final ProducerFencedException fenced = new ProducerFencedException("KABOOM!");
        this.testThrowTaskMigratedExceptionOnEos(fenced);
    }

    /**
     * Runs the shared commit-failure scenario with an {@link InvalidProducerEpochException}.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidEpoch() {
        final InvalidProducerEpochException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        this.testThrowTaskMigratedExceptionOnEos(invalidEpoch);
    }

    /**
     * Shared scenario: assigns {@code exception} to the mock producer's
     * {@code commitTransactionException} and expects {@code commitTransaction} to throw a
     * {@link TaskMigratedException} whose cause is that exception.
     *
     * <p>Also checks that the mock producer reports {@code sentOffsets()} as {@code true}
     * after the failed commit.
     *
     * @param exception the exception to inject as the commit failure
     */
    private void testThrowTaskMigratedExceptionOnEos(final RuntimeException exception) {
        this.eosAlphaMockProducer.commitTransactionException = exception;

        final TaskMigratedException thrown = Assert.assertThrows(
            TaskMigratedException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(
                this.offsetsAndMetadata,
                new ConsumerGroupMetadata("appId")));

        final String expectedMessage = "Producer got fenced trying to commit a transaction [test]; it means all tasks belonging to this thread should be migrated.";
        MatcherAssert.assertThat(this.eosAlphaMockProducer.sentOffsets(), Matchers.is(true));
        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.commitTransactionException));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Assigns a {@link TimeoutException} to the mock producer's
     * {@code commitTransactionException} and expects {@code commitTransaction} to throw a
     * {@link StreamsException} with the "Timed out" message and the timeout as its cause.
     * Also checks that the mock reports {@code sentOffsets()} as {@code true}.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosCommitTxTimeout() {
        this.eosAlphaMockProducer.commitTransactionException = new TimeoutException("KABOOM!");

        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(
                this.offsetsAndMetadata,
                new ConsumerGroupMetadata("appId")));

        final String expectedMessage = "Timed out trying to commit a transaction [test]";
        MatcherAssert.assertThat(this.eosAlphaMockProducer.sentOffsets(), Matchers.is(true));
        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.commitTransactionException));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Assigns a {@link KafkaException} to the mock producer's
     * {@code commitTransactionException} and expects {@code commitTransaction} to throw a
     * {@link StreamsException} wrapping it.
     * Also checks that the mock reports {@code sentOffsets()} as {@code true}.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosCommitTxError() {
        this.eosAlphaMockProducer.commitTransactionException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(
                this.offsetsAndMetadata,
                new ConsumerGroupMetadata("appId")));

        final String expectedMessage = "Error encountered trying to commit a transaction [test]";
        MatcherAssert.assertThat(this.eosAlphaMockProducer.sentOffsets(), Matchers.is(true));
        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.commitTransactionException));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Assigns a plain {@link RuntimeException} to the mock producer's
     * {@code commitTransactionException} and expects {@code commitTransaction} to throw a
     * {@code RuntimeException} with the original "KABOOM!" message.
     * Also checks that the mock reports {@code sentOffsets()} as {@code true}.
     */
    @Test
    public void shouldFailOnEosCommitTxFatal() {
        this.eosAlphaMockProducer.commitTransactionException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assert.assertThrows(
            RuntimeException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(
                this.offsetsAndMetadata,
                new ConsumerGroupMetadata("appId")));

        MatcherAssert.assertThat(this.eosAlphaMockProducer.sentOffsets(), Matchers.is(true));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /**
     * Runs the shared abort-transaction scenario with a {@link ProducerFencedException}.
     */
    @Test
    public void shouldSwallowExceptionOnEosAbortTxProducerFenced() {
        final ProducerFencedException fenced = new ProducerFencedException("KABOOM!");
        this.testSwallowExceptionOnEosAbortTx(fenced);
    }

    /**
     * Runs the shared abort-transaction scenario with an
     * {@link InvalidProducerEpochException}.
     */
    @Test
    public void shouldSwallowExceptionOnEosAbortTxInvalidEpoch() {
        final InvalidProducerEpochException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        this.testSwallowExceptionOnEosAbortTx(invalidEpoch);
    }

    /**
     * Shared scenario using the EasyMock-backed producer: the mock's
     * {@code abortTransaction()} is set up to throw {@code exception}, and the test passes
     * only if {@code StreamsProducer.abortTransaction()} returns without propagating it.
     *
     * @param exception the exception the mocked {@code abortTransaction()} throws
     */
    private void testSwallowExceptionOnEosAbortTx(final RuntimeException exception) {
        // Record phase: the calls the mocked producer is expected to receive.
        this.mockedProducer.initTransactions();
        this.mockedProducer.beginTransaction();
        EasyMock.expect(this.mockedProducer.send(this.record, null)).andReturn(null);
        this.mockedProducer.abortTransaction();
        EasyMock.expectLastCall().andThrow(exception);
        EasyMock.replay(this.mockedProducer);

        // Exercise phase: no assertThrows here, so any exception escaping fails the test.
        this.eosAlphaStreamsProducerWithMock.initTransaction();
        this.eosAlphaStreamsProducerWithMock.send(this.record, null);
        this.eosAlphaStreamsProducerWithMock.abortTransaction();

        EasyMock.verify(this.mockedProducer);
    }

    /**
     * Assigns a {@link KafkaException} to the mock producer's
     * {@code abortTransactionException}, sends one record, and expects
     * {@code abortTransaction()} to throw a {@link StreamsException} wrapping it.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosAbortTxError() {
        this.eosAlphaMockProducer.abortTransactionException = new KafkaException("KABOOM!");
        this.eosAlphaStreamsProducer.send(this.record, null);

        final StreamsException thrown = Assert.assertThrows(
            StreamsException.class,
            () -> this.eosAlphaStreamsProducer.abortTransaction());

        final String expectedMessage = "Error encounter trying to abort a transaction [test]";
        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.abortTransactionException));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Assigns a plain {@link RuntimeException} to the mock producer's
     * {@code abortTransactionException}, sends one record, and expects
     * {@code abortTransaction()} to throw a {@code RuntimeException} with the original
     * "KABOOM!" message.
     */
    @Test
    public void shouldFailOnEosAbortTxFatal() {
        this.eosAlphaMockProducer.abortTransactionException = new RuntimeException("KABOOM!");
        this.eosAlphaStreamsProducer.send(this.record, null);

        final RuntimeException thrown = Assert.assertThrows(
            RuntimeException.class,
            () -> this.eosAlphaStreamsProducer.abortTransaction());

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /**
     * Calls {@code resetProducer()} on the eos-beta streams producer and checks that the
     * eos-beta mock producer reports itself as closed afterwards.
     */
    @Test
    public void shouldCloseExistingProducerOnResetProducer() {
        this.eosBetaStreamsProducer.resetProducer();

        final boolean closedAfterReset = this.eosBetaMockProducer.closed();
        Assert.assertTrue(closedAfterReset);
    }

    /**
     * Calls {@code resetProducer()} on the eos-beta streams producer and checks that the
     * client supplier now holds two producers and that {@code kafkaProducer()} returns the
     * second one (index 1).
     */
    @Test
    public void shouldSetNewProducerOnResetProducer() {
        this.eosBetaStreamsProducer.resetProducer();

        MatcherAssert.assertThat(this.eosBetaMockClientSupplier.producers.size(), Matchers.is(2));
        MatcherAssert.assertThat(
            this.eosBetaStreamsProducer.kafkaProducer(),
            Matchers.is(this.eosBetaMockClientSupplier.producers.get(1)));
    }

    /**
     * Builds a {@link StreamsProducer} on the EasyMock-backed client supplier, initializes
     * transactions once, then resets the mock's expectations and checks that
     * {@code resetProducer()} followed by {@code initTransaction()} results in
     * {@code close()} and {@code initTransactions()} being called on the mocked producer.
     */
    @Test
    public void shouldResetTransactionInitializedOnResetProducer() {
        final StreamsProducer producerUnderTest = new StreamsProducer(
            this.eosBetaConfig,
            "threadId-StreamThread-0",
            this.clientSupplier,
            null,
            UUID.randomUUID(),
            this.logContext);
        producerUnderTest.initTransaction();

        // Discard the earlier expectations and record the calls expected after the reset.
        EasyMock.reset(this.mockedProducer);
        this.mockedProducer.close();
        this.mockedProducer.initTransactions();
        EasyMock.expectLastCall();
        EasyMock.replay(this.mockedProducer);

        producerUnderTest.resetProducer();
        producerUnderTest.initTransaction();

        EasyMock.verify(this.mockedProducer);
    }
}

