/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRegister;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockChangelogReader;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=EasyMockRunner.class)
public class ProcessorStateManagerTest {
    private final String applicationId = "test-application";
    private final TaskId taskId = new TaskId(0, 1, "My-Topology");
    private final String persistentStoreName = "persistentStore";
    private final String persistentStoreTwoName = "persistentStore2";
    private final String nonPersistentStoreName = "nonPersistentStore";
    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"persistentStore", (String)this.taskId.topologyName());
    private final String persistentStoreTwoTopicName = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"persistentStore2", (String)this.taskId.topologyName());
    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"nonPersistentStore", (String)this.taskId.topologyName());
    private final MockKeyValueStore persistentStore = new MockKeyValueStore("persistentStore", true);
    private final MockKeyValueStore persistentStoreTwo = new MockKeyValueStore("persistentStore2", true);
    private final MockKeyValueStore nonPersistentStore = new MockKeyValueStore("nonPersistentStore", false);
    private final TopicPartition persistentStorePartition = new TopicPartition(this.persistentStoreTopicName, 1);
    private final TopicPartition persistentStoreTwoPartition = new TopicPartition(this.persistentStoreTwoTopicName, 1);
    private final TopicPartition nonPersistentStorePartition = new TopicPartition(this.nonPersistentStoreTopicName, 1);
    private final TopicPartition irrelevantPartition = new TopicPartition("other-topic", 1);
    private final Integer key = 1;
    private final String value = "the-value";
    private final byte[] keyBytes = new byte[]{0, 0, 0, 1};
    private final byte[] valueBytes = "the-value".getBytes(StandardCharsets.UTF_8);
    private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord(this.persistentStoreTopicName, 1, 100L, (Object)this.keyBytes, (Object)this.valueBytes);
    private final MockChangelogReader changelogReader = new MockChangelogReader();
    private final LogContext logContext = new LogContext("process-state-manager-test ");
    private final StateRestoreCallback noopStateRestoreCallback = (k, v) -> {};
    private File baseDir;
    private File checkpointFile;
    private OffsetCheckpoint checkpoint;
    private StateDirectory stateDirectory;
    @Mock(type=MockType.NICE)
    private StateStore store;
    @Mock(type=MockType.NICE)
    private ProcessorStateManager.StateStoreMetadata storeMetadata;
    @Mock(type=MockType.NICE)
    private InternalProcessorContext context;

    @Before
    public void setup() {
        this.baseDir = TestUtils.tempDirectory();
        this.stateDirectory = new StateDirectory(new StreamsConfig((Map)new Properties(){
            {
                this.put("application.id", "test-application");
                this.put("bootstrap.servers", "dummy:1234");
                this.put("state.dir", ProcessorStateManagerTest.this.baseDir.getPath());
            }
        }), (Time)new MockTime(), true, true);
        this.checkpointFile = new File(this.stateDirectory.getOrCreateDirectoryForTask(this.taskId), ".checkpoint");
        this.checkpoint = new OffsetCheckpoint(this.checkpointFile);
        EasyMock.expect((Object)this.storeMetadata.changelogPartition()).andReturn((Object)this.persistentStorePartition).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.store()).andReturn((Object)this.store).anyTimes();
        EasyMock.expect((Object)this.store.name()).andReturn((Object)"persistentStore").anyTimes();
        EasyMock.replay((Object[])new Object[]{this.storeMetadata, this.store});
    }

    @After
    public void cleanup() throws IOException {
        Utils.delete((File)this.baseDir);
    }

    @Test
    public void shouldReturnDefaultChangelogTopicName() {
        String applicationId = "appId";
        String storeName = "store";
        MatcherAssert.assertThat((Object)ProcessorStateManager.storeChangelogTopic((String)"appId", (String)"store", null), (Matcher)Is.is((Object)"appId-store-changelog"));
    }

    @Test
    public void shouldReturnDefaultChangelogTopicNameWithNamedTopology() {
        String applicationId = "appId";
        String namedTopology = "namedTopology";
        String storeName = "store";
        MatcherAssert.assertThat((Object)ProcessorStateManager.storeChangelogTopic((String)"appId", (String)"store", (String)"namedTopology"), (Matcher)Is.is((Object)"appId-namedTopology-store-changelog"));
    }

    @Test
    public void shouldReturnBaseDir() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        Assert.assertEquals((Object)this.stateDirectory.getOrCreateDirectoryForTask(this.taskId), (Object)stateMgr.baseDir());
    }

    @Test
    public void shouldReportTaskType() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.STANDBY);
        Assert.assertEquals((Object)Task.TaskType.STANDBY, (Object)stateMgr.taskType());
        stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        Assert.assertEquals((Object)Task.TaskType.ACTIVE, (Object)stateMgr.taskType());
    }

    @Test
    public void shouldReportChangelogAsSource() {
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.taskId, Task.TaskType.STANDBY, false, this.logContext, this.stateDirectory, (ChangelogRegister)this.changelogReader, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"persistentStore", (Object)this.persistentStoreTopicName), Utils.mkEntry((Object)"persistentStore2", (Object)this.persistentStoreTwoTopicName), Utils.mkEntry((Object)"nonPersistentStore", (Object)this.nonPersistentStoreTopicName)}), (Collection)Utils.mkSet((Object[])new TopicPartition[]{this.persistentStorePartition, this.nonPersistentStorePartition}));
        Assert.assertTrue((boolean)stateMgr.changelogAsSource(this.persistentStorePartition));
        Assert.assertTrue((boolean)stateMgr.changelogAsSource(this.nonPersistentStorePartition));
        Assert.assertFalse((boolean)stateMgr.changelogAsSource(this.persistentStoreTwoPartition));
    }

    @Test
    public void shouldFindSingleStoreForChangelog() {
        ProcessorStateManager stateMgr = new ProcessorStateManager(this.taskId, Task.TaskType.STANDBY, false, this.logContext, this.stateDirectory, (ChangelogRegister)this.changelogReader, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"persistentStore", (Object)this.persistentStoreTopicName), Utils.mkEntry((Object)"persistentStore2", (Object)this.persistentStoreTopicName)}), Collections.emptySet());
        stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
        stateMgr.registerStore((StateStore)this.persistentStoreTwo, this.persistentStore.stateRestoreCallback);
        Assert.assertThrows(IllegalStateException.class, () -> stateMgr.updateChangelogOffsets(Collections.singletonMap(this.persistentStorePartition, 0L)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldRestoreStoreWithRestoreCallback() {
        MockRestoreCallback restoreCallback = new MockRestoreCallback();
        KeyValue expectedKeyValue = KeyValue.pair((Object)this.keyBytes, (Object)this.valueBytes);
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.persistentStore, (StateRestoreCallback)restoreCallback);
            ProcessorStateManager.StateStoreMetadata storeMetadata = stateMgr.storeMetadata(this.persistentStorePartition);
            MatcherAssert.assertThat((Object)storeMetadata, (Matcher)CoreMatchers.notNullValue());
            stateMgr.restore(storeMetadata, Collections.singletonList(this.consumerRecord));
            MatcherAssert.assertThat((Object)restoreCallback.restored.size(), (Matcher)Is.is((Object)1));
            Assert.assertTrue((boolean)restoreCallback.restored.contains(expectedKeyValue));
            Assert.assertEquals(Collections.singletonMap(this.persistentStorePartition, 101L), (Object)stateMgr.changelogOffsets());
        }
    }

    @Test
    public void shouldRestoreNonTimestampedStoreWithNoConverter() {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            ProcessorStateManager.StateStoreMetadata storeMetadata = stateMgr.storeMetadata(this.persistentStorePartition);
            MatcherAssert.assertThat((Object)storeMetadata, (Matcher)CoreMatchers.notNullValue());
            stateMgr.restore(storeMetadata, Collections.singletonList(this.consumerRecord));
            MatcherAssert.assertThat((Object)this.persistentStore.keys.size(), (Matcher)Is.is((Object)1));
            Assert.assertTrue((boolean)this.persistentStore.keys.contains(this.key));
            Assert.assertEquals((long)9L, (long)this.persistentStore.values.get(0).length);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldRestoreTimestampedStoreWithConverter() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        MockKeyValueStore store = this.getConverterStore();
        try {
            stateMgr.registerStore((StateStore)store, store.stateRestoreCallback);
            ProcessorStateManager.StateStoreMetadata storeMetadata = stateMgr.storeMetadata(this.persistentStorePartition);
            MatcherAssert.assertThat((Object)storeMetadata, (Matcher)CoreMatchers.notNullValue());
            stateMgr.restore(storeMetadata, Collections.singletonList(this.consumerRecord));
            MatcherAssert.assertThat((Object)store.keys.size(), (Matcher)Is.is((Object)1));
            Assert.assertTrue((boolean)store.keys.contains(this.key));
            Assert.assertEquals((long)17L, (long)store.values.get(0).length);
        }
        finally {
            stateMgr.close();
        }
    }

    @Test
    public void shouldUnregisterChangelogsDuringClose() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        EasyMock.reset((Object[])new Object[]{this.storeMetadata});
        StateStore store = (StateStore)EasyMock.createMock(StateStore.class);
        EasyMock.expect((Object)this.storeMetadata.changelogPartition()).andStubReturn((Object)this.persistentStorePartition);
        EasyMock.expect((Object)this.storeMetadata.store()).andStubReturn((Object)store);
        EasyMock.expect((Object)store.name()).andStubReturn((Object)"persistentStore");
        this.context.uninitialize();
        store.init((StateStoreContext)this.context, store);
        EasyMock.replay((Object[])new Object[]{this.storeMetadata, this.context, store});
        stateMgr.registerStateStores(Collections.singletonList(store), this.context);
        EasyMock.verify((Object[])new Object[]{this.context, store});
        stateMgr.registerStore(store, this.noopStateRestoreCallback);
        Assert.assertTrue((boolean)this.changelogReader.isPartitionRegistered(this.persistentStorePartition));
        EasyMock.reset((Object[])new Object[]{store});
        EasyMock.expect((Object)store.name()).andStubReturn((Object)"persistentStore");
        store.close();
        EasyMock.replay((Object[])new Object[]{store});
        stateMgr.close();
        EasyMock.verify((Object[])new Object[]{store});
        Assert.assertFalse((boolean)this.changelogReader.isPartitionRegistered(this.persistentStorePartition));
    }

    @Test
    public void shouldRecycleStoreAndReregisterChangelog() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        EasyMock.reset((Object[])new Object[]{this.storeMetadata});
        StateStore store = (StateStore)EasyMock.createMock(StateStore.class);
        EasyMock.expect((Object)this.storeMetadata.changelogPartition()).andStubReturn((Object)this.persistentStorePartition);
        EasyMock.expect((Object)this.storeMetadata.store()).andStubReturn((Object)store);
        EasyMock.expect((Object)store.name()).andStubReturn((Object)"persistentStore");
        this.context.uninitialize();
        store.init((StateStoreContext)this.context, store);
        EasyMock.replay((Object[])new Object[]{this.storeMetadata, this.context, store});
        stateMgr.registerStateStores(Collections.singletonList(store), this.context);
        EasyMock.verify((Object[])new Object[]{this.context, store});
        stateMgr.registerStore(store, this.noopStateRestoreCallback);
        Assert.assertTrue((boolean)this.changelogReader.isPartitionRegistered(this.persistentStorePartition));
        stateMgr.recycle();
        Assert.assertFalse((boolean)this.changelogReader.isPartitionRegistered(this.persistentStorePartition));
        MatcherAssert.assertThat((Object)stateMgr.getStore("persistentStore"), (Matcher)CoreMatchers.equalTo((Object)store));
        EasyMock.reset((Object[])new Object[]{this.context, store});
        this.context.uninitialize();
        EasyMock.expect((Object)store.name()).andStubReturn((Object)"persistentStore");
        EasyMock.replay((Object[])new Object[]{this.context, store});
        stateMgr.registerStateStores(Collections.singletonList(store), this.context);
        EasyMock.verify((Object[])new Object[]{this.context, store});
        Assert.assertTrue((boolean)this.changelogReader.isPartitionRegistered(this.persistentStorePartition));
    }

    @Test
    public void shouldRegisterPersistentStores() {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            Assert.assertTrue((boolean)this.changelogReader.isPartitionRegistered(this.persistentStorePartition));
        }
    }

    @Test
    public void shouldRegisterNonPersistentStore() {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            Assert.assertTrue((boolean)this.changelogReader.isPartitionRegistered(this.nonPersistentStorePartition));
        }
    }

    @Test
    public void shouldNotRegisterNonLoggedStore() {
        try (ProcessorStateManager stateMgr = new ProcessorStateManager(this.taskId, Task.TaskType.STANDBY, false, this.logContext, this.stateDirectory, (ChangelogRegister)this.changelogReader, Collections.emptyMap(), Collections.emptySet());){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            Assert.assertFalse((boolean)this.changelogReader.isPartitionRegistered(this.persistentStorePartition));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldInitializeOffsetsFromCheckpointFile() throws IOException {
        long checkpointOffset = 10L;
        Map offsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.irrelevantPartition, (Object)999L)});
        this.checkpoint.write(offsets);
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.persistentStoreTwo, this.persistentStoreTwo.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
            Assert.assertTrue((boolean)this.checkpointFile.exists());
            Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.persistentStorePartition, this.persistentStoreTwoPartition, this.nonPersistentStorePartition}), (Object)stateMgr.changelogPartitions());
            Assert.assertEquals((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)11L), Utils.mkEntry((Object)this.persistentStoreTwoPartition, (Object)0L), Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)0L)}), (Object)stateMgr.changelogOffsets());
            Assert.assertNull((Object)stateMgr.storeMetadata(this.irrelevantPartition));
            Assert.assertNull((Object)stateMgr.storeMetadata(this.persistentStoreTwoPartition).offset());
            MatcherAssert.assertThat((Object)stateMgr.storeMetadata(this.persistentStorePartition).offset(), (Matcher)CoreMatchers.equalTo((Object)10L));
            Assert.assertNull((Object)stateMgr.storeMetadata(this.nonPersistentStorePartition).offset());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldInitializeOffsetsFromCheckpointFileAndDeleteIfEOSEnabled() throws IOException {
        long checkpointOffset = 10L;
        Map offsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.irrelevantPartition, (Object)999L)});
        this.checkpoint.write(offsets);
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, true);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.persistentStoreTwo, this.persistentStoreTwo.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
            Assert.assertFalse((boolean)this.checkpointFile.exists());
            Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.persistentStorePartition, this.persistentStoreTwoPartition, this.nonPersistentStorePartition}), (Object)stateMgr.changelogPartitions());
            Assert.assertEquals((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)11L), Utils.mkEntry((Object)this.persistentStoreTwoPartition, (Object)0L), Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)0L)}), (Object)stateMgr.changelogOffsets());
            Assert.assertNull((Object)stateMgr.storeMetadata(this.irrelevantPartition));
            Assert.assertNull((Object)stateMgr.storeMetadata(this.persistentStoreTwoPartition).offset());
            MatcherAssert.assertThat((Object)stateMgr.storeMetadata(this.persistentStorePartition).offset(), (Matcher)CoreMatchers.equalTo((Object)10L));
            Assert.assertNull((Object)stateMgr.storeMetadata(this.nonPersistentStorePartition).offset());
        }
    }

    @Test
    public void shouldGetRegisteredStore() {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            Assert.assertNull((Object)stateMgr.getStore("noSuchStore"));
            Assert.assertEquals((Object)this.persistentStore, (Object)stateMgr.getStore("persistentStore"));
            Assert.assertEquals((Object)this.nonPersistentStore, (Object)stateMgr.getStore("nonPersistentStore"));
        }
    }

    @Test
    public void shouldGetChangelogPartitionForRegisteredStore() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
        TopicPartition changelogPartition = stateMgr.registeredChangelogPartitionFor("persistentStore");
        MatcherAssert.assertThat((Object)changelogPartition.topic(), (Matcher)Is.is((Object)this.persistentStoreTopicName));
        MatcherAssert.assertThat((Object)changelogPartition.partition(), (Matcher)Is.is((Object)this.taskId.partition()));
    }

    @Test
    public void shouldThrowIfStateStoreIsNotRegistered() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        Assert.assertThrows((String)"State store persistentStore for which the registered changelog partition should be retrieved has not been registered", IllegalStateException.class, () -> stateMgr.registeredChangelogPartitionFor("persistentStore"));
    }

    @Test
    public void shouldThrowIfStateStoreHasLoggingDisabled() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        String storeName = "store-with-logging-disabled";
        MockKeyValueStore storeWithLoggingDisabled = new MockKeyValueStore("store-with-logging-disabled", true);
        stateMgr.registerStore((StateStore)storeWithLoggingDisabled, null);
        Assert.assertThrows((String)"Registered state store store-with-logging-disabled does not have a registered changelog partition. This may happen if logging is disabled for the state store.", IllegalStateException.class, () -> stateMgr.registeredChangelogPartitionFor("store-with-logging-disabled"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldFlushCheckpointAndClose() throws IOException {
        this.checkpoint.write(Collections.emptyMap());
        HashMap<TopicPartition, Long> ackedOffsets = new HashMap<TopicPartition, Long>();
        ackedOffsets.put(this.persistentStorePartition, 123L);
        ackedOffsets.put(this.nonPersistentStorePartition, 456L);
        ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L);
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        try {
            Assert.assertFalse((boolean)this.checkpointFile.exists());
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
        }
        finally {
            stateMgr.flush();
            Assert.assertTrue((boolean)this.persistentStore.flushed);
            Assert.assertTrue((boolean)this.nonPersistentStore.flushed);
            MatcherAssert.assertThat((Object)this.persistentStore.getLastFlushCount(), (Matcher)Matchers.lessThan((Comparable)Integer.valueOf(this.nonPersistentStore.getLastFlushCount())));
            stateMgr.updateChangelogOffsets(ackedOffsets);
            stateMgr.checkpoint();
            Assert.assertTrue((boolean)this.checkpointFile.exists());
            Map checkpointedOffsets = this.checkpoint.read();
            MatcherAssert.assertThat((Object)checkpointedOffsets, (Matcher)Is.is(Collections.singletonMap(new TopicPartition(this.persistentStoreTopicName, 1), 123L)));
            stateMgr.close();
            Assert.assertTrue((boolean)this.persistentStore.closed);
            Assert.assertTrue((boolean)this.nonPersistentStore.closed);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldOverrideOffsetsWhenRestoreAndProcess() throws IOException {
        Map<TopicPartition, Long> offsets = Collections.singletonMap(this.persistentStorePartition, 99L);
        this.checkpoint.write(offsets);
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
            ProcessorStateManager.StateStoreMetadata storeMetadata = stateMgr.storeMetadata(this.persistentStorePartition);
            MatcherAssert.assertThat((Object)storeMetadata, (Matcher)CoreMatchers.notNullValue());
            MatcherAssert.assertThat((Object)storeMetadata.offset(), (Matcher)CoreMatchers.equalTo((Object)99L));
            stateMgr.restore(storeMetadata, Collections.singletonList(this.consumerRecord));
            MatcherAssert.assertThat((Object)storeMetadata.offset(), (Matcher)CoreMatchers.equalTo((Object)100L));
            stateMgr.updateChangelogOffsets(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)220L), Utils.mkEntry((Object)this.irrelevantPartition, (Object)9000L)}));
            stateMgr.checkpoint();
            MatcherAssert.assertThat((Object)stateMgr.storeMetadata(this.irrelevantPartition), (Matcher)CoreMatchers.equalTo(null));
            MatcherAssert.assertThat((Object)storeMetadata.offset(), (Matcher)CoreMatchers.equalTo((Object)220L));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldWriteCheckpointForPersistentStore() throws IOException {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
            ProcessorStateManager.StateStoreMetadata storeMetadata = stateMgr.storeMetadata(this.persistentStorePartition);
            MatcherAssert.assertThat((Object)storeMetadata, (Matcher)CoreMatchers.notNullValue());
            stateMgr.restore(storeMetadata, Collections.singletonList(this.consumerRecord));
            stateMgr.checkpoint();
            Map read = this.checkpoint.read();
            MatcherAssert.assertThat((Object)read, (Matcher)CoreMatchers.equalTo(Collections.singletonMap(this.persistentStorePartition, 100L)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldNotWriteCheckpointForNonPersistentStore() throws IOException {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);){
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
            ProcessorStateManager.StateStoreMetadata storeMetadata = stateMgr.storeMetadata(this.nonPersistentStorePartition);
            MatcherAssert.assertThat((Object)storeMetadata, (Matcher)CoreMatchers.notNullValue());
            stateMgr.updateChangelogOffsets(Collections.singletonMap(this.nonPersistentStorePartition, 876L));
            stateMgr.checkpoint();
            Map read = this.checkpoint.read();
            MatcherAssert.assertThat((Object)read, (Matcher)CoreMatchers.equalTo(Collections.emptyMap()));
        }
    }

    @Test
    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOException {
        try (ProcessorStateManager stateMgr = new ProcessorStateManager(this.taskId, Task.TaskType.STANDBY, false, this.logContext, this.stateDirectory, (ChangelogRegister)this.changelogReader, Collections.emptyMap(), Collections.emptySet());){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.updateChangelogOffsets(Collections.singletonMap(this.persistentStorePartition, 987L));
            stateMgr.checkpoint();
            Map read = this.checkpoint.read();
            MatcherAssert.assertThat((Object)read, (Matcher)CoreMatchers.equalTo(Collections.emptyMap()));
        }
    }

    @Test
    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() {
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        Assert.assertThrows(IllegalArgumentException.class, () -> stateManager.registerStore((StateStore)new MockKeyValueStore(".checkpoint", true), null));
    }

    @Test
    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() {
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        stateManager.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
        Assert.assertThrows(IllegalArgumentException.class, () -> stateManager.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback));
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() {
        final RuntimeException exception = new RuntimeException("KABOOM!");
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        MockKeyValueStore stateStore = new MockKeyValueStore("persistentStore", true){

            @Override
            public void flush() {
                throw exception;
            }
        };
        stateManager.registerStore((StateStore)stateStore, stateStore.stateRestoreCallback);
        ProcessorStateException thrown = (ProcessorStateException)Assert.assertThrows(ProcessorStateException.class, () -> ((ProcessorStateManager)stateManager).flush());
        Assert.assertEquals((Object)exception, (Object)thrown.getCause());
    }

    @Test
    public void shouldPreserveStreamsExceptionOnFlushIfStoreThrows() {
        final StreamsException exception = new StreamsException("KABOOM!");
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        MockKeyValueStore stateStore = new MockKeyValueStore("persistentStore", true){

            @Override
            public void flush() {
                throw exception;
            }
        };
        stateManager.registerStore((StateStore)stateStore, stateStore.stateRestoreCallback);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> ((ProcessorStateManager)stateManager).flush());
        Assert.assertEquals((Object)((Object)exception), (Object)((Object)thrown));
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() {
        final RuntimeException exception = new RuntimeException("KABOOM!");
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        MockKeyValueStore stateStore = new MockKeyValueStore("persistentStore", true){

            @Override
            public void close() {
                throw exception;
            }
        };
        stateManager.registerStore((StateStore)stateStore, stateStore.stateRestoreCallback);
        ProcessorStateException thrown = (ProcessorStateException)Assert.assertThrows(ProcessorStateException.class, () -> ((ProcessorStateManager)stateManager).close());
        Assert.assertEquals((Object)exception, (Object)thrown.getCause());
    }

    @Test
    public void shouldPreserveStreamsExceptionOnCloseIfStoreThrows() {
        final StreamsException exception = new StreamsException("KABOOM!");
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        MockKeyValueStore stateStore = new MockKeyValueStore("persistentStore", true){

            @Override
            public void close() {
                throw exception;
            }
        };
        stateManager.registerStore((StateStore)stateStore, stateStore.stateRestoreCallback);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> ((ProcessorStateManager)stateManager).close());
        Assert.assertEquals((Object)((Object)exception), (Object)((Object)thrown));
    }

    @Test
    public void shouldThrowIfRestoringUnregisteredStore() {
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        Assert.assertThrows(IllegalStateException.class, () -> stateManager.restore(this.storeMetadata, Collections.emptyList()));
    }

    @Test
    public void shouldLogAWarningIfCheckpointThrowsAnIOException() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
        this.stateDirectory.clean();
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(ProcessorStateManager.class);){
            stateMgr.updateChangelogOffsets(Collections.singletonMap(this.persistentStorePartition, 10L));
            stateMgr.checkpoint();
            boolean foundExpectedLogMessage = false;
            for (LogCaptureAppender.Event event : appender.getEvents()) {
                if (!"WARN".equals(event.getLevel()) || !event.getMessage().startsWith("process-state-manager-test Failed to write offset checkpoint file to [") || !event.getMessage().endsWith(".checkpoint]. This may occur if OS cleaned the state.dir in case when it located in ${java.io.tmpdir} directory. This may also occur due to running multiple instances on the same machine using the same state dir. Changing the location of state.dir may resolve the problem.") || !event.getThrowableInfo().get().startsWith("java.io.FileNotFoundException: ")) continue;
                foundExpectedLogMessage = true;
                break;
            }
            Assert.assertTrue((boolean)foundExpectedLogMessage);
        }
    }

    @Test
    public void shouldThrowIfLoadCheckpointThrows() throws Exception {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
        File file = new File(stateMgr.baseDir(), ".checkpoint");
        file.createNewFile();
        FileWriter writer = new FileWriter(file);
        writer.write("abcdefg");
        writer.close();
        try {
            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
            Assert.fail((String)"should have thrown processor state exception when IO exception happens");
        }
        catch (ProcessorStateException processorStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowIfRestoreCallbackThrows() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE);
        stateMgr.registerStore((StateStore)this.persistentStore, (key, value) -> {
            throw new RuntimeException("KABOOM!");
        });
        ProcessorStateManager.StateStoreMetadata storeMetadata = stateMgr.storeMetadata(this.persistentStorePartition);
        try {
            stateMgr.restore(storeMetadata, Collections.singletonList(this.consumerRecord));
            Assert.fail((String)"should have thrown processor state exception when IO exception happens");
        }
        catch (ProcessorStateException processorStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldFlushGoodStoresEvenSomeThrowsException() {
        final AtomicBoolean flushedStore = new AtomicBoolean(false);
        MockKeyValueStore stateStore1 = new MockKeyValueStore("persistentStore", true){

            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        MockKeyValueStore stateStore2 = new MockKeyValueStore("persistentStore2", true){

            @Override
            public void flush() {
                flushedStore.set(true);
            }
        };
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        stateManager.registerStore((StateStore)stateStore1, stateStore1.stateRestoreCallback);
        stateManager.registerStore((StateStore)stateStore2, stateStore2.stateRestoreCallback);
        try {
            stateManager.flush();
        }
        catch (ProcessorStateException processorStateException) {
            // empty catch block
        }
        Assert.assertTrue((boolean)flushedStore.get());
    }

    @Test
    public void shouldCloseAllStoresEvenIfStoreThrowsException() {
        final AtomicBoolean closedStore = new AtomicBoolean(false);
        MockKeyValueStore stateStore1 = new MockKeyValueStore("persistentStore", true){

            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        MockKeyValueStore stateStore2 = new MockKeyValueStore("persistentStore2", true){

            @Override
            public void close() {
                closedStore.set(true);
            }
        };
        ProcessorStateManager stateManager = this.getStateManager(Task.TaskType.ACTIVE);
        stateManager.registerStore((StateStore)stateStore1, stateStore1.stateRestoreCallback);
        stateManager.registerStore((StateStore)stateStore2, stateStore2.stateRestoreCallback);
        try {
            stateManager.close();
        }
        catch (ProcessorStateException processorStateException) {
            // empty catch block
        }
        Assert.assertTrue((boolean)closedStore.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException {
        long checkpointOffset = 10L;
        Map offsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.irrelevantPartition, (Object)999L)});
        this.checkpoint.write(offsets);
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, true);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.persistentStoreTwo, this.persistentStoreTwo.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            TaskCorruptedException exception = (TaskCorruptedException)Assert.assertThrows(TaskCorruptedException.class, () -> stateMgr.initializeStoreOffsetsFromCheckpoint(false));
            Assert.assertEquals(Collections.singleton(this.taskId), (Object)exception.corruptedTasks());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldNotThrowTaskCorruptedWithoutInMemoryStoreCheckpointAndNonEmptyDir() throws IOException {
        long checkpointOffset = 10L;
        Map offsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.irrelevantPartition, (Object)999L)});
        this.checkpoint.write(offsets);
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, true);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(false);
        }
    }

    @Test
    public void shouldNotThrowTaskCorruptedExceptionAfterCheckpointing() {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, true);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(true);
            MatcherAssert.assertThat((Object)stateMgr.storeMetadata(this.nonPersistentStorePartition), (Matcher)CoreMatchers.notNullValue());
            MatcherAssert.assertThat((Object)stateMgr.storeMetadata(this.persistentStorePartition), (Matcher)CoreMatchers.notNullValue());
            stateMgr.updateChangelogOffsets(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)876L), Utils.mkEntry((Object)this.persistentStorePartition, (Object)666L)}));
            stateMgr.checkpoint();
            stateMgr.close();
            Assert.assertNull((Object)stateMgr.storeMetadata(this.nonPersistentStorePartition));
            Assert.assertNull((Object)stateMgr.storeMetadata(this.persistentStorePartition));
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.registerStore((StateStore)this.nonPersistentStore, this.nonPersistentStore.stateRestoreCallback);
            stateMgr.initializeStoreOffsetsFromCheckpoint(false);
            MatcherAssert.assertThat((Object)stateMgr.storeMetadata(this.nonPersistentStorePartition), (Matcher)CoreMatchers.notNullValue());
            MatcherAssert.assertThat((Object)stateMgr.storeMetadata(this.persistentStorePartition), (Matcher)CoreMatchers.notNullValue());
        }
    }

    @Test
    public void shouldThrowIllegalStateIfInitializingOffsetsForCorruptedTasks() {
        try (ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, true);){
            stateMgr.registerStore((StateStore)this.persistentStore, this.persistentStore.stateRestoreCallback);
            stateMgr.markChangelogAsCorrupted((Collection)Utils.mkSet((Object[])new TopicPartition[]{this.persistentStorePartition}));
            ProcessorStateException thrown = (ProcessorStateException)Assert.assertThrows(ProcessorStateException.class, () -> stateMgr.initializeStoreOffsetsFromCheckpoint(true));
            Assert.assertTrue((boolean)(thrown.getCause() instanceof IllegalStateException));
        }
    }

    @Test
    public void shouldBeAbleToCloseWithoutRegisteringAnyStores() {
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, true);
        stateMgr.close();
    }

    @Test
    public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
        long checkpointOffset = 10L;
        Map offsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.irrelevantPartition, (Object)999L)});
        this.checkpoint.write(offsets);
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, true);
        stateMgr.deleteCheckPointFileIfEOSEnabled();
        stateMgr.close();
        Assert.assertFalse((boolean)this.checkpointFile.exists());
    }

    @Test
    public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws IOException {
        long checkpointOffset = 10L;
        Map offsets = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.persistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.nonPersistentStorePartition, (Object)10L), Utils.mkEntry((Object)this.irrelevantPartition, (Object)999L)});
        this.checkpoint.write(offsets);
        ProcessorStateManager stateMgr = this.getStateManager(Task.TaskType.ACTIVE, false);
        stateMgr.deleteCheckPointFileIfEOSEnabled();
        stateMgr.close();
        Assert.assertTrue((boolean)this.checkpointFile.exists());
    }

    private ProcessorStateManager getStateManager(Task.TaskType taskType, boolean eosEnabled) {
        return new ProcessorStateManager(this.taskId, taskType, eosEnabled, this.logContext, this.stateDirectory, (ChangelogRegister)this.changelogReader, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"persistentStore", (Object)this.persistentStoreTopicName), Utils.mkEntry((Object)"persistentStore2", (Object)this.persistentStoreTwoTopicName), Utils.mkEntry((Object)"nonPersistentStore", (Object)this.nonPersistentStoreTopicName)}), Collections.emptySet());
    }

    private ProcessorStateManager getStateManager(Task.TaskType taskType) {
        return this.getStateManager(taskType, false);
    }

    private MockKeyValueStore getConverterStore() {
        return new ConverterStore("persistentStore", true);
    }

    private static class ConverterStore
    extends MockKeyValueStore
    implements TimestampedBytesStore {
        ConverterStore(String name, boolean persistent) {
            super(name, persistent);
        }
    }
}

