/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockChangelogReader;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProcessorStateManagerTest {
    // Empty partition set, passed as the second constructor argument of most state managers below.
    private final Set<TopicPartition> noPartitions = Collections.emptySet();

    // Application id and store names used to derive the changelog topic names.
    private final String applicationId = "test-application";
    private final String persistentStoreName = "persistentStore";
    private final String nonPersistentStoreName = "nonPersistentStore";

    // Changelog topic names as computed by ProcessorStateManager for the two stores.
    private final String persistentStoreTopicName =
        ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
    private final String nonPersistentStoreTopicName =
        ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);

    // Mock stores: the boolean constructor argument is true for the persistent one, false otherwise.
    private final MockStateStoreSupplier.MockStateStore persistentStore =
        new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
    private final MockStateStoreSupplier.MockStateStore nonPersistentStore =
        new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);

    // Partition 1 of the persistent store's changelog topic (same number as taskId's second component).
    private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1);

    // A third mock store together with its changelog topic and partition 0 of that topic.
    private final String storeName = "mockStateStore";
    private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
    private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0);

    // Default task id, also used to locate the checkpoint file in setup().
    private final TaskId taskId = new TaskId(0, 1);
    private final MockChangelogReader changelogReader = new MockChangelogReader();
    private final MockStateStoreSupplier.MockStateStore mockStateStore =
        new MockStateStoreSupplier.MockStateStore(storeName, true);

    // Per-test fixtures, (re)created in setup().
    private File baseDir;
    private File checkpointFile;
    private OffsetCheckpoint checkpoint;
    private StateDirectory stateDirectory;

    /**
     * Prepares the per-test fixtures: a fresh temporary base directory, a {@link StateDirectory}
     * rooted in it (driven by a {@link MockTime}), and an {@link OffsetCheckpoint} backed by the
     * {@code .checkpoint} file inside the directory of the default {@code taskId}.
     */
    @Before
    public void setup() {
        baseDir = TestUtils.tempDirectory();
        stateDirectory = new StateDirectory("test-application", baseDir.getPath(), new MockTime());

        final File taskDirectory = stateDirectory.directoryForTask(taskId);
        checkpointFile = new File(taskDirectory, ".checkpoint");
        checkpoint = new OffsetCheckpoint(checkpointFile);
    }

    /**
     * Deletes the temporary base directory that {@link #setup()} created for the test.
     *
     * @throws IOException if {@code Utils.delete} fails
     */
    @After
    public void cleanup() throws IOException {
        final File directoryToRemove = baseDir;
        Utils.delete(directoryToRemove);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a persistent mock store and verifies that partition 2 of its changelog topic
     * (the same number as the second component of the task id used here) was registered with
     * the changelog reader.
     *
     * @throws IOException propagated from the state manager
     */
    @Test
    public void testRegisterPersistentStore() throws IOException {
        // Distinct local names: the class also has fields called taskId and persistentStore.
        final TaskId registeredTaskId = new TaskId(0, 2);
        final MockStateStoreSupplier.MockStateStore store =
            new MockStateStoreSupplier.MockStateStore("persistentStore", true);

        // Plain map rather than double-brace initialization; each store maps to its changelog
        // topic (the non-persistent entry previously pointed at the store name itself).
        final Map<String, String> storeToChangelogTopic = new HashMap<String, String>();
        storeToChangelogTopic.put("persistentStore", persistentStoreTopicName);
        storeToChangelogTopic.put("nonPersistentStore", nonPersistentStoreTopicName);

        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            registeredTaskId, noPartitions, false, stateDirectory, storeToChangelogTopic, changelogReader, false);
        try {
            stateMgr.register(store, true, store.stateRestoreCallback);
            Assert.assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
        } finally {
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a non-persistent mock store and verifies that partition 2 of its changelog topic
     * (the same number as the second component of the task id used here) was registered with
     * the changelog reader.
     *
     * @throws IOException propagated from the state manager
     */
    @Test
    public void testRegisterNonPersistentStore() throws IOException {
        // Distinct local name: the class also has a field called nonPersistentStore.
        final MockStateStoreSupplier.MockStateStore store =
            new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);

        // Plain map rather than double-brace initialization, which would create an anonymous
        // HashMap subclass holding a reference to this test instance.
        final Map<String, String> storeToChangelogTopic = new HashMap<String, String>();
        storeToChangelogTopic.put("persistentStore", persistentStoreTopicName);
        storeToChangelogTopic.put("nonPersistentStore", nonPersistentStoreTopicName);

        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            new TaskId(0, 2), noPartitions, false, stateDirectory, storeToChangelogTopic, changelogReader, false);
        try {
            stateMgr.register(store, true, store.stateRestoreCallback);
            Assert.assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
        } finally {
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies the offsets reported by {@code checkpointed()} for three registered stores:
     * store1, whose partition has offset 10 written to the checkpoint file beforehand, must
     * report 10, while store2 and store3 (no checkpointed offset) must report -1.
     * Partition 1 of store3's changelog topic is also handed to the state manager as its
     * partition collection (second constructor argument).
     *
     * @throws IOException propagated from the checkpoint file or the state manager
     */
    @Test
    public void testChangeLogOffsets() throws IOException {
        // Distinct local names: the class also has fields called taskId and checkpoint.
        final TaskId changelogTaskId = new TaskId(0, 0);
        final long lastCheckpointedOffset = 10L;

        final String storeName1 = "store1";
        final String storeName2 = "store2";
        final String storeName3 = "store3";
        final String storeTopicName1 = ProcessorStateManager.storeChangelogTopic("test-application", storeName1);
        final String storeTopicName2 = ProcessorStateManager.storeChangelogTopic("test-application", storeName2);
        final String storeTopicName3 = ProcessorStateManager.storeChangelogTopic("test-application", storeName3);

        final Map<String, String> storeToChangelogTopic = new HashMap<String, String>();
        storeToChangelogTopic.put(storeName1, storeTopicName1);
        storeToChangelogTopic.put(storeName2, storeTopicName2);
        storeToChangelogTopic.put(storeName3, storeTopicName3);

        final TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
        final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
        final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);

        // Only store1's partition gets an offset in the checkpoint file before the manager starts.
        final File taskCheckpointFile = new File(stateDirectory.directoryForTask(changelogTaskId), ".checkpoint");
        final OffsetCheckpoint taskCheckpoint = new OffsetCheckpoint(taskCheckpointFile);
        taskCheckpoint.write(Collections.singletonMap(partition1, lastCheckpointedOffset));

        final MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
        final MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
        final MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);

        final Set<TopicPartition> sourcePartitions = Utils.mkSet(partition3);
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            changelogTaskId, sourcePartitions, true, stateDirectory, storeToChangelogTopic, changelogReader, false);
        try {
            stateMgr.register(store1, true, store1.stateRestoreCallback);
            stateMgr.register(store2, true, store2.stateRestoreCallback);
            stateMgr.register(store3, true, store3.stateRestoreCallback);

            final Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();

            Assert.assertEquals(3, changeLogOffsets.size());
            Assert.assertTrue(changeLogOffsets.containsKey(partition1));
            Assert.assertTrue(changeLogOffsets.containsKey(partition2));
            Assert.assertTrue(changeLogOffsets.containsKey(partition3));
            Assert.assertEquals(lastCheckpointedOffset, changeLogOffsets.get(partition1).longValue());
            Assert.assertEquals(-1L, changeLogOffsets.get(partition2).longValue());
            Assert.assertEquals(-1L, changeLogOffsets.get(partition3).longValue());
        } finally {
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a single mock store and checks that {@code getStore} returns {@code null} for an
     * unknown name and the registered instance for the store's own name.
     *
     * @throws IOException propagated from the state manager
     */
    @Test
    public void testGetStore() throws IOException {
        final MockStateStoreSupplier.MockStateStore registered =
            new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
        final Map<String, String> noChangelogTopics = Collections.emptyMap();
        final ProcessorStateManager manager = new ProcessorStateManager(
            new TaskId(0, 1), noPartitions, false, stateDirectory, noChangelogTopics, changelogReader, false);

        try {
            manager.register(registered, true, registered.stateRestoreCallback);

            final StateStore unknown = manager.getStore("noSuchStore");
            Assert.assertNull(unknown);

            final StateStore found = manager.getStore("nonPersistentStore");
            Assert.assertEquals(registered, found);
        } finally {
            manager.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers the persistent and the non-persistent store, flushes and closes the state
     * manager with a set of acked offsets, and verifies that both stores were flushed and
     * closed and that the checkpoint file afterwards contains exactly one entry: the
     * persistent store's partition with the acked offset 123 recorded as 124.
     *
     * @throws IOException propagated from the checkpoint file or the state manager
     */
    @Test
    public void testFlushAndClose() throws IOException {
        // Start from an existing, empty checkpoint file.
        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());

        // Acked offsets for the persistent store, the non-persistent store and an unrelated topic.
        final Map<TopicPartition, Long> ackedOffsets = new HashMap<TopicPartition, Long>();
        ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
        ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
        ackedOffsets.put(
            new TopicPartition(ProcessorStateManager.storeChangelogTopic("test-application", "otherTopic"), 1),
            789L);

        // Plain map rather than double-brace initialization.
        final Map<String, String> storeToChangelogTopic = new HashMap<String, String>();
        storeToChangelogTopic.put("persistentStore", persistentStoreTopicName);
        storeToChangelogTopic.put("nonPersistentStore", nonPersistentStoreTopicName);

        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId, noPartitions, false, stateDirectory, storeToChangelogTopic, changelogReader, false);
        try {
            Assert.assertTrue(checkpointFile.exists());
            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
            // Flush is part of the scenario under test, so it belongs in the try body: if an
            // earlier step fails, flushing in finally could throw and hide that failure.
            stateMgr.flush();
        } finally {
            stateMgr.close(ackedOffsets);
        }

        Assert.assertTrue(persistentStore.flushed);
        Assert.assertTrue(persistentStore.closed);
        Assert.assertTrue(nonPersistentStore.flushed);
        Assert.assertTrue(nonPersistentStore.closed);
        Assert.assertTrue(checkpointFile.exists());

        final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
        Assert.assertEquals(1, checkpointedOffsets.size());
        // Long.valueOf instead of the deprecated boxing constructor.
        Assert.assertEquals(
            Long.valueOf(124L),
            checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
    }

    /**
     * Registers the non-persistent store with {@code false} as the second {@code register}
     * argument and an empty store-to-changelog map, then verifies the store can be looked up
     * by name.
     *
     * @throws Exception propagated from the state manager
     */
    @Test
    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            new TaskId(0, 1), noPartitions, false, stateDirectory,
            Collections.<String, String>emptyMap(), changelogReader, false);
        try {
            stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
            Assert.assertNotNull(stateMgr.getStore("nonPersistentStore"));
        } finally {
            // Close the manager even when the assertion fails so it is not left open.
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /**
     * Writes a checkpoint containing offset 99 for the persistent store's partition, registers
     * a persistent store, closes the state manager with {@code null} acked offsets, and verifies
     * that the checkpoint file still holds exactly the offsets written up front.
     *
     * @throws Exception propagated from the checkpoint file or the state manager
     */
    @Test
    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception {
        final Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(persistentStorePartition, 99L);
        checkpoint.write(expectedOffsets);

        final MockStateStoreSupplier.MockStateStore store =
            new MockStateStoreSupplier.MockStateStore("persistentStore", true);
        final ProcessorStateManager manager = new ProcessorStateManager(
            taskId, noPartitions, false, stateDirectory,
            Collections.<String, String>emptyMap(), changelogReader, false);
        manager.register(store, true, store.stateRestoreCallback);
        manager.close(null);

        final Map<TopicPartition, Long> offsetsAfterClose = checkpoint.read();
        MatcherAssert.assertThat(offsetsAfterClose, CoreMatchers.equalTo(expectedOffsets));
    }

    /**
     * Registers the persistent store (mapped to its changelog topic), checkpoints offset 10 for
     * its partition, and verifies the checkpoint file then contains that partition with 11.
     *
     * @throws Exception propagated from the checkpoint file or the state manager
     */
    @Test
    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception {
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId, noPartitions, false, stateDirectory,
            Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), changelogReader, false);
        try {
            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
            stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));

            final Map<TopicPartition, Long> expected = Collections.singletonMap(persistentStorePartition, 11L);
            final Map<TopicPartition, Long> read = checkpoint.read();
            MatcherAssert.assertThat(read, CoreMatchers.equalTo(expected));
        } finally {
            // Close the manager after the assertions so it is not left open.
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /**
     * Creates the state manager with the third constructor argument set to {@code true}
     * (standby mode, going by the test name), feeds it one record at offset 888 for the
     * persistent store's partition via {@code updateStandbyStates}, checkpoints with no acked
     * offsets, and verifies the checkpoint file contains that partition with 889.
     *
     * @throws Exception propagated from the checkpoint file or the state manager
     */
    @Test
    public void shouldWriteCheckpointForStandbyReplica() throws Exception {
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId, noPartitions, true, stateDirectory,
            Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), changelogReader, false);
        try {
            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);

            // The same serialized integer serves as both key and value of the record.
            final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
            final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<byte[], byte[]>(
                persistentStorePartition.topic(), persistentStorePartition.partition(), 888L, bytes, bytes);
            stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(record));

            stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());

            final Map<TopicPartition, Long> expected = Collections.singletonMap(persistentStorePartition, 889L);
            final Map<TopicPartition, Long> read = checkpoint.read();
            MatcherAssert.assertThat(read, CoreMatchers.equalTo(expected));
        } finally {
            // Close the manager after the assertions so it is not left open.
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /**
     * Registers the non-persistent store (mapped to its changelog topic), checkpoints offset 876
     * for partition 1 of that topic, and verifies the checkpoint file stays empty.
     *
     * @throws Exception propagated from the checkpoint file or the state manager
     */
    @Test
    public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
        final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId, noPartitions, true, stateDirectory,
            Collections.singletonMap("nonPersistentStore", nonPersistentStoreTopicName), changelogReader, false);
        try {
            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
            stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));

            final Map<TopicPartition, Long> read = checkpoint.read();
            MatcherAssert.assertThat(read, CoreMatchers.equalTo(Collections.<TopicPartition, Long>emptyMap()));
        } finally {
            // Close the manager after the assertions so it is not left open.
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /**
     * Registers the persistent store while the store-to-changelog map is empty, checkpoints
     * offset 987 for the persistent store's partition, and verifies the checkpoint file stays
     * empty.
     *
     * @throws Exception propagated from the checkpoint file or the state manager
     */
    @Test
    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception {
        final ProcessorStateManager stateMgr = new ProcessorStateManager(
            taskId, noPartitions, true, stateDirectory,
            Collections.<String, String>emptyMap(), changelogReader, false);
        try {
            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
            stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));

            final Map<TopicPartition, Long> read = checkpoint.read();
            MatcherAssert.assertThat(read, CoreMatchers.equalTo(Collections.<TopicPartition, Long>emptyMap()));
        } finally {
            // Close the manager after the assertions so it is not left open.
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /**
     * Verifies that registering a store named {@code .checkpoint} (the checkpoint file name used
     * by this test class) is rejected with an {@link IllegalArgumentException}.
     *
     * @throws Exception propagated from the state manager
     */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws Exception {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId, noPartitions, false, stateDirectory,
            Collections.<String, String>emptyMap(), changelogReader, false);
        try {
            stateManager.register(new MockStateStoreSupplier.MockStateStore(".checkpoint", true), true, null);
            Assert.fail("should have thrown illegal argument exception when store name same as checkpoint file");
        } catch (final IllegalArgumentException expected) {
            // expected: this rejection is exactly what the test verifies
        } finally {
            // Close the manager on every path so it is not left open.
            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /**
     * Verifies that registering the same mock store a second time is rejected with an
     * {@link IllegalArgumentException}.
     *
     * @throws Exception propagated from the state manager
     */
    @Test
    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws Exception {
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId, noPartitions, false, stateDirectory,
            Collections.<String, String>emptyMap(), changelogReader, false);
        try {
            stateManager.register(mockStateStore, false, null);
            try {
                stateManager.register(mockStateStore, false, null);
                Assert.fail("should have thrown illegal argument exception when store with same name already registered");
            } catch (final IllegalArgumentException expected) {
                // expected: the duplicate registration is exactly what the test verifies
            }
        } finally {
            // Close the manager on every path so it is not left open.
            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
        }
    }

    /**
     * Registers a mock store whose {@code close()} throws a {@link RuntimeException} and
     * verifies that closing the state manager surfaces a {@link ProcessorStateException}.
     *
     * @throws Exception propagated from the state manager
     */
    @Test
    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws Exception {
        final Set<TopicPartition> partitions = Collections.singleton(changelogTopicPartition);
        final Map<String, String> storeToChangelogTopic = Collections.singletonMap("mockStateStore", changelogTopic);
        final ProcessorStateManager manager = new ProcessorStateManager(
            taskId, partitions, false, stateDirectory, storeToChangelogTopic, changelogReader, false);

        // Store that blows up when it is closed.
        final MockStateStoreSupplier.MockStateStore failingStore =
            new MockStateStoreSupplier.MockStateStore("mockStateStore", true) {
                @Override
                public void close() {
                    throw new RuntimeException("KABOOM!");
                }
            };
        manager.register(failingStore, false, failingStore.stateRestoreCallback);

        try {
            manager.close(Collections.<TopicPartition, Long>emptyMap());
            Assert.fail("Should throw ProcessorStateException if store close throws exception");
        } catch (final ProcessorStateException expected) {
            // expected: the failure of the store's close() must be reported this way
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes an empty checkpoint file, then creates a state manager with the last constructor
     * argument set to {@code true} (the EOS flag, going by the test name) and verifies that the
     * checkpoint file no longer exists once the manager has been constructed.
     *
     * @throws Exception propagated from the checkpoint file or the state manager
     */
    @Test
    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws Exception {
        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
        Assert.assertTrue(checkpointFile.exists());

        // If construction itself throws there is nothing to close, so the manager is created
        // before entering the try block.
        final ProcessorStateManager manager = new ProcessorStateManager(
            taskId, noPartitions, false, stateDirectory,
            Collections.<String, String>emptyMap(), changelogReader, true);
        try {
            Assert.assertFalse(checkpointFile.exists());
        } finally {
            manager.close(null);
        }
    }
}

