/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockChangelogReader;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProcessorStateManagerTest {
    // Shared fixtures. Names are declared once and reused by the initializers that follow.
    private final Set<TopicPartition> noPartitions = Collections.emptySet();

    // Application id plus the two store names most tests work with, and their derived changelog topics.
    private final String applicationId = "test-application";
    private final String persistentStoreName = "persistentStore";
    private final String nonPersistentStoreName = "nonPersistentStore";
    private final String persistentStoreTopicName =
        ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
    private final String nonPersistentStoreTopicName =
        ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
    private final MockKeyValueStore persistentStore = new MockKeyValueStore(persistentStoreName, true);
    private final MockKeyValueStore nonPersistentStore = new MockKeyValueStore(nonPersistentStoreName, false);
    private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1);

    // A third store with its own changelog topic/partition, used by the flush/close/re-initialise tests.
    private final String storeName = "mockKeyValueStore";
    private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
    private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0);

    // Default task id; setup() derives the checkpoint file location from it.
    private final TaskId taskId = new TaskId(0, 1);
    private final MockChangelogReader changelogReader = new MockChangelogReader();
    private final MockKeyValueStore mockKeyValueStore = new MockKeyValueStore(storeName, true);

    // One changelog record (offset 0) fed to the standby-restore tests.
    private final byte[] key = new byte[]{0, 0, 0, 1};
    private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
    private final ConsumerRecord<byte[], byte[]> consumerRecord =
        new ConsumerRecord<>(changelogTopic, 0, 0L, key, value);
    private final LogContext logContext = new LogContext("process-state-manager-test ");

    // Assigned in setup() for every test.
    private File baseDir;
    private File checkpointFile;
    private OffsetCheckpoint checkpoint;
    private StateDirectory stateDirectory;

    /**
     * Creates a fresh temporary base directory, builds a {@link StateDirectory} configured to use it,
     * and points {@code checkpointFile} / {@code checkpoint} at the ".checkpoint" file inside the
     * directory of the default {@code taskId}.
     */
    @Before
    public void setup() {
        baseDir = TestUtils.tempDirectory();

        final Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", "dummy:1234");
        props.put("state.dir", baseDir.getPath());
        stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());

        final File taskDirectory = stateDirectory.directoryForTask(taskId);
        checkpointFile = new File(taskDirectory, ".checkpoint");
        checkpoint = new OffsetCheckpoint(checkpointFile);
    }

    /** Deletes the temporary base directory that {@link #setup()} created for the test. */
    @After
    public void cleanup() throws IOException {
        Utils.delete(baseDir);
    }

    /**
     * Registers a persistent store with a batching restore callback on a standby state manager and
     * checks that the single record passed to {@code updateStandbyStates} reaches that callback.
     */
    @Test
    public void shouldRestoreStoreWithBatchingRestoreSpecification() throws Exception {
        final MockBatchingStateRestoreListener batchingCallback = new MockBatchingStateRestoreListener();
        final KeyValue<byte[], byte[]> expectedRecord = KeyValue.pair(key, value);
        final MockKeyValueStore store = getPersistentStore();
        final ProcessorStateManager standbyManager = getStandByStateManager(new TaskId(0, 2));

        try {
            standbyManager.register(store, batchingCallback);
            standbyManager.updateStandbyStates(
                persistentStorePartition,
                Collections.singletonList(consumerRecord),
                consumerRecord.offset());

            // exactly the one record we fed in must have been handed to the callback
            MatcherAssert.assertThat(batchingCallback.getRestoredRecords().size(), Is.is(1));
            Assert.assertTrue(batchingCallback.getRestoredRecords().contains(expectedRecord));
        } finally {
            standbyManager.close(Collections.emptyMap());
        }
    }

    /**
     * Registers a persistent store with its own restore callback on a standby state manager and checks
     * that the single record passed to {@code updateStandbyStates} ends up as key {@code 1} in the
     * store's {@code keys}.
     */
    @Test
    public void shouldRestoreStoreWithSinglePutRestoreSpecification() throws Exception {
        final Integer expectedKey = 1;
        final MockKeyValueStore store = getPersistentStore();
        final ProcessorStateManager standbyManager = getStandByStateManager(new TaskId(0, 2));

        try {
            standbyManager.register(store, store.stateRestoreCallback);
            standbyManager.updateStandbyStates(
                persistentStorePartition,
                Collections.singletonList(consumerRecord),
                consumerRecord.offset());

            MatcherAssert.assertThat(store.keys.size(), Is.is(1));
            Assert.assertTrue(store.keys.contains(expectedKey));
        } finally {
            standbyManager.close(Collections.emptyMap());
        }
    }

    /**
     * Registers a persistent store on a state manager for {@code TaskId(0, 2)} and verifies that
     * partition 2 of the store's changelog topic was registered with the changelog reader.
     */
    @Test
    public void testRegisterPersistentStore() throws IOException {
        final TaskId registerTaskId = new TaskId(0, 2);
        final MockKeyValueStore store = getPersistentStore();

        // Map both stores to their changelog topics, exactly as testRegisterNonPersistentStore does.
        // The non-persistent entry used to point at the store name itself rather than its topic.
        final Map<String, String> storeToChangelogTopic = new HashMap<>();
        storeToChangelogTopic.put(persistentStoreName, persistentStoreTopicName);
        storeToChangelogTopic.put(nonPersistentStoreName, nonPersistentStoreTopicName);

        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            registerTaskId,
            noPartitions,
            false,
            stateDirectory,
            storeToChangelogTopic,
            changelogReader,
            false,
            logContext);

        try {
            stateMgr.register(store, store.stateRestoreCallback);
            Assert.assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
        } finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Registers a non-persistent store on a state manager for {@code TaskId(0, 2)} and verifies that
     * partition 2 of the store's changelog topic was registered with the changelog reader.
     */
    @Test
    public void testRegisterNonPersistentStore() throws IOException {
        final MockKeyValueStore store = new MockKeyValueStore(nonPersistentStoreName, false);

        final Map<String, String> storeToChangelogTopic = new HashMap<>();
        storeToChangelogTopic.put(persistentStoreName, persistentStoreTopicName);
        storeToChangelogTopic.put(nonPersistentStoreName, nonPersistentStoreTopicName);

        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            new TaskId(0, 2),
            noPartitions,
            false,
            stateDirectory,
            storeToChangelogTopic,
            changelogReader,
            false,
            logContext);

        try {
            stateMgr.register(store, store.stateRestoreCallback);
            Assert.assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
        } finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Seeds a checkpoint file with offset 10 for store1's changelog partition, registers three
     * persistent stores and verifies {@code checkpointed()} reports all three partitions: 10 for the
     * seeded one and -1 for the other two.
     */
    @Test
    public void testChangeLogOffsets() throws IOException {
        final TaskId offsetsTaskId = new TaskId(0, 0);
        final long lastCheckpointedOffset = 10L;

        final String storeName1 = "store1";
        final String storeName2 = "store2";
        final String storeName3 = "store3";
        final String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
        final String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
        final String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);

        final Map<String, String> storeToChangelogTopic = new HashMap<>();
        storeToChangelogTopic.put(storeName1, storeTopicName1);
        storeToChangelogTopic.put(storeName2, storeTopicName2);
        storeToChangelogTopic.put(storeName3, storeTopicName3);

        final TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
        final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
        final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);

        // The checkpoint must be on disk before the state manager is constructed.
        final File taskCheckpointFile = new File(stateDirectory.directoryForTask(offsetsTaskId), ".checkpoint");
        final OffsetCheckpoint taskCheckpoint = new OffsetCheckpoint(taskCheckpointFile);
        taskCheckpoint.write(Collections.singletonMap(partition1, lastCheckpointedOffset));

        final MockKeyValueStore store1 = new MockKeyValueStore(storeName1, true);
        final MockKeyValueStore store2 = new MockKeyValueStore(storeName2, true);
        final MockKeyValueStore store3 = new MockKeyValueStore(storeName3, true);

        // store3's changelog partition doubles as a source partition of the task
        final Set<TopicPartition> sourcePartitions = Utils.mkSet(partition3);

        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            offsetsTaskId,
            sourcePartitions,
            true,
            stateDirectory,
            storeToChangelogTopic,
            changelogReader,
            false,
            logContext);

        try {
            stateMgr.register(store1, store1.stateRestoreCallback);
            stateMgr.register(store2, store2.stateRestoreCallback);
            stateMgr.register(store3, store3.stateRestoreCallback);

            final Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();

            Assert.assertEquals(3, changeLogOffsets.size());
            Assert.assertTrue(changeLogOffsets.containsKey(partition1));
            Assert.assertTrue(changeLogOffsets.containsKey(partition2));
            Assert.assertTrue(changeLogOffsets.containsKey(partition3));
            Assert.assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1));
            Assert.assertEquals(-1L, (long) changeLogOffsets.get(partition2));
            Assert.assertEquals(-1L, (long) changeLogOffsets.get(partition3));
        } finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Verifies that {@code getStore} returns {@code null} for an unknown name and the registered
     * instance for the name it was registered under.
     */
    @Test
    public void testGetStore() throws IOException {
        final MockKeyValueStore registeredStore = new MockKeyValueStore(nonPersistentStoreName, false);
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            new TaskId(0, 1),
            noPartitions,
            false,
            stateDirectory,
            Collections.emptyMap(),
            changelogReader,
            false,
            logContext);

        try {
            stateMgr.register(registeredStore, registeredStore.stateRestoreCallback);

            Assert.assertNull(stateMgr.getStore("noSuchStore"));
            Assert.assertEquals(registeredStore, stateMgr.getStore(nonPersistentStoreName));
        } finally {
            stateMgr.close(Collections.emptyMap());
        }
    }

    /**
     * Registers one persistent and one non-persistent store, then flushes and closes the manager with
     * acked offsets for three partitions. Expects both stores to be flushed and closed, and the
     * checkpoint file to contain a single entry: 124 for the persistent store's partition, i.e. one
     * past its acked offset of 123.
     */
    @Test
    public void testFlushAndClose() throws IOException {
        checkpoint.write(Collections.emptyMap());

        // acked offsets: persistent store, non-persistent store, and a topic no store is registered for
        final Map<TopicPartition, Long> ackedOffsets = new HashMap<>();
        ackedOffsets.put(persistentStorePartition, 123L);
        ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
        ackedOffsets.put(
            new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1),
            789L);

        final Map<String, String> storeToChangelogTopic = new HashMap<>();
        storeToChangelogTopic.put(persistentStoreName, persistentStoreTopicName);
        storeToChangelogTopic.put(nonPersistentStoreName, nonPersistentStoreTopicName);

        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId,
            noPartitions,
            false,
            stateDirectory,
            storeToChangelogTopic,
            changelogReader,
            false,
            logContext);

        try {
            // no checkpoint file is expected on disk before the manager is flushed and closed
            Assert.assertFalse(checkpointFile.exists());

            stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
            stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
        } finally {
            stateMgr.flush();
            stateMgr.close(ackedOffsets);
        }

        Assert.assertTrue(persistentStore.flushed);
        Assert.assertTrue(persistentStore.closed);
        Assert.assertTrue(nonPersistentStore.flushed);
        Assert.assertTrue(nonPersistentStore.closed);
        Assert.assertTrue(checkpointFile.exists());

        final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
        Assert.assertEquals(1, checkpointedOffsets.size());
        // Long.valueOf replaces the deprecated new Long(...) boxing constructor
        Assert.assertEquals(Long.valueOf(124L), checkpointedOffsets.get(persistentStorePartition));
    }

    /**
     * A store with no entry in the store-to-changelog map can still be registered and is afterwards
     * retrievable by name.
     */
    @Test
    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws IOException {
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            new TaskId(0, 1),
            noPartitions,
            false,
            stateDirectory,
            Collections.emptyMap(),
            changelogReader,
            false,
            logContext);

        stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);

        Assert.assertNotNull(stateMgr.getStore(nonPersistentStoreName));
    }

    /**
     * Writes a checkpoint, registers a store, closes the manager with {@code null} acked offsets and
     * verifies the checkpoint file still holds exactly what was written.
     */
    @Test
    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws IOException {
        final Map<TopicPartition, Long> writtenOffsets = Collections.singletonMap(persistentStorePartition, 99L);
        checkpoint.write(writtenOffsets);

        final MockKeyValueStore store = new MockKeyValueStore(persistentStoreName, true);
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId,
            noPartitions,
            false,
            stateDirectory,
            Collections.emptyMap(),
            changelogReader,
            false,
            logContext);
        stateMgr.register(store, store.stateRestoreCallback);

        stateMgr.close(null);

        final Map<TopicPartition, Long> readBack = checkpoint.read();
        MatcherAssert.assertThat(readBack, CoreMatchers.equalTo(writtenOffsets));
    }

    /**
     * Checkpointing acked offset 10 for a persistent store that has a changelog topic must leave
     * offset 11 for that partition in the checkpoint file.
     */
    @Test
    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws IOException {
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId,
            noPartitions,
            false,
            stateDirectory,
            Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
            changelogReader,
            false,
            logContext);
        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);

        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));

        final Map<TopicPartition, Long> expected = Collections.singletonMap(persistentStorePartition, 11L);
        final Map<TopicPartition, Long> readBack = checkpoint.read();
        MatcherAssert.assertThat(readBack, CoreMatchers.equalTo(expected));
    }

    /**
     * On a standby manager, updating the store's partition with last offset 888 and then
     * checkpointing with no acked offsets must leave offset 889 in the checkpoint file.
     */
    @Test
    public void shouldWriteCheckpointForStandbyReplica() throws IOException {
        final ProcessorStateManager standbyManager = new ProcessorStateManager(
            taskId,
            noPartitions,
            true,
            stateDirectory,
            Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
            changelogReader,
            false,
            logContext);
        standbyManager.register(persistentStore, persistentStore.stateRestoreCallback);

        // the serialized integer 10 serves as both key and value of the single record
        final byte[] serializedTen = Serdes.Integer().serializer().serialize("", 10);
        final ConsumerRecord<byte[], byte[]> record =
            new ConsumerRecord<>("", 0, 0L, serializedTen, serializedTen);
        standbyManager.updateStandbyStates(persistentStorePartition, Collections.singletonList(record), 888L);

        standbyManager.checkpoint(Collections.emptyMap());

        final Map<TopicPartition, Long> expected = Collections.singletonMap(persistentStorePartition, 889L);
        final Map<TopicPartition, Long> readBack = checkpoint.read();
        MatcherAssert.assertThat(readBack, CoreMatchers.equalTo(expected));
    }

    /**
     * Checkpointing an offset for a non-persistent store's partition must leave the checkpoint
     * reading back as an empty map.
     */
    @Test
    public void shouldNotWriteCheckpointForNonPersistent() throws IOException {
        final TopicPartition nonPersistentPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId,
            noPartitions,
            true,
            stateDirectory,
            Collections.singletonMap(nonPersistentStoreName, nonPersistentStoreTopicName),
            changelogReader,
            false,
            logContext);
        stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);

        stateMgr.checkpoint(Collections.singletonMap(nonPersistentPartition, 876L));

        final Map<TopicPartition, Long> nothing = Collections.emptyMap();
        final Map<TopicPartition, Long> readBack = checkpoint.read();
        MatcherAssert.assertThat(readBack, CoreMatchers.equalTo(nothing));
    }

    /**
     * Checkpointing an offset for a persistent store that has no changelog-topic mapping must leave
     * the checkpoint reading back as an empty map.
     */
    @Test
    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOException {
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId,
            noPartitions,
            true,
            stateDirectory,
            Collections.emptyMap(),
            changelogReader,
            false,
            logContext);
        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);

        stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));

        final Map<TopicPartition, Long> nothing = Collections.emptyMap();
        final Map<TopicPartition, Long> readBack = checkpoint.read();
        MatcherAssert.assertThat(readBack, CoreMatchers.equalTo(nothing));
    }

    /** Registering a store named ".checkpoint" must be rejected with an {@link IllegalArgumentException}. */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws IOException {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            noPartitions,
            false,
            stateDirectory,
            Collections.emptyMap(),
            changelogReader,
            false,
            logContext);
        final MockKeyValueStore clashingStore = new MockKeyValueStore(".checkpoint", true);

        try {
            stateManager.register(clashingStore, null);
            Assert.fail("should have thrown illegal argument exception when store name same as checkpoint file");
        } catch (final IllegalArgumentException expected) {
            // pass: this is the outcome under test
        }
    }

    /** Registering the same store a second time must be rejected with an {@link IllegalArgumentException}. */
    @Test
    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws IOException {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            noPartitions,
            false,
            stateDirectory,
            Collections.emptyMap(),
            changelogReader,
            false,
            logContext);

        // the first registration is expected to succeed
        stateManager.register(mockKeyValueStore, null);

        try {
            stateManager.register(mockKeyValueStore, null);
            Assert.fail("should have thrown illegal argument exception when store with same name already registered");
        } catch (final IllegalArgumentException expected) {
            // pass: this is the outcome under test
        }
    }

    /**
     * A {@link RuntimeException} thrown by a store's {@code flush()} must surface from the manager's
     * {@code flush()} as a {@link ProcessorStateException}.
     */
    @Test
    public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() throws IOException {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Collections.singleton(changelogTopicPartition),
            false,
            stateDirectory,
            Collections.singletonMap(storeName, changelogTopic),
            changelogReader,
            false,
            logContext);

        final MockKeyValueStore failingStore = new MockKeyValueStore(storeName, true) {
            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.register(failingStore, failingStore.stateRestoreCallback);

        try {
            stateManager.flush();
            Assert.fail("Should throw ProcessorStateException if store flush throws exception");
        } catch (final ProcessorStateException expected) {
            // pass: this is the outcome under test
        }
    }

    /**
     * A {@link RuntimeException} thrown by a store's {@code close()} must surface from the manager's
     * {@code close(...)} as a {@link ProcessorStateException}.
     */
    @Test
    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws IOException {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Collections.singleton(changelogTopicPartition),
            false,
            stateDirectory,
            Collections.singletonMap(storeName, changelogTopic),
            changelogReader,
            false,
            logContext);

        final MockKeyValueStore failingStore = new MockKeyValueStore(storeName, true) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.register(failingStore, failingStore.stateRestoreCallback);

        try {
            stateManager.close(Collections.emptyMap());
            Assert.fail("Should throw ProcessorStateException if store close throws exception");
        } catch (final ProcessorStateException expected) {
            // pass: this is the outcome under test
        }
    }

    /**
     * With two registered stores, the first of which throws from {@code flush()}, the manager's
     * {@code flush()} must still flush the second store.
     */
    @Test
    public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Collections.singleton(changelogTopicPartition),
            false,
            stateDirectory,
            Collections.singletonMap(storeName, changelogTopic),
            changelogReader,
            false,
            logContext);

        final AtomicBoolean secondStoreFlushed = new AtomicBoolean(false);

        final MockKeyValueStore failingStore = new MockKeyValueStore(storeName, true) {
            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        final MockKeyValueStore recordingStore = new MockKeyValueStore("mockKeyValueStore2", true) {
            @Override
            public void flush() {
                secondStoreFlushed.set(true);
            }
        };
        stateManager.register(failingStore, failingStore.stateRestoreCallback);
        stateManager.register(recordingStore, recordingStore.stateRestoreCallback);

        try {
            stateManager.flush();
        } catch (final ProcessorStateException expected) {
            // tolerated: this test only checks that the second store was flushed
        }

        Assert.assertTrue(secondStoreFlushed.get());
    }

    /**
     * With two registered stores, the first of which throws from {@code close()}, the manager's
     * {@code close(...)} must still close the second store.
     */
    @Test
    public void shouldCloseAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Collections.singleton(changelogTopicPartition),
            false,
            stateDirectory,
            Collections.singletonMap(storeName, changelogTopic),
            changelogReader,
            false,
            logContext);

        final AtomicBoolean secondStoreClosed = new AtomicBoolean(false);

        final MockKeyValueStore failingStore = new MockKeyValueStore(storeName, true) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        final MockKeyValueStore recordingStore = new MockKeyValueStore("mockKeyValueStore2", true) {
            @Override
            public void close() {
                secondStoreClosed.set(true);
            }
        };
        stateManager.register(failingStore, failingStore.stateRestoreCallback);
        stateManager.register(recordingStore, recordingStore.stateRestoreCallback);

        try {
            stateManager.close(Collections.emptyMap());
        } catch (final ProcessorStateException expected) {
            // tolerated: this test only checks that the second store was closed
        }

        Assert.assertTrue(secondStoreClosed.get());
    }

    /**
     * With an existing checkpoint file on disk, constructing a state manager with the EOS flag set
     * must remove that file.
     */
    @Test
    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
        checkpoint.write(Collections.singletonMap(persistentStorePartition, 123L));
        Assert.assertTrue(checkpointFile.exists());

        ProcessorStateManager stateManager = null;
        try {
            stateManager = new ProcessorStateManager(
                taskId,
                noPartitions,
                false,
                stateDirectory,
                Collections.emptyMap(),
                changelogReader,
                true,
                logContext);

            Assert.assertFalse(checkpointFile.exists());
        } finally {
            // the constructor may have thrown, leaving nothing to close
            if (stateManager != null) {
                stateManager.close(null);
            }
        }
    }

    /** Runs the shared re-initialisation scenario with the EOS flag off. */
    @Test
    public void shouldSuccessfullyReInitializeStateStoresWithEosDisable() throws Exception {
        final boolean eosEnabled = false;
        shouldSuccessfullyReInitializeStateStores(eosEnabled);
    }

    /** Runs the shared re-initialisation scenario with the EOS flag on. */
    @Test
    public void shouldSuccessfullyReInitializeStateStoresWithEosEnable() throws Exception {
        final boolean eosEnabled = true;
        shouldSuccessfullyReInitializeStateStores(eosEnabled);
    }

    /**
     * Shared scenario: registers two persistent stores, clears their {@code initialized} flags, asks
     * the manager to re-initialise the stores for both changelog partitions and verifies both flags
     * are set again.
     *
     * @param eosEnabled value passed as the manager's EOS constructor flag
     */
    private void shouldSuccessfullyReInitializeStateStores(boolean eosEnabled) throws Exception {
        final String store2Name = "store2";
        final String store2Changelog = "store2-changelog";
        final TopicPartition store2Partition = new TopicPartition(store2Changelog, 0);
        final List<TopicPartition> changelogPartitions = Arrays.asList(changelogTopicPartition, store2Partition);

        final Map<String, String> storeToChangelog = new HashMap<>();
        storeToChangelog.put(storeName, changelogTopic);
        storeToChangelog.put(store2Name, store2Changelog);

        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            changelogPartitions,
            false,
            stateDirectory,
            storeToChangelog,
            changelogReader,
            eosEnabled,
            logContext);

        final MockKeyValueStore firstStore = new MockKeyValueStore(storeName, true);
        final MockKeyValueStore secondStore = new MockKeyValueStore(store2Name, true);
        stateManager.register(firstStore, firstStore.stateRestoreCallback);
        stateManager.register(secondStore, secondStore.stateRestoreCallback);

        // clear the flags so the assertions below can only pass if something sets them again
        firstStore.initialized = false;
        secondStore.initialized = false;

        // context whose register(...) forwards to the manager under test
        final NoOpProcessorContext forwardingContext = new NoOpProcessorContext() {
            @Override
            public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
                stateManager.register(store, stateRestoreCallback);
            }
        };
        stateManager.reinitializeStateStoresForPartitions(changelogPartitions, forwardingContext);

        Assert.assertTrue(firstStore.initialized);
        Assert.assertTrue(secondStore.initialized);
    }

    /**
     * Builds a state manager for {@code taskId} with the standby flag set, no source partitions, EOS
     * off, and only the persistent store mapped to its changelog topic.
     */
    private ProcessorStateManager getStandByStateManager(TaskId taskId) throws IOException {
        final Map<String, String> storeToChangelogTopic = new HashMap<>();
        storeToChangelogTopic.put(persistentStoreName, persistentStoreTopicName);

        return new ProcessorStateManager(
            taskId,
            noPartitions,
            true,
            stateDirectory,
            storeToChangelogTopic,
            changelogReader,
            false,
            logContext);
    }

    /** Returns a new {@link MockKeyValueStore} named after {@code persistentStoreName}, created with {@code true} as its second argument. */
    private MockKeyValueStore getPersistentStore() {
        final MockKeyValueStore freshStore = new MockKeyValueStore(persistentStoreName, true);
        return freshStore;
    }
}

