/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamTaskTest {
    // Serializers/deserializer shared by the fixtures below (mock source nodes, mock producer, record payloads).
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    // Single-element topic-name arrays; each test partition is partition 1 of its topic.
    private final String[] topic1 = new String[]{"topic1"};
    private final String[] topic2 = new String[]{"topic2"};
    private final TopicPartition partition1 = new TopicPartition(this.topic1[0], 1);
    private final TopicPartition partition2 = new TopicPartition(this.topic2[0], 1);
    // Both partitions; handed to the StreamTask constructor in setup().
    private final Set<TopicPartition> partitions = Utils.mkSet((Object[])new TopicPartition[]{this.partition1, this.partition2});
    // One mock source node per topic, both deserializing Integer keys and values.
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<Integer, Integer>(this.topic1, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<Integer, Integer>(this.topic2, this.intDeserializer, this.intDeserializer);
    // Mock processors used by the punctuation tests.
    // NOTE(review): 10L is presumably the punctuation interval — confirm against MockProcessorNode.
    private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode(10L);
    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode(10L, PunctuationType.WALL_CLOCK_TIME);
    // Topology containing the four nodes above plus a topic-name -> source-node map; all remaining arguments are empty collections.
    private final ProcessorTopology topology = new ProcessorTopology(Arrays.asList(new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), (Map)new HashMap<String, SourceNode>(){
        {
            this.put(StreamTaskTest.this.topic1[0], StreamTaskTest.this.source1);
            this.put(StreamTaskTest.this.topic2[0], StreamTaskTest.this.source2);
        }
    }, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList());
    // Mock clients: the main consumer and producer are passed to the task; the restore consumer backs the changelog reader.
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final MockProducer<byte[], byte[]> producer = new MockProducer(false, this.bytesSerializer, this.bytesSerializer);
    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(this.restoreStateConsumer, this.stateRestoreListener, new LogContext("stream-task-test "));
    // Raw payloads reused by every test record: the serialized integers 10 (value) and 1 (key).
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);
    private final String applicationId = "applicationId";
    // Metrics registry wrapped by the mock StreamsMetrics; queried directly by the metrics test.
    private final Metrics metrics = new Metrics();
    private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(this.metrics);
    private final TaskId taskId00 = new TaskId(0, 0);
    // Mock clock passed to the task; the wall-clock punctuation tests advance it with sleep().
    private final MockTime time = new MockTime();
    // Temporary directory used as "state.dir" in createConfig() and deleted in cleanup().
    private File baseDir = TestUtils.tempDirectory();
    // The following are (re)assigned in setup() before each test.
    private StateDirectory stateDirectory;
    private StreamsConfig config;
    private StreamsConfig eosConfig;
    private StreamTask task;
    // Timestamp most recently passed to the punctuator below.
    private long punctuatedAt;
    // Punctuator that only records the timestamp it was invoked with.
    private Punctuator punctuator = new Punctuator(){

        public void punctuate(long timestamp) {
            StreamTaskTest.this.punctuatedAt = timestamp;
        }
    };

    /**
     * Builds the {@link StreamsConfig} used by the tests.
     *
     * @param enableEoS when {@code true}, "processing.guarantee" is additionally set to "exactly_once"
     * @return a config whose "state.dir" is the canonical path of {@code baseDir}
     * @throws IOException if the canonical path of {@code baseDir} cannot be resolved
     */
    private StreamsConfig createConfig(final boolean enableEoS) throws IOException {
        final Properties settings = new Properties();
        settings.setProperty("application.id", "stream-task-test");
        settings.setProperty("bootstrap.servers", "localhost:2171");
        settings.setProperty("buffered.records.per.partition", "3");
        settings.setProperty("state.dir", this.baseDir.getCanonicalPath());
        settings.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
        if (enableEoS) {
            settings.setProperty("processing.guarantee", "exactly_once");
        }
        return new StreamsConfig((Map<?, ?>) settings);
    }

    /**
     * Prepares the fixture before each test: assigns both partitions to the mock consumer, attaches the
     * two mock processors to both source nodes, creates the configs and state directory, and builds an
     * initialized {@link StreamTask} using the non-EOS config.
     *
     * @throws IOException if a config cannot be created
     */
    @Before
    public void setup() throws IOException {
        this.consumer.assign(Arrays.asList(this.partition1, this.partition2));

        // Each processor becomes a child of both sources, in the same order as before:
        // stream-time processor first, then the wall-clock one.
        for (final MockProcessorNode<Integer, Integer> child : Arrays.asList(this.processorStreamTime, this.processorSystemTime)) {
            this.source1.addChild(child);
            this.source2.addChild(child);
        }

        this.config = this.createConfig(false);
        this.eosConfig = this.createConfig(true);

        final Time directoryClock = new MockTime();
        this.stateDirectory = new StateDirectory(this.applicationId, this.baseDir.getPath(), directoryClock);

        final ChangelogReader reader = this.changelogReader;
        final Time taskClock = this.time;
        this.task = new StreamTask(
            this.taskId00,
            this.applicationId,
            this.partitions,
            this.topology,
            this.consumer,
            reader,
            this.config,
            this.streamsMetrics,
            this.stateDirectory,
            null,
            taskClock,
            this.producer);
        this.task.initializeStateStores();
        this.task.initializeTopology();
    }

    /**
     * Closes the task, if one exists, and always deletes the temporary base directory afterwards.
     * An {@link IllegalStateException} whose message is exactly "There is no open transaction."
     * is tolerated during close; any other one is rethrown.
     *
     * @throws IOException declared for the directory deletion
     */
    @After
    public void cleanup() throws IOException {
        try {
            if (this.task != null) {
                try {
                    this.task.close(true, false);
                } catch (final IllegalStateException canHappen) {
                    final boolean noOpenTransaction = "There is no open transaction.".equals(canHappen.getMessage());
                    if (!noOpenTransaction) {
                        throw canHappen;
                    }
                }
            }
        } finally {
            Utils.delete(this.baseDir);
        }
    }

    /**
     * Buffers three records on each partition and, after every {@code process()} call, checks the
     * number of records still buffered and the number received so far by each source node.
     */
    @Test
    public void testProcessOrder() {
        final long[] firstOffsets = {10L, 20L, 30L};
        final ConsumerRecord[] firstBatch = new ConsumerRecord[firstOffsets.length];
        for (int i = 0; i < firstOffsets.length; i++) {
            firstBatch[i] = new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), firstOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition1, this.records(firstBatch));

        final long[] secondOffsets = {25L, 35L, 45L};
        final ConsumerRecord[] secondBatch = new ConsumerRecord[secondOffsets.length];
        for (int i = 0; i < secondOffsets.length; i++) {
            secondBatch[i] = new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), secondOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition2, this.records(secondBatch));

        // Each row: {records still buffered, received by source1, received by source2} after one process() call.
        final long[][] expectedAfterEachStep = {
            {5L, 1L, 0L},
            {4L, 2L, 0L},
            {3L, 2L, 1L},
            {2L, 3L, 1L},
            {1L, 3L, 2L},
            {0L, 3L, 3L}
        };
        for (final long[] expected : expectedAfterEachStep) {
            Assert.assertTrue(this.task.process());
            Assert.assertEquals(expected[0], this.task.numBuffered());
            Assert.assertEquals(expected[1], this.source1.numReceived);
            Assert.assertEquals(expected[2], this.source2.numReceived);
        }
    }

    /**
     * Asserts that the latency-avg, latency-max and rate metrics of an operation are registered.
     *
     * @param operation operation name used as the metric-name prefix and inside the descriptions
     * @param groupName metric group to look the metrics up in
     * @param tags      tags that are part of each metric name
     */
    private void testSpecificMetrics(String operation, String groupName, Map<String, String> tags) {
        // Each row: {metric name, metric description}.
        final String[][] namesAndDescriptions = {
            {operation + "-latency-avg", "The average latency of " + operation + " operation."},
            {operation + "-latency-max", "The max latency of " + operation + " operation."},
            {operation + "-rate", "The average number of occurrence of " + operation + " operation per second."}
        };
        for (final String[] entry : namesAndDescriptions) {
            Assert.assertNotNull(this.metrics.metrics().get(this.metrics.metricName(entry[0], groupName, entry[1], tags)));
        }
    }

    /**
     * Checks that the "commit" sensor exists and that its metrics are registered both under the
     * task's own id and under the "all" task-id tag.
     */
    @Test
    public void testMetrics() {
        final String operation = "commit";
        final String groupName = "stream-task-metrics";

        final Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("task-id", this.task.id().toString());

        Assert.assertNotNull(this.metrics.getSensor(operation));
        this.testSpecificMetrics(operation, groupName, metricTags);

        // Same metrics must also exist with the task-id tag replaced by "all".
        metricTags.put("task-id", "all");
        this.testSpecificMetrics(operation, groupName, metricTags);
    }

    /**
     * Checks which partitions the mock consumer reports as paused while records are buffered and
     * then processed one at a time.
     * NOTE(review): the pause threshold presumably comes from "buffered.records.per.partition" = 3
     * in createConfig — confirm against StreamTask.
     */
    @Test
    public void testPauseResume() {
        final long[] firstOffsets = {10L, 20L};
        final ConsumerRecord[] firstBatch = new ConsumerRecord[firstOffsets.length];
        for (int i = 0; i < firstOffsets.length; i++) {
            firstBatch[i] = new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), firstOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition1, this.records(firstBatch));

        final long[] secondOffsets = {35L, 45L, 55L, 65L};
        final ConsumerRecord[] secondBatch = new ConsumerRecord[secondOffsets.length];
        for (int i = 0; i < secondOffsets.length; i++) {
            secondBatch[i] = new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), secondOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition2, this.records(secondBatch));

        // After the first record is processed only partition2 is paused.
        Assert.assertTrue(this.task.process());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(1L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));

        final long[] thirdOffsets = {30L, 40L, 50L};
        final ConsumerRecord[] thirdBatch = new ConsumerRecord[thirdOffsets.length];
        for (int i = 0; i < thirdOffsets.length; i++) {
            thirdBatch[i] = new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), thirdOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition1, this.records(thirdBatch));

        // Adding three more records to partition1 pauses it as well.
        Assert.assertEquals(2L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition1));
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));

        // The next two process() calls raise source1's count to 2 and then 3; partition2 stays the only paused one.
        for (long expectedFromSource1 = 2L; expectedFromSource1 <= 3L; expectedFromSource1++) {
            Assert.assertTrue(this.task.process());
            Assert.assertEquals(expectedFromSource1, this.source1.numReceived);
            Assert.assertEquals(0L, this.source2.numReceived);
            Assert.assertEquals(1L, this.consumer.paused().size());
            Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        }

        // Once the first partition2 record is processed nothing is paused any more.
        Assert.assertTrue(this.task.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertEquals(0L, this.consumer.paused().size());
    }

    /**
     * Alternates {@code maybePunctuateStreamTime()} and {@code process()} over nine buffered records
     * and checks, per step, whether a punctuation happened and how many records are buffered and
     * received. Finally verifies the recorded stream-time punctuations: 0, 20, 32, 40 and 60.
     */
    @Test
    public void testMaybePunctuateStreamTime() {
        final long[] firstOffsets = {0L, 20L, 32L, 40L, 60L};
        final ConsumerRecord[] firstBatch = new ConsumerRecord[firstOffsets.length];
        for (int i = 0; i < firstOffsets.length; i++) {
            firstBatch[i] = new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), firstOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition1, this.records(firstBatch));

        final long[] secondOffsets = {25L, 35L, 45L, 61L};
        final ConsumerRecord[] secondBatch = new ConsumerRecord[secondOffsets.length];
        for (int i = 0; i < secondOffsets.length; i++) {
            secondBatch[i] = new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), secondOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition2, this.records(secondBatch));

        // Whether maybePunctuateStreamTime() is expected to punctuate at the start of each step.
        final boolean[] punctuationExpected = {true, true, false, true, false, true, false, true, false};
        // Each row: {records still buffered, received by source1, received by source2} after the step's process() call.
        final long[][] expectedCounts = {
            {8L, 1L, 0L},
            {7L, 2L, 0L},
            {6L, 2L, 1L},
            {5L, 3L, 1L},
            {4L, 3L, 2L},
            {3L, 4L, 2L},
            {2L, 4L, 3L},
            {1L, 5L, 3L},
            {0L, 5L, 4L}
        };
        for (int step = 0; step < expectedCounts.length; step++) {
            final boolean punctuated = this.task.maybePunctuateStreamTime();
            if (punctuationExpected[step]) {
                Assert.assertTrue(punctuated);
            } else {
                Assert.assertFalse(punctuated);
            }
            Assert.assertTrue(this.task.process());
            Assert.assertEquals(expectedCounts[step][0], this.task.numBuffered());
            Assert.assertEquals(expectedCounts[step][1], this.source1.numReceived);
            Assert.assertEquals(expectedCounts[step][2], this.source2.numReceived);
        }

        // Nothing is left to process or punctuate.
        Assert.assertFalse(this.task.process());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        this.processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
    }

    /**
     * Same alternation of {@code maybePunctuateStreamTime()} and {@code process()} as the regular
     * stream-time test, but with a large jump (20 to 142) in the partition1 records. The step after
     * the jump checks that one call punctuates and an immediate second call does not. Finally
     * verifies the recorded stream-time punctuations: 20, 142, 155 and 160.
     */
    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        final long[] firstOffsets = {20L, 142L, 155L, 160L};
        final ConsumerRecord[] firstBatch = new ConsumerRecord[firstOffsets.length];
        for (int i = 0; i < firstOffsets.length; i++) {
            firstBatch[i] = new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), firstOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition1, this.records(firstBatch));

        final long[] secondOffsets = {25L, 145L, 159L, 161L};
        final ConsumerRecord[] secondBatch = new ConsumerRecord[secondOffsets.length];
        for (int i = 0; i < secondOffsets.length; i++) {
            secondBatch[i] = new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), secondOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition2, this.records(secondBatch));

        // Expected results of the consecutive maybePunctuateStreamTime() calls made at the start of each step.
        final boolean[][] punctuationChecks = {
            {true},
            {false},
            {true, false},
            {false},
            {true},
            {false},
            {true},
            {false}
        };
        // Each row: {records still buffered, received by source1, received by source2} after the step's process() call.
        final long[][] expectedCounts = {
            {7L, 1L, 0L},
            {6L, 1L, 1L},
            {5L, 2L, 1L},
            {4L, 2L, 2L},
            {3L, 3L, 2L},
            {2L, 3L, 3L},
            {1L, 4L, 3L},
            {0L, 4L, 4L}
        };
        for (int step = 0; step < expectedCounts.length; step++) {
            for (final boolean expectPunctuation : punctuationChecks[step]) {
                final boolean punctuated = this.task.maybePunctuateStreamTime();
                if (expectPunctuation) {
                    Assert.assertTrue(punctuated);
                } else {
                    Assert.assertFalse(punctuated);
                }
            }
            Assert.assertTrue(this.task.process());
            Assert.assertEquals(expectedCounts[step][0], this.task.numBuffered());
            Assert.assertEquals(expectedCounts[step][1], this.source1.numReceived);
            Assert.assertEquals(expectedCounts[step][2], this.source2.numReceived);
        }

        // Nothing is left to process or punctuate.
        Assert.assertFalse(this.task.process());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        this.processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
    }

    /**
     * Checks that after the stream-time schedule is cancelled no further punctuation happens:
     * only the punctuation at 20 recorded before the cancellation remains.
     */
    @Test
    public void testCancelPunctuateStreamTime() {
        final long[] firstOffsets = {20L, 30L, 40L};
        final ConsumerRecord[] firstBatch = new ConsumerRecord[firstOffsets.length];
        for (int i = 0; i < firstOffsets.length; i++) {
            firstBatch[i] = new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), firstOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition1, this.records(firstBatch));

        final long[] secondOffsets = {25L, 35L, 45L};
        final ConsumerRecord[] secondBatch = new ConsumerRecord[secondOffsets.length];
        for (int i = 0; i < secondOffsets.length; i++) {
            secondBatch[i] = new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), secondOffsets[i], 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue);
        }
        this.task.addRecords(this.partition2, this.records(secondBatch));

        Assert.assertTrue(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process());
        Assert.assertFalse(this.task.maybePunctuateStreamTime());
        Assert.assertTrue(this.task.process());

        // Cancel the schedule; the following call must not punctuate.
        this.processorStreamTime.supplier.scheduleCancellable.cancel();
        Assert.assertFalse(this.task.maybePunctuateStreamTime());

        this.processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
    }

    /**
     * Advances the mock clock in steps and checks after each step whether
     * {@code maybePunctuateSystemTime()} punctuates. Finally verifies the recorded wall-clock
     * punctuations at start + 10, + 20, + 30 and + 50.
     */
    @Test
    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        final long start = this.time.milliseconds();

        // Milliseconds to advance the clock per step, and whether that step is expected to punctuate.
        final long[] sleepMillis = {10L, 10L, 9L, 1L, 20L};
        final boolean[] punctuationExpected = {true, true, false, true, true};
        for (int step = 0; step < sleepMillis.length; step++) {
            this.time.sleep(sleepMillis[step]);
            final boolean punctuated = this.task.maybePunctuateSystemTime();
            if (punctuationExpected[step]) {
                Assert.assertTrue(punctuated);
            } else {
                Assert.assertFalse(punctuated);
            }
        }

        // A second call without advancing the clock must not punctuate again.
        Assert.assertFalse(this.task.maybePunctuateSystemTime());
        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, start + 10L, start + 20L, start + 30L, start + 50L);
    }

    /**
     * Checks that no wall-clock punctuation happens either immediately or after the mock clock
     * has advanced by only 9 ms, and that no punctuation was recorded.
     */
    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        final boolean punctuatedImmediately = this.task.maybePunctuateSystemTime();
        Assert.assertFalse(punctuatedImmediately);

        this.time.sleep(9L);
        final boolean punctuatedAfterNineMillis = this.task.maybePunctuateSystemTime();
        Assert.assertFalse(punctuatedAfterNineMillis);

        final long[] noPunctuations = new long[0];
        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, noPunctuations);
    }

    /**
     * Advances the mock clock in uneven steps, including two large gaps (100 ms and 105 ms), and
     * checks that each gap yields a single punctuation rather than one per missed interval.
     * Finally verifies the recorded wall-clock punctuations at start + 100, + 110, + 122, + 130,
     * + 235 and + 240.
     */
    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        final long start = this.time.milliseconds();

        // Milliseconds to advance the clock at the beginning of each step.
        final long[] sleepMillis = {100L, 10L, 12L, 7L, 1L, 105L, 5L};
        // Expected results of the consecutive maybePunctuateSystemTime() calls made in each step.
        final boolean[][] punctuationChecks = {
            {true, false},
            {true},
            {true},
            {false},
            {true},
            {true, false},
            {true, false}
        };
        for (int step = 0; step < sleepMillis.length; step++) {
            this.time.sleep(sleepMillis[step]);
            for (final boolean expectPunctuation : punctuationChecks[step]) {
                final boolean punctuated = this.task.maybePunctuateSystemTime();
                if (expectPunctuation) {
                    Assert.assertTrue(punctuated);
                } else {
                    Assert.assertFalse(punctuated);
                }
            }
        }

        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, start + 100L, start + 110L, start + 122L, start + 130L, start + 235L, start + 240L);
    }

    /**
     * Checks that after the wall-clock schedule is cancelled no further punctuation happens:
     * only the punctuation at start + 10 recorded before the cancellation remains.
     */
    @Test
    public void testCancelPunctuateSystemTime() {
        final long start = this.time.milliseconds();

        this.time.sleep(10L);
        final boolean punctuatedBeforeCancel = this.task.maybePunctuateSystemTime();
        Assert.assertTrue(punctuatedBeforeCancel);

        this.processorSystemTime.supplier.scheduleCancellable.cancel();

        this.time.sleep(10L);
        final boolean punctuatedAfterCancel = this.task.maybePunctuateSystemTime();
        Assert.assertFalse(punctuatedAfterCancel);

        this.processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, start + 10L);
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
        final MockSourceNode processorNode = new MockSourceNode(this.topic1, this.intDeserializer, this.intDeserializer){

            public void process(Object key, Object value) {
                throw new KafkaException("KABOOM!");
            }
        };
        List<4> processorNodes = Collections.singletonList(processorNode);
        HashMap sourceNodes = new HashMap(){
            {
                this.put(StreamTaskTest.this.topic1[0], processorNode);
                this.put(StreamTaskTest.this.topic2[0], processorNode);
            }
        };
        ProcessorTopology topology = new ProcessorTopology(processorNodes, (Map)sourceNodes, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList());
        this.task.close(true, false);
        this.task = new StreamTask(this.taskId00, "applicationId", this.partitions, topology, this.consumer, (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics, this.stateDirectory, null, (Time)this.time, this.producer);
        this.task.initializeStateStores();
        this.task.initializeTopology();
        int offset = 20;
        this.task.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
        try {
            this.task.process();
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain topic"), (boolean)message.contains("topic=" + this.topic1[0]));
            Assert.assertTrue((String)("message=" + message + " should contain partition"), (boolean)message.contains("partition=" + this.partition1.partition()));
            Assert.assertTrue((String)("message=" + message + " should contain offset"), (boolean)message.contains("offset=20"));
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor=" + processorNode.name()));
        }
    }

    /**
     * Checks that a {@link KafkaException} thrown by a processor's own {@code punctuate(long)}
     * method, invoked through a delegating {@link Punctuator}, surfaces from
     * {@code task.punctuate(...)} as a {@link StreamsException} that names the processor node,
     * and that the task's processor context has no current node afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() {
        // final so the delegating Punctuator below can capture it directly.
        final AbstractProcessor processor = new AbstractProcessor(){

            public void init(ProcessorContext context) {
                context.schedule(1L);
            }

            public void process(Object key, Object value) {
            }

            public void punctuate(long timestamp) {
                throw new KafkaException("KABOOM!");
            }
        };
        ProcessorNode punctuator = new ProcessorNode("test", (Processor)processor, Collections.emptySet());
        punctuator.init((ProcessorContext)new NoOpProcessorContext());
        try {
            this.task.punctuate(punctuator, 1L, PunctuationType.STREAM_TIME, new Punctuator(){

                public void punctuate(long timestamp) {
                    // Forward to the processor's punctuate(long), which throws.
                    processor.punctuate(timestamp);
                }
            });
            Assert.fail("Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue("message=" + message + " should contain processor", message.contains("processor 'test'"));
            MatcherAssert.assertThat((Object)((ProcessorContextImpl)this.task.processorContext()).currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    /**
     * Checks that a {@link KafkaException} thrown by a {@link Punctuator} during a
     * stream-time punctuation surfaces as a {@link StreamsException} that names the
     * processor node, and that the context's current node is null afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        // A processor whose callbacks all do nothing; only the punctuator below fails.
        final AbstractProcessor idleProcessor = new AbstractProcessor(){

            public void init(ProcessorContext context) {
            }

            public void process(Object key, Object value) {
            }

            public void punctuate(long timestamp) {
            }
        };
        final ProcessorNode node = new ProcessorNode("test", (Processor)idleProcessor, Collections.emptySet());
        node.init((ProcessorContext)new NoOpProcessorContext());

        final Punctuator explodingPunctuator = new Punctuator(){

            public void punctuate(long timestamp) {
                throw new KafkaException("KABOOM!");
            }
        };

        try {
            this.task.punctuate(node, 1L, PunctuationType.STREAM_TIME, explodingPunctuator);
            Assert.fail("Should've thrown StreamsException");
        }
        catch (StreamsException expected) {
            final String errorText = expected.getMessage();
            Assert.assertTrue("message=" + errorText + " should contain processor", errorText.contains("processor 'test'"));
            final ProcessorContextImpl context = (ProcessorContextImpl)this.task.processorContext();
            MatcherAssert.assertThat((Object)context.currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    /**
     * Verifies that {@code flushState()} flushes the task's record collector, observed
     * through a collector whose {@code flush()} sets a flag.
     */
    @Test
    public void shouldFlushRecordCollectorOnFlushState() {
        final AtomicBoolean flushed = new AtomicBoolean(false);
        final MockStreamsMetrics mockMetrics = new MockStreamsMetrics(new Metrics());

        final StreamTask flushingTask = new StreamTask(
            this.taskId00, "appId", this.partitions, this.topology, (Consumer)this.consumer,
            (ChangelogReader)this.changelogReader, this.config, (StreamsMetrics)mockMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)this.producer){

            RecordCollector createRecordCollector(LogContext logContext) {
                // Collector that records the flush call instead of doing any work.
                return new NoOpRecordCollector(){

                    @Override
                    public void flush() {
                        flushed.set(true);
                    }
                };
            }
        };

        flushingTask.flushState();

        Assert.assertTrue(flushed.get());
    }

    /**
     * Verifies that committing a task writes a {@code .checkpoint} file in the task directory
     * which maps the store's changelog partition to the record collector's offset plus one.
     *
     * @throws IOException if the checkpoint file cannot be read
     */
    @Test
    public void shouldCheckpointOffsetsOnCommit() throws IOException {
        final String storeName = "test";
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);

        // Store that registers itself on init and reports itself as persistent.
        final InMemoryKeyValueStore persistentStore = new InMemoryKeyValueStore(storeName, null, null){

            public void init(ProcessorContext context, StateStore root) {
                context.register(root, false, null);
            }

            public boolean persistent() {
                return true;
            }
        };

        final Map sourcesByTopic = new HashMap();
        sourcesByTopic.put(this.partition1.topic(), this.source1);
        sourcesByTopic.put(this.partition2.topic(), this.source2);

        final ProcessorTopology checkpointTopology = new ProcessorTopology(
            Collections.emptyList(), sourcesByTopic, Collections.emptyMap(),
            Collections.singletonList(persistentStore), Collections.singletonMap(storeName, changelogTopic),
            Collections.emptyList());

        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
        final PartitionInfo changelogInfo = new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0]);
        this.restoreStateConsumer.updatePartitions(changelogTopic, Collections.singletonList(changelogInfo));
        this.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
        this.restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

        final long offset = 543L;
        final StreamTask committingTask = new StreamTask(
            this.taskId00, "appId", this.partitions, checkpointTopology, (Consumer)this.consumer,
            (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)this.producer){

            RecordCollector createRecordCollector(LogContext logContext) {
                // Collector that reports a fixed offset for the changelog partition.
                return new NoOpRecordCollector(){

                    @Override
                    public Map<TopicPartition, Long> offsets() {
                        return Collections.singletonMap(partition, offset);
                    }
                };
            }
        };
        committingTask.initializeStateStores();
        committingTask.initializeTopology();

        final long commitIntervalMs = this.config.getLong("commit.interval.ms").longValue();
        this.time.sleep(commitIntervalMs);
        committingTask.commit();

        final File checkpointFile = new File(this.stateDirectory.directoryForTask(this.taskId00), ".checkpoint");
        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
        MatcherAssert.assertThat((Object)checkpoint.read(), (Matcher)CoreMatchers.equalTo(Collections.singletonMap(partition, offset + 1L)));
    }

    /**
     * Verifies that, with {@code processing.guarantee} set to {@code exactly_once}, committing
     * a task does not create a {@code .checkpoint} file in the task directory.
     */
    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        final Map eosProps = this.config.originals();
        eosProps.put("processing.guarantee", "exactly_once");
        final StreamsConfig testConfig = new StreamsConfig(eosProps);

        final String storeName = "test";
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);

        // Store that registers itself on init and reports itself as persistent.
        final InMemoryKeyValueStore persistentStore = new InMemoryKeyValueStore(storeName, null, null){

            public void init(ProcessorContext context, StateStore root) {
                context.register(root, false, null);
            }

            public boolean persistent() {
                return true;
            }
        };

        final Map sourcesByTopic = new HashMap();
        sourcesByTopic.put(this.partition1.topic(), this.source1);
        sourcesByTopic.put(this.partition2.topic(), this.source2);

        final ProcessorTopology eosTopology = new ProcessorTopology(
            Collections.emptyList(), sourcesByTopic, Collections.emptyMap(),
            Collections.singletonList(persistentStore), Collections.singletonMap(storeName, changelogTopic),
            Collections.emptyList());

        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
        final PartitionInfo changelogInfo = new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0]);
        this.restoreStateConsumer.updatePartitions(changelogTopic, Collections.singletonList(changelogInfo));
        this.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
        this.restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

        final long offset = 543L;
        final StreamTask eosTask = new StreamTask(
            this.taskId00, "appId", this.partitions, eosTopology, (Consumer)this.consumer,
            (ChangelogReader)this.changelogReader, testConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)this.producer){

            RecordCollector createRecordCollector(LogContext logContext) {
                // Collector that reports a fixed offset for the changelog partition.
                return new NoOpRecordCollector(){

                    @Override
                    public Map<TopicPartition, Long> offsets() {
                        return Collections.singletonMap(partition, offset);
                    }
                };
            }
        };
        eosTask.initializeTopology();

        final long commitIntervalMs = testConfig.getLong("commit.interval.ms").longValue();
        this.time.sleep(commitIntervalMs);
        eosTask.commit();

        final File checkpointFile = new File(this.stateDirectory.directoryForTask(this.taskId00), ".checkpoint");
        Assert.assertFalse(checkpointFile.exists());
    }

    /**
     * Verifies that {@code punctuate(...)} throws {@link IllegalStateException} when the
     * processor context already has a current node set.
     */
    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        final ProcessorContextImpl context = (ProcessorContextImpl)this.task.processorContext();
        context.setCurrentNode(this.processorStreamTime);
        try {
            this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
            Assert.fail("Should throw illegal state exception as current node is not null");
        }
        catch (IllegalStateException expected) {
            // This is the outcome under test; nothing further to check.
        }
    }

    /**
     * Verifies that each {@code punctuate(...)} call invokes the supplied punctuator with the
     * given timestamp, observed through the {@code punctuatedAt} field.
     */
    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        final long[] timestamps = {5L, 10L};
        for (final long timestamp : timestamps) {
            this.task.punctuate(this.processorStreamTime, timestamp, PunctuationType.STREAM_TIME, this.punctuator);
            MatcherAssert.assertThat((Object)this.punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)timestamp));
        }
    }

    /**
     * Verifies that the processor context's current node is null once a punctuation has
     * completed without error.
     */
    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);

        final ProcessorContextImpl context = (ProcessorContextImpl)this.task.processorContext();
        MatcherAssert.assertThat((Object)context.currentNode(), (Matcher)CoreMatchers.nullValue());
    }

    /**
     * Expects {@code schedule(...)} to throw {@link IllegalStateException} when it is called
     * without a current node having been set on the processor context.
     */
    @Test(expected=IllegalStateException.class)
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        final Punctuator doNothing = new Punctuator(){

            public void punctuate(long timestamp) {
            }
        };
        this.task.schedule(1L, PunctuationType.STREAM_TIME, doNothing);
    }

    /**
     * Verifies that {@code schedule(...)} completes without throwing once a current node has
     * been set on the processor context.
     */
    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        final ProcessorContextImpl context = (ProcessorContextImpl)this.task.processorContext();
        context.setCurrentNode(this.processorStreamTime);

        final Punctuator doNothing = new Punctuator(){

            public void punctuate(long timestamp) {
            }
        };
        this.task.schedule(1L, PunctuationType.STREAM_TIME, doNothing);
    }

    /**
     * Verifies that closing a task whose topology contains a node that throws on close
     * raises a {@link RuntimeException}, and that the remaining nodes are closed regardless.
     */
    @Test
    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
        // Close the current task before swapping in the one that fails on close.
        this.task.close(true, false);
        final StreamTask failingTask = this.createTaskThatThrowsExceptionOnClose();
        this.task = failingTask;
        failingTask.initializeStateStores();
        failingTask.initializeTopology();

        try {
            failingTask.close(true, false);
            Assert.fail("should have thrown runtime exception");
        }
        catch (RuntimeException expected) {
            // The close was attempted, so drop the reference to the task.
            this.task = null;
        }

        Assert.assertTrue(this.processorStreamTime.closed);
        Assert.assertTrue(this.source1.closed);
        Assert.assertTrue(this.source2.closed);
    }

    /**
     * Verifies that, with the EOS config, the producer has its transactions initialized and a
     * transaction in flight after the task is created and its topology initialized.
     */
    @Test
    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeTopology();

        Assert.assertTrue(mockProducer.transactionInitialized());
        Assert.assertTrue(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that an unclean close of an EOS task that was never initialized completes
     * without throwing.
     */
    @Test
    public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        this.task.close(false, false);
        // The task is already closed, so drop the reference to it.
        this.task = null;
    }

    /**
     * Verifies that, with the default (non-EOS) config, creating a task neither initializes
     * transactions on the producer nor begins one.
     */
    @Test
    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        Assert.assertFalse(mockProducer.transactionInitialized());
        Assert.assertFalse(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that suspending an EOS task after processing one record sends offsets through
     * the producer, commits the transaction and leaves no transaction in flight.
     */
    @Test
    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeTopology();

        final ConsumerRecord record = new ConsumerRecord(
            this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME,
            0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue);
        this.task.addRecords(this.partition1, Collections.singletonList(record));
        this.task.process();
        this.task.suspend();

        Assert.assertTrue(mockProducer.sentOffsets());
        Assert.assertTrue(mockProducer.transactionCommitted());
        Assert.assertFalse(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that suspending an EOS task that has processed no records still commits the
     * transaction and leaves none in flight.
     */
    @Test
    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeTopology();

        this.task.suspend();

        Assert.assertTrue(mockProducer.transactionCommitted());
        Assert.assertFalse(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that suspending a non-EOS task after processing one record does not send
     * offsets through the producer, commit a transaction or leave one in flight.
     */
    @Test
    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        final ConsumerRecord record = new ConsumerRecord(
            this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME,
            0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue);
        this.task.addRecords(this.partition1, Collections.singletonList(record));
        this.task.process();
        this.task.suspend();

        Assert.assertFalse(mockProducer.sentOffsets());
        Assert.assertFalse(mockProducer.transactionCommitted());
        Assert.assertFalse(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that an EOS task has a transaction in flight again after it is suspended,
     * resumed and its topology re-initialized.
     */
    @Test
    public void shouldStartNewTransactionOnResumeIfEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeTopology();

        final ConsumerRecord record = new ConsumerRecord(
            this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME,
            0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue);
        this.task.addRecords(this.partition1, Collections.singletonList(record));
        this.task.process();
        this.task.suspend();

        this.task.resume();
        this.task.initializeTopology();

        Assert.assertTrue(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that a non-EOS task has no transaction in flight after a suspend/resume cycle.
     */
    @Test
    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        final ConsumerRecord record = new ConsumerRecord(
            this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME,
            0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue);
        this.task.addRecords(this.partition1, Collections.singletonList(record));
        this.task.process();
        this.task.suspend();
        this.task.resume();

        Assert.assertFalse(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that an EOS task has a transaction in flight after it processes one record
     * and commits.
     */
    @Test
    public void shouldStartNewTransactionOnCommitIfEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeTopology();

        final ConsumerRecord record = new ConsumerRecord(
            this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME,
            0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue);
        this.task.addRecords(this.partition1, Collections.singletonList(record));
        this.task.process();
        this.task.commit();

        Assert.assertTrue(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that a non-EOS task has no transaction in flight after it processes one record
     * and commits.
     */
    @Test
    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        final ConsumerRecord record = new ConsumerRecord(
            this.partition1.topic(), this.partition1.partition(), 0L, 0L, TimestampType.CREATE_TIME,
            0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue);
        this.task.addRecords(this.partition1, Collections.singletonList(record));
        this.task.process();
        this.task.commit();

        Assert.assertFalse(mockProducer.transactionInFlight());
    }

    /**
     * Verifies that an unclean close of an initialized EOS task aborts the producer's
     * transaction.
     */
    @Test
    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeTopology();

        this.task.close(false, false);
        // The task is already closed, so drop the reference to it.
        this.task = null;

        Assert.assertTrue(mockProducer.transactionAborted());
    }

    /**
     * Verifies that closing an EOS task with {@code close(false, true)} does not abort the
     * producer's transaction.
     *
     * @throws Exception declared by the test signature; nothing in the body throws a checked exception
     */
    @Test
    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        this.task.close(false, true);
        // The task is already closed, so drop the reference to it.
        this.task = null;

        Assert.assertFalse(mockProducer.transactionAborted());
    }

    /**
     * Verifies that an unclean close of a non-EOS task does not abort any transaction on the
     * producer.
     */
    @Test
    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        this.task.close(false, false);

        Assert.assertFalse(mockProducer.transactionAborted());
    }

    /**
     * Verifies that a clean close of an initialized EOS task also closes its producer.
     */
    @Test
    public void shouldCloseProducerOnCloseWhenEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeTopology();

        this.task.close(true, false);
        // The task is already closed, so drop the reference to it.
        this.task = null;

        Assert.assertTrue(mockProducer.closed());
    }

    /**
     * Verifies that when {@code flushState()} throws during a commit, the exception propagates
     * and the strict mock consumer still passes verification. The only expectation set up on
     * that mock is a stubbed {@code committed(...)} lookup.
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhenCommitting() {
        final MockProducer mockProducer = new MockProducer();

        final Consumer strictConsumer = (Consumer)EasyMock.createStrictMock(Consumer.class);
        final OffsetAndMetadata committedOffset = new OffsetAndMetadata(1L);
        EasyMock.expect((Object)strictConsumer.committed((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andStubReturn((Object)committedOffset);
        EasyMock.replay((Object[])new Object[]{strictConsumer});

        final StreamTask failingTask = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, strictConsumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer){

            protected void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };

        try {
            failingTask.commit();
            Assert.fail("should have thrown an exception");
        }
        catch (Exception expected) {
            // The commit is meant to fail; what matters is the consumer verification below.
        }

        EasyMock.verify((Object[])new Object[]{strictConsumer});
    }

    /**
     * Verifies that when a source node throws on close during task suspension, the exception
     * propagates and the strict mock consumer still passes verification. The only expectation
     * set up on that mock is a stubbed {@code committed(...)} lookup.
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
        final MockProducer mockProducer = new MockProducer();

        final Consumer strictConsumer = (Consumer)EasyMock.createStrictMock(Consumer.class);
        final OffsetAndMetadata committedOffset = new OffsetAndMetadata(1L);
        EasyMock.expect((Object)strictConsumer.committed((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andStubReturn((Object)committedOffset);
        EasyMock.replay((Object[])new Object[]{strictConsumer});

        // Source node whose close() always fails.
        final MockSourceNode failingSource = new MockSourceNode(this.topic1, this.intDeserializer, this.intDeserializer){

            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        final ProcessorTopology failingTopology = new ProcessorTopology(
            Collections.singletonList(failingSource), Collections.singletonMap(this.topic1[0], failingSource),
            Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList());

        final StreamTask suspendingTask = new StreamTask(
            this.taskId00, "applicationId", (Collection)Utils.mkSet((Object[])new TopicPartition[]{this.partition1}),
            failingTopology, strictConsumer, (ChangelogReader)this.changelogReader, this.eosConfig,
            this.streamsMetrics, this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        suspendingTask.initializeStateStores();
        suspendingTask.initializeTopology();

        try {
            suspendingTask.suspend();
            Assert.fail("should have thrown an exception");
        }
        catch (Exception expected) {
            // The suspension is meant to fail; what matters is the consumer verification below.
        }

        EasyMock.verify((Object[])new Object[]{strictConsumer});
    }

    /**
     * Verifies that {@code closeStateManager(...)} is still invoked when {@code suspend(...)}
     * throws during a task close.
     */
    @Test
    public void shouldCloseStateManagerIfFailureOnTaskClose() {
        final AtomicBoolean stateManagerCloseCalled = new AtomicBoolean(false);

        final StreamTask failingTask = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, (Consumer)this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)new MockProducer()){

            void suspend(boolean val) {
                throw new RuntimeException("KABOOM!");
            }

            void closeStateManager(boolean writeCheckpoint) throws ProcessorStateException {
                // Record the call instead of closing anything.
                stateManagerCloseCalled.set(true);
            }
        };

        try {
            failingTask.close(true, false);
            Assert.fail("should have thrown an exception");
        }
        catch (Exception expected) {
            // The close is meant to fail; the flag checked below is what this test asserts.
        }

        Assert.assertTrue(stateManagerCloseCalled.get());
    }

    /**
     * Verifies that closing a task whose topology was never initialized does not close the
     * topology's processor nodes. The task is built with a node that throws on close, so any
     * exception from {@code close(...)} means a node was closed.
     */
    @Test
    public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
        StreamTask task = this.createTaskThatThrowsExceptionOnClose();
        try {
            task.close(true, false);
        }
        catch (Exception e) {
            // Same failure type and message as Assert.fail, but the triggering exception
            // is kept as the cause so the failure report shows what was actually thrown.
            throw new AssertionError("should have not closed unitialized topology", e);
        }
    }

    /**
     * Verifies that {@code initializeStateStores()} returns true when the topology has a state
     * store but an empty store-to-changelog map.
     */
    @Test
    public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
        final ProcessorTopology storeTopology = new ProcessorTopology(
            Collections.singletonList(this.source1), Collections.singletonMap(this.topic1[0], this.source1),
            Collections.emptyMap(), Collections.singletonList(new MockStateStoreSupplier.MockStateStore("store", false)),
            Collections.emptyMap(), Collections.emptyList());

        final StreamTask storeTask = new StreamTask(
            this.taskId00, "applicationId", (Collection)Utils.mkSet((Object[])new TopicPartition[]{this.partition1}),
            storeTopology, this.consumer, (ChangelogReader)this.changelogReader, this.config,
            this.streamsMetrics, this.stateDirectory, null, (Time)this.time, this.producer);

        final boolean initialized = storeTask.initializeStateStores();
        Assert.assertTrue(initialized);
    }

    /**
     * Verifies that {@code initializeStateStores()} returns false when the topology maps its
     * state store to a changelog topic.
     */
    @Test
    public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
        final ProcessorTopology storeTopology = new ProcessorTopology(
            Collections.singletonList(this.source1), Collections.singletonMap(this.topic1[0], this.source1),
            Collections.emptyMap(), Collections.singletonList(new MockStateStoreSupplier.MockStateStore("store", false)),
            Collections.singletonMap("store", "changelog"), Collections.emptyList());

        final StreamTask storeTask = new StreamTask(
            this.taskId00, "applicationId", (Collection)Utils.mkSet((Object[])new TopicPartition[]{this.partition1}),
            storeTopology, this.consumer, (ChangelogReader)this.changelogReader, this.config,
            this.streamsMetrics, this.stateDirectory, null, (Time)this.time, this.producer);

        final boolean initialized = storeTask.initializeStateStores();
        Assert.assertFalse(initialized);
    }

    /**
     * Verifies that a clean close of a freshly created EOS task throws
     * {@link IllegalStateException}, and that the producer is closed nonetheless.
     */
    @Test
    public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
        final MockProducer mockProducer = new MockProducer();
        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);

        try {
            this.task.close(true, false);
            Assert.fail("should have throw IllegalStateException");
        }
        catch (IllegalStateException expected) {
            // This is the outcome under test; the producer state is checked below.
        }
        // The close was attempted, so drop the reference to the task.
        this.task = null;

        Assert.assertTrue(mockProducer.closed());
    }

    /**
     * Verifies that, after a wall-clock punctuation sends one record through a record
     * collector built on the task's producer, committing the EOS task leaves exactly one
     * record in the producer's history.
     */
    @Test
    public void shouldAlwaysCommitIfEosEnabled() {
        final MockProducer mockProducer = new MockProducer();
        final RecordCollectorImpl recordCollector = new RecordCollectorImpl((Producer)mockProducer, "StreamTask", new LogContext("StreamTaskTest "));

        this.task = new StreamTask(
            this.taskId00, "applicationId", this.partitions, this.topology, this.consumer,
            (ChangelogReader)this.changelogReader, this.eosConfig, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, (Producer)mockProducer);
        this.task.initializeStateStores();
        this.task.initializeTopology();

        // Punctuator that sends a single record each time it is invoked.
        final Punctuator sendingPunctuator = new Punctuator(){

            public void punctuate(long timestamp) {
                recordCollector.send("result-topic1", (Object)3, (Object)5, Integer.valueOf(0), Long.valueOf(StreamTaskTest.this.time.milliseconds()), (Serializer)new IntegerSerializer(), (Serializer)new IntegerSerializer());
            }
        };
        this.task.punctuate(this.processorSystemTime, 5L, PunctuationType.WALL_CLOCK_TIME, sendingPunctuator);
        this.task.commit();

        Assert.assertEquals(1L, (long)mockProducer.history().size());
    }

    /**
     * Builds a {@link StreamTask} over a topology containing a source node whose
     * {@code close()} always throws, alongside the stream-time processor and both sources.
     * Both topics are mapped to the failing node.
     *
     * @return a new, uninitialized task using the default (non-EOS) config
     */
    private StreamTask createTaskThatThrowsExceptionOnClose() {
        final MockSourceNode processorNode = new MockSourceNode(this.topic1, this.intDeserializer, this.intDeserializer){

            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };

        final ProcessorNode[] nodes = {processorNode, this.processorStreamTime, this.source1, this.source2};
        final List<ProcessorNode> processorNodes = Arrays.asList(nodes);

        final Map sourceNodes = new HashMap();
        sourceNodes.put(this.topic1[0], processorNode);
        sourceNodes.put(this.topic2[0], processorNode);

        final ProcessorTopology failingTopology = new ProcessorTopology(
            processorNodes, sourceNodes, Collections.emptyMap(), Collections.emptyList(),
            Collections.emptyMap(), Collections.emptyList());

        return new StreamTask(
            this.taskId00, "applicationId", this.partitions, failingTopology, this.consumer,
            (ChangelogReader)this.changelogReader, this.config, this.streamsMetrics,
            this.stateDirectory, null, (Time)this.time, this.producer);
    }

    /**
     * Exposes the given consumer records as an {@link Iterable}, backed by a fixed-size list
     * view of the argument array.
     *
     * @param recs the records to expose, in iteration order
     * @return an iterable over {@code recs}
     */
    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> ... recs) {
        final List<ConsumerRecord<byte[], byte[]>> recordList = Arrays.asList(recs);
        return recordList;
    }
}

