/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.NoOpReadOnlyStore;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link GlobalStateManagerImpl}.
 *
 * <p>The tests cover locking of the global state directory, reading and writing of the
 * {@code .checkpoint} file, registration and restoration of global stores, and the
 * flush/close behaviour of the manager.
 */
public class GlobalStateManagerImplTest {
    private final MockTime time = new MockTime();
    /** Shared callback for tests that do not inspect the restored records. */
    private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
    private final TopicPartition t1 = new TopicPartition("t1", 1);
    private final TopicPartition t2 = new TopicPartition("t2", 1);
    private final TopicPartition t3 = new TopicPartition("t3", 1);
    private GlobalStateManagerImpl stateManager;
    private NoOpProcessorContext context;
    private StateDirectory stateDirectory;
    private String stateDirPath;
    private NoOpReadOnlyStore<Object, Object> store1;
    private NoOpReadOnlyStore<Object, Object> store2;
    /** Created with {@code false} as second constructor argument, unlike store1 and store2. */
    private NoOpReadOnlyStore<Object, Object> store3;
    private MockConsumer<byte[], byte[]> consumer;
    /** The {@code .checkpoint} file inside the state manager's base directory. */
    private File checkpointFile;
    private ProcessorTopology topology;

    /**
     * Builds a topology with three global stores (t1-store, t2-store, t3-store) mapped to the
     * topics t1, t2 and t3, and a state manager backed by a fresh temporary state directory
     * and a {@link MockConsumer}.
     */
    @Before
    public void before() throws IOException {
        final Map<String, String> storeToTopic = new HashMap<>();
        storeToTopic.put("t1-store", "t1");
        storeToTopic.put("t2-store", "t2");
        storeToTopic.put("t3-store", "t3");

        // NOTE(review): this map is populated but never handed to the topology below.
        final Map<StateStore, MockProcessorNode> storeToProcessorNode = new HashMap<>();
        store1 = new NoOpReadOnlyStore<>("t1-store");
        storeToProcessorNode.put(store1, new MockProcessorNode(-1L));
        store2 = new NoOpReadOnlyStore<>("t2-store");
        storeToProcessorNode.put(store2, new MockProcessorNode(-1L));
        store3 = new NoOpReadOnlyStore<>("t3-store", false);
        // Previously store2 was inserted a second time here and store3 was left out.
        storeToProcessorNode.put(store3, new MockProcessorNode(-1L));

        topology = new ProcessorTopology(
            Collections.emptyList(),
            Collections.emptyMap(),
            Collections.emptyMap(),
            Collections.emptyList(),
            storeToTopic,
            Arrays.<StateStore>asList(store1, store2, store3));
        context = new NoOpProcessorContext();
        stateDirPath = TestUtils.tempDirectory().getPath();
        stateDirectory = new StateDirectory("appId", stateDirPath, time);
        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
        checkpointFile = new File(stateManager.baseDir(), ".checkpoint");
    }

    /** Releases the global state lock so that the next test can acquire it. */
    @After
    public void after() throws IOException {
        stateDirectory.unlockGlobalState();
    }

    /** After initialization a {@code .lock} file must exist in the global state directory. */
    @Test
    public void shouldLockGlobalStateDirectory() throws Exception {
        stateManager.initialize(context);
        Assert.assertTrue(new File(stateDirectory.globalStateDir(), ".lock").exists());
    }

    /** Initialization must fail with a LockException while another StateDirectory holds the lock. */
    @Test(expected = LockException.class)
    public void shouldThrowLockExceptionIfCantGetLock() throws Exception {
        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, time);
        try {
            stateDir.lockGlobalState(1);
            stateManager.initialize(context);
        } finally {
            stateDir.unlockGlobalState();
        }
    }

    /** Offsets written to the checkpoint file before initialization are reported by checkpointed(). */
    @Test
    public void shouldReadCheckpointOffsets() throws Exception {
        final Map<TopicPartition, Long> expected = writeCheckpoint();
        stateManager.initialize(context);
        final Map<TopicPartition, Long> offsets = stateManager.checkpointed();
        Assert.assertEquals(expected, offsets);
    }

    /** The checkpoint file must still exist after it has been loaded during initialization. */
    @Test
    public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception {
        writeCheckpoint();
        stateManager.initialize(context);
        Assert.assertTrue(checkpointFile.exists());
    }

    /** A corrupt checkpoint file must make initialization fail with a StreamsException. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws Exception {
        writeCorruptCheckpoint();
        stateManager.initialize(context);
    }

    /** Initialization must mark store1 and store2 as initialized. */
    @Test
    public void shouldInitializeStateStores() throws Exception {
        stateManager.initialize(context);
        Assert.assertTrue(store1.initialized);
        Assert.assertTrue(store2.initialized);
    }

    /** initialize() must return the names of all three global stores. */
    @Test
    public void shouldReturnInitializedStoreNames() throws Exception {
        final Set<String> storeNames = stateManager.initialize(context);
        Assert.assertEquals(Utils.mkSet(store1.name(), store2.name(), store3.name()), storeNames);
    }

    /** Registering a store that is not part of the topology must be rejected. */
    @Test
    public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() throws Exception {
        stateManager.initialize(context);
        try {
            stateManager.register(new NoOpReadOnlyStore<Object, Object>("not-in-topology"), false, new TheStateRestoreCallback());
            Assert.fail("should have raised an illegal argument exception as store is not in the topology");
        } catch (final IllegalArgumentException expected) {
            // expected: this is the outcome under test
        }
    }

    /** Registering the same store a second time must be rejected. */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(2L, 1L, t1);
        stateManager.register(store1, false, new TheStateRestoreCallback());
        try {
            stateManager.register(store1, false, new TheStateRestoreCallback());
            Assert.fail("should have raised an illegal argument exception as store has already been registered");
        } catch (final IllegalArgumentException expected) {
            // expected: this is the outcome under test
        }
    }

    /** Registering a store whose topic has no partitions known to the consumer must fail. */
    @Test
    public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() throws Exception {
        stateManager.initialize(context);
        try {
            stateManager.register(store1, false, new TheStateRestoreCallback());
            Assert.fail("Should have raised a StreamsException as there are no partition for the store");
        } catch (final StreamsException expected) {
            // expected: this is the outcome under test
        }
    }

    /** With no checkpoint, both records prepared in the consumer must be restored. */
    @Test
    public void shouldRestoreRecordsUpToHighwatermark() throws Exception {
        initializeConsumer(2L, 1L, t1);
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        stateManager.register(store1, false, callback);
        Assert.assertEquals(2L, callback.restored.size());
    }

    /** With a checkpoint at offset 6, the five records starting at offset 6 must be restored. */
    @Test
    public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws Exception {
        initializeConsumer(5L, 6L, t1);
        final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(checkpointFile);
        offsetCheckpoint.write(Collections.singletonMap(t1, 6L));
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        stateManager.register(store1, false, callback);
        Assert.assertEquals(5L, callback.restored.size());
    }

    /** flush() must flush every registered store. */
    @Test
    public void shouldFlushStateStores() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(1L, 1L, t1);
        stateManager.register(store1, false, callback);
        initializeConsumer(1L, 1L, t2);
        stateManager.register(store2, false, callback);
        stateManager.flush();
        Assert.assertTrue(store1.flushed);
        Assert.assertTrue(store2.flushed);
    }

    /** A RuntimeException thrown by a store's flush() must surface as ProcessorStateException. */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(1L, 1L, t1);
        stateManager.register(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        }, false, callback);
        stateManager.flush();
    }

    /** close() must close every registered store. */
    @Test
    public void shouldCloseStateStores() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(1L, 1L, t1);
        stateManager.register(store1, false, callback);
        initializeConsumer(1L, 1L, t2);
        stateManager.register(store2, false, callback);
        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
        Assert.assertFalse(store1.isOpen());
        Assert.assertFalse(store2.isOpen());
    }

    /** Offsets handed to close() must end up in the checkpoint file. */
    @Test
    public void shouldWriteCheckpointsOnClose() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(1L, 1L, t1);
        stateManager.register(store1, false, callback);
        final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
        stateManager.close(expected);
        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
        Assert.assertEquals(expected, result);
    }

    /** A RuntimeException thrown by a store's close() must surface as ProcessorStateException. */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(1L, 1L, t1);
        stateManager.register(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        }, false, stateRestoreCallback);
        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
    }

    /** Registering a store with a null restore callback must be rejected. */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() throws Exception {
        stateManager.initialize(context);
        try {
            stateManager.register(store1, false, null);
            Assert.fail("should have thrown due to null callback");
        } catch (final IllegalArgumentException expected) {
            // expected: this is the outcome under test
        }
    }

    /** After close() a different StateDirectory instance must be able to take the global lock. */
    @Test
    public void shouldUnlockGlobalStateDirectoryOnClose() throws Exception {
        stateManager.initialize(context);
        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
        try {
            Assert.assertTrue(stateDir.lockGlobalState(1));
        } finally {
            stateDir.unlockGlobalState();
        }
    }

    /**
     * Calling close() twice must not close the stores a second time; the registered store
     * throws if close() is invoked while it is no longer open.
     */
    @Test
    public void shouldNotCloseStoresIfCloseAlreadyCalled() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(1L, 1L, t1);
        stateManager.register(new NoOpReadOnlyStore<Object, Object>("t1-store") {
            @Override
            public void close() {
                if (!isOpen()) {
                    throw new RuntimeException("store already closed");
                }
                super.close();
            }
        }, false, stateRestoreCallback);
        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
    }

    /** A failure while closing one store must not prevent the remaining stores from being closed. */
    @Test
    public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(1L, 1L, t1);
        initializeConsumer(1L, 1L, t2);
        final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<Object, Object>("t1-store") {
            @Override
            public void close() {
                super.close();
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.register(store, false, stateRestoreCallback);
        stateManager.register(store2, false, stateRestoreCallback);
        try {
            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
        } catch (final ProcessorStateException expected) {
            // expected: the failing store makes close() throw; the assertions below are the real check
        }
        Assert.assertFalse(store.isOpen());
        Assert.assertFalse(store2.isOpen());
    }

    /** If loading the checkpoint fails, the global lock must have been released again. */
    @Test
    public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exception {
        writeCorruptCheckpoint();
        try {
            stateManager.initialize(context);
        } catch (final StreamsException expected) {
            // expected: the corrupt checkpoint makes initialize() fail; the lock check follows
        }
        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
        try {
            Assert.assertTrue(stateDir.lockGlobalState(1));
        } finally {
            stateDir.unlockGlobalState();
        }
    }

    /** checkpoint() must persist the offsets to file and expose them through checkpointed(). */
    @Test
    public void shouldCheckpointOffsets() throws Exception {
        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
        stateManager.initialize(context);
        stateManager.checkpoint(offsets);
        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
        MatcherAssert.assertThat(result, CoreMatchers.equalTo(offsets));
        MatcherAssert.assertThat(stateManager.checkpointed(), CoreMatchers.equalTo(offsets));
    }

    /** Checkpointing an offset for t1 only must leave the previously known offset of t2 untouched. */
    @Test
    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(10L, 1L, t1);
        stateManager.register(store1, false, callback);
        initializeConsumer(20L, 1L, t2);
        stateManager.register(store2, false, callback);

        final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
        stateManager.checkpoint(Collections.singletonMap(t1, 101L));
        final Map<TopicPartition, Long> updatedCheckpoint = stateManager.checkpointed();

        MatcherAssert.assertThat(updatedCheckpoint.get(t2), CoreMatchers.equalTo(initialCheckpoint.get(t2)));
        MatcherAssert.assertThat(updatedCheckpoint.get(t1), CoreMatchers.equalTo(101L));
    }

    /**
     * Of the two records offered by the consumer, only the one with a non-null key must reach
     * the restore callback, carrying the expected key and value bytes.
     */
    @Test
    public void shouldSkipNullKeysWhenRestoring() throws Exception {
        final Map<TopicPartition, Long> startOffsets = new HashMap<>();
        startOffsets.put(t1, 1L);
        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(t1, 2L);
        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
        consumer.assign(Collections.singletonList(t1));
        consumer.updateEndOffsets(endOffsets);
        consumer.updateBeginningOffsets(startOffsets);
        consumer.addRecord(new ConsumerRecord<byte[], byte[]>(t1.topic(), t1.partition(), 1L, null, "null".getBytes()));
        final byte[] expectedKey = "key".getBytes();
        final byte[] expectedValue = "value".getBytes();
        consumer.addRecord(new ConsumerRecord<byte[], byte[]>(t1.topic(), t1.partition(), 2L, expectedKey, expectedValue));

        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        stateManager.register(store1, false, callback);

        // Exactly one record must survive, and it must be the keyed one. The former assertion
        // compared the restored list against its own first element and so never checked contents.
        Assert.assertEquals(1L, callback.restored.size());
        final KeyValue<byte[], byte[]> restoredKv = callback.restored.get(0);
        Assert.assertArrayEquals(expectedKey, restoredKv.key);
        Assert.assertArrayEquals(expectedValue, restoredKv.value);
    }

    /** After restoring ten records starting at offset 1, close() must checkpoint offset 11 for t1. */
    @Test
    public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(10L, 1L, t1);
        stateManager.register(store1, false, callback);
        stateManager.close(Collections.<TopicPartition, Long>emptyMap());

        final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
        MatcherAssert.assertThat(checkpointMap, CoreMatchers.equalTo(Collections.singletonMap(t1, 11L)));
        MatcherAssert.assertThat(readOffsetsCheckpoint(), CoreMatchers.equalTo(checkpointMap));
    }

    /** Offsets restored for store3 must not be written to the checkpoint file on close(). */
    @Test
    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
        stateManager.initialize(context);
        initializeConsumer(10L, 1L, t3);
        stateManager.register(store3, false, stateRestoreCallback);
        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
        MatcherAssert.assertThat(readOffsetsCheckpoint(), CoreMatchers.equalTo(Collections.<TopicPartition, Long>emptyMap()));
    }

    /**
     * Reads the offsets currently stored in the checkpoint file.
     *
     * @return the offsets per topic partition as read from {@code .checkpoint}
     * @throws IOException if the checkpoint file cannot be read
     */
    private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
        return new OffsetCheckpoint(checkpointFile).read();
    }

    /** An IOException raised while locking the state directory must be reported as LockException. */
    @Test
    public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws Exception {
        stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId", stateDirPath, time) {
            @Override
            public boolean lockGlobalState(final int retry) throws IOException {
                throw new IOException("KABOOM!");
            }
        });
        try {
            stateManager.initialize(context);
            Assert.fail("Should have thrown LockException");
        } catch (final LockException expected) {
            // expected: this is the outcome under test
        }
    }

    /**
     * Overwrites the checkpoint file with content whose third line is not a valid entry.
     *
     * @throws IOException if the file cannot be written
     */
    private void writeCorruptCheckpoint() throws IOException {
        try (FileOutputStream stream = new FileOutputStream(checkpointFile)) {
            stream.write("0\n1\nfoo".getBytes());
        }
    }

    /**
     * Prepares the mock consumer for a single partition: registers the partition, assigns it,
     * sets the beginning offset to 1 and the end offset to {@code startOffset + numRecords - 1},
     * and adds {@code numRecords} records with key "key" and value "value" at consecutive
     * offsets beginning with {@code startOffset}.
     *
     * <p>NOTE(review): the beginning offset is always 1 and does not follow {@code startOffset};
     * the existing tests depend on the current values.
     *
     * @param numRecords number of records to add
     * @param startOffset offset of the first record
     * @param topicPartition partition the consumer is assigned to
     */
    private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) {
        final Map<TopicPartition, Long> startOffsets = new HashMap<>();
        startOffsets.put(topicPartition, 1L);
        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(topicPartition, startOffset + numRecords - 1L);
        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.updateEndOffsets(endOffsets);
        consumer.updateBeginningOffsets(startOffsets);
        for (int i = 0; i < numRecords; i++) {
            consumer.addRecord(new ConsumerRecord<byte[], byte[]>(topicPartition.topic(), topicPartition.partition(), startOffset + i, "key".getBytes(), "value".getBytes()));
        }
    }

    /**
     * Writes a checkpoint file containing offset 1 for partition t1.
     *
     * @return the offsets that were written
     * @throws IOException if the file cannot be written
     */
    private Map<TopicPartition, Long> writeCheckpoint() throws IOException {
        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
        final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 1L);
        checkpoint.write(expected);
        return expected;
    }

    /** Restore callback that records every key/value pair it receives, in call order. */
    private static class TheStateRestoreCallback implements StateRestoreCallback {
        private final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();

        @Override
        public void restore(final byte[] key, final byte[] value) {
            restored.add(KeyValue.pair(key, value));
        }
    }
}

