/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.PositionSerde;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProcessorContextImplTest {
    /** Context under test; {@link #setup()} replaces it with a fresh active-task context before each test. */
    private ProcessorContextImpl context;
    /** Replayed {@link StreamsConfig} mock handed to the contexts built by this class. */
    private final StreamsConfig streamsConfig = this.streamsConfigMock();
    /** Collector mock; only the changelog tests record expectations on it and replay it. */
    private final RecordCollector recordCollector = (RecordCollector)EasyMock.mock(RecordCollector.class);
    /** State manager mock that serves the named global/local store mocks (configured in {@link #setup()}). */
    private final ProcessorStateManager stateManager = (ProcessorStateManager)EasyMock.mock(ProcessorStateManager.class);

    // Fixture values returned by the store mocks and expected by the assertions.
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes());
    private static final long VALUE = 42L;
    private static final byte[] VALUE_BYTES = String.valueOf(VALUE).getBytes();
    private static final long TIMESTAMP = 21L;
    private static final long STREAM_TIME = 50L;
    // Built from the primitive constants so the inferred type is ValueAndTimestamp<Long>;
    // passing the value as Object would not be assignable to this field.
    private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(VALUE, TIMESTAMP);
    /** Name reported by every mocked store (see {@link #initStateStoreMock(StateStore)}). */
    private static final String STORE_NAME = "underlying-store";
    /** Store name for which the state manager mock reports {@link #CHANGELOG_PARTITION}. */
    private static final String REGISTERED_STORE_NAME = "registered-store";
    private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1);

    // Flags flipped by the answers recorded on the store mocks, so tests can prove
    // that a call made through the context-provided store reached the underlying store.
    private boolean flushExecuted;
    private boolean putExecuted;
    private boolean putWithTimestampExecuted;
    private boolean putIfAbsentExecuted;
    private boolean putAllExecuted;
    private boolean deleteExecuted;
    private boolean removeExecuted;

    // Iterator mocks returned by the key-value store mocks; compared by identity in the tests.
    private KeyValueIterator<String, Long> rangeIter;
    private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedRangeIter;
    private KeyValueIterator<String, Long> allIter;
    private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedAllIter;
    // Seven iterator mocks each: indices 0-2 are returned by the window store mocks,
    // indices 3-6 (plain list only) by the session store mock.
    private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7);
    private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7);
    private WindowStoreIterator windowStoreIter;

    /**
     * Rebuilds the whole fixture before each test: resets the "executed" flags, creates fresh
     * iterator mocks, programs the state manager mock with the named global and local store
     * mocks, and creates an active-task {@link ProcessorContextImpl} whose current node has
     * access to the five local stores.
     */
    @Before
    public void setup() {
        // Reset every flag the store-mock answers can flip, including
        // putWithTimestampExecuted, so no assertion can pass on stale state.
        this.flushExecuted = false;
        this.putExecuted = false;
        this.putWithTimestampExecuted = false;
        this.putIfAbsentExecuted = false;
        this.putAllExecuted = false;
        this.deleteExecuted = false;
        this.removeExecuted = false;

        // The iterator mocks are never replayed; the tests only compare them by identity.
        this.rangeIter = (KeyValueIterator)EasyMock.mock(KeyValueIterator.class);
        this.timestampedRangeIter = (KeyValueIterator)EasyMock.mock(KeyValueIterator.class);
        this.allIter = (KeyValueIterator)EasyMock.mock(KeyValueIterator.class);
        this.timestampedAllIter = (KeyValueIterator)EasyMock.mock(KeyValueIterator.class);
        this.windowStoreIter = (WindowStoreIterator)EasyMock.mock(WindowStoreIterator.class);
        for (int i = 0; i < 7; ++i) {
            this.iters.add(i, (KeyValueIterator<Windowed<String>, Long>)EasyMock.mock(KeyValueIterator.class));
            this.timestampedIters.add(i, (KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>)EasyMock.mock(KeyValueIterator.class));
        }

        EasyMock.expect((Object)this.stateManager.taskType()).andStubReturn((Object)Task.TaskType.ACTIVE);
        // Global lookups: the five known names resolve to store mocks ...
        EasyMock.expect((Object)this.stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(this.keyValueStoreMock());
        EasyMock.expect((Object)this.stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(this.timestampedKeyValueStoreMock());
        EasyMock.expect((Object)this.stateManager.getGlobalStore("GlobalWindowStore")).andReturn(this.windowStoreMock());
        EasyMock.expect((Object)this.stateManager.getGlobalStore("GlobalTimestampedWindowStore")).andReturn(this.timestampedWindowStoreMock());
        EasyMock.expect((Object)this.stateManager.getGlobalStore("GlobalSessionStore")).andReturn(this.sessionStoreMock());
        // ... and any other name yields null (presumably so the context falls back to
        // getStore(...) for the local names below -- confirm against ProcessorContextImpl).
        EasyMock.expect((Object)this.stateManager.getGlobalStore(EasyMock.anyString())).andReturn(null);
        EasyMock.expect((Object)this.stateManager.getStore("LocalKeyValueStore")).andReturn(this.keyValueStoreMock());
        EasyMock.expect((Object)this.stateManager.getStore("LocalTimestampedKeyValueStore")).andReturn(this.timestampedKeyValueStoreMock());
        EasyMock.expect((Object)this.stateManager.getStore("LocalWindowStore")).andReturn(this.windowStoreMock());
        EasyMock.expect((Object)this.stateManager.getStore("LocalTimestampedWindowStore")).andReturn(this.timestampedWindowStoreMock());
        EasyMock.expect((Object)this.stateManager.getStore("LocalSessionStore")).andReturn(this.sessionStoreMock());
        EasyMock.expect((Object)this.stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).andStubReturn((Object)CHANGELOG_PARTITION);
        EasyMock.replay((Object[])new Object[]{this.stateManager});

        this.context = new ProcessorContextImpl((TaskId)EasyMock.mock(TaskId.class), this.streamsConfig, this.stateManager, (StreamsMetricsImpl)EasyMock.mock(StreamsMetricsImpl.class), (ThreadCache)EasyMock.mock(ThreadCache.class));

        // The task mock supplies the stream time checked by shouldMatchStreamTime()
        // and hands out the shared record collector mock.
        StreamTask task = (StreamTask)EasyMock.mock(StreamTask.class);
        EasyMock.expect((Object)task.streamTime()).andReturn((Object)50L);
        EasyMock.expect((Object)task.recordCollector()).andStubReturn((Object)this.recordCollector);
        EasyMock.replay((Object[])new Object[]{task});
        this.context.transitionToActive(task, null, null);

        // The current node lists the local store names among its state stores.
        this.context.setCurrentNode(new ProcessorNode("fake", (org.apache.kafka.streams.processor.api.Processor)null, new HashSet<String>(Arrays.asList("LocalKeyValueStore", "LocalTimestampedKeyValueStore", "LocalWindowStore", "LocalTimestampedWindowStore", "LocalSessionStore"))));
    }

    /**
     * Builds a context on top of a nice state manager mock that reports
     * {@link Task.TaskType#STANDBY} as its task type.
     *
     * @return a new context sharing this test's {@code streamsConfig} mock
     */
    private ProcessorContextImpl getStandbyContext() {
        final ProcessorStateManager standbyStateManager =
            (ProcessorStateManager) EasyMock.createNiceMock(ProcessorStateManager.class);
        EasyMock.expect((Object) standbyStateManager.taskType()).andStubReturn((Object) Task.TaskType.STANDBY);
        EasyMock.replay(new Object[]{standbyStateManager});

        final TaskId taskId = (TaskId) EasyMock.mock(TaskId.class);
        final StreamsMetricsImpl metrics = (StreamsMetricsImpl) EasyMock.mock(StreamsMetricsImpl.class);
        final ThreadCache cache = (ThreadCache) EasyMock.mock(ThreadCache.class);
        return new ProcessorContextImpl(taskId, this.streamsConfig, standbyStateManager, metrics, cache);
    }

    /**
     * The key-value store obtained for a global store name must reject init/close and every
     * mutating call with {@link UnsupportedOperationException}, while reads are answered by
     * the underlying store mock.
     */
    @Test
    public void globalKeyValueStoreShouldBeReadOnly() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("GlobalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Writes must be refused.
            this.checkThrowsUnsupportedOperation(() -> store.flush(), "flush()");
            this.checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
            this.checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent()");
            this.checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()");
            this.checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()");

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(Long.valueOf(VALUE), store.get(KEY));
            Assert.assertEquals(this.rangeIter, store.range("one", "two"));
            Assert.assertEquals(this.allIter, store.all());
            Assert.assertEquals(VALUE, store.approximateNumEntries());
        });
    }

    /**
     * The timestamped key-value store obtained for a global store name must reject init/close
     * and every mutating call with {@link UnsupportedOperationException}, while reads are
     * answered by the underlying store mock.
     */
    @Test
    public void globalTimestampedKeyValueStoreShouldBeReadOnly() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("GlobalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Writes must be refused.
            this.checkThrowsUnsupportedOperation(() -> store.flush(), "flush()");
            this.checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 2L)), "put()");
            this.checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L)), "putIfAbsent()");
            this.checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()");
            this.checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()");

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY));
            Assert.assertEquals(this.timestampedRangeIter, store.range("one", "two"));
            Assert.assertEquals(this.timestampedAllIter, store.all());
            Assert.assertEquals(VALUE, store.approximateNumEntries());
        });
    }

    /**
     * The window store obtained for a global store name must reject init/close, flush and put
     * with {@link UnsupportedOperationException}, while fetches are answered by the underlying
     * store mock.
     */
    @Test
    public void globalWindowStoreShouldBeReadOnly() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Writes must be refused.
            this.checkThrowsUnsupportedOperation(() -> store.flush(), "flush()");
            this.checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()");

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(this.iters.get(0), store.fetchAll(0L, 0L));
            Assert.assertEquals(this.windowStoreIter, store.fetch(KEY, 0L, 1L));
            Assert.assertEquals(this.iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
            Assert.assertEquals(Long.valueOf(VALUE), store.fetch(KEY, 1L));
            Assert.assertEquals(this.iters.get(2), store.all());
        });
    }

    /**
     * The timestamped window store obtained for a global store name must reject init/close,
     * flush and put with {@link UnsupportedOperationException}, while fetches are answered by
     * the underlying store mock.
     */
    @Test
    public void globalTimestampedWindowStoreShouldBeReadOnly() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("GlobalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Writes must be refused.
            this.checkThrowsUnsupportedOperation(() -> store.flush(), "flush()");
            this.checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L), 1L), "put() [with timestamp]");

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(this.timestampedIters.get(0), store.fetchAll(0L, 0L));
            Assert.assertEquals(this.windowStoreIter, store.fetch(KEY, 0L, 1L));
            Assert.assertEquals(this.timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 1L));
            Assert.assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L));
            Assert.assertEquals(this.timestampedIters.get(2), store.all());
        });
    }

    /**
     * The session store obtained for a global store name must reject init/close, flush, remove
     * and put with {@link UnsupportedOperationException}, while session lookups are answered by
     * the underlying store mock.
     */
    @Test
    public void globalSessionStoreShouldBeReadOnly() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("GlobalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Writes must be refused.
            this.checkThrowsUnsupportedOperation(() -> store.flush(), "flush()");
            this.checkThrowsUnsupportedOperation(() -> store.remove(null), "remove()");
            this.checkThrowsUnsupportedOperation(() -> store.put(null, null), "put()");

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(this.iters.get(3), store.findSessions(KEY, 1L, 2L));
            Assert.assertEquals(this.iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
            Assert.assertEquals(this.iters.get(5), store.fetch(KEY));
            Assert.assertEquals(this.iters.get(6), store.fetch(KEY, KEY));
        });
    }

    /**
     * The key-value store obtained for a local store name must reject init/close, but forward
     * writes and reads to the underlying store mock (observed through the "executed" flags and
     * the programmed return values).
     */
    @Test
    public void localKeyValueStoreShouldNotAllowInitOrClose() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("LocalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Each write must reach the mock, whose answer flips the matching flag.
            store.flush();
            Assert.assertTrue(this.flushExecuted);
            store.put("1", 1L);
            Assert.assertTrue(this.putExecuted);
            store.putIfAbsent("1", 1L);
            Assert.assertTrue(this.putIfAbsentExecuted);
            store.putAll(Collections.emptyList());
            Assert.assertTrue(this.putAllExecuted);
            store.delete("1");
            Assert.assertTrue(this.deleteExecuted);

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(Long.valueOf(VALUE), store.get(KEY));
            Assert.assertEquals(this.rangeIter, store.range("one", "two"));
            Assert.assertEquals(this.allIter, store.all());
            Assert.assertEquals(VALUE, store.approximateNumEntries());
        });
    }

    /**
     * The timestamped key-value store obtained for a local store name must reject init/close,
     * but forward writes and reads to the underlying store mock (observed through the
     * "executed" flags and the programmed return values).
     */
    @Test
    public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("LocalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Each write must reach the mock, whose answer flips the matching flag.
            store.flush();
            Assert.assertTrue(this.flushExecuted);
            store.put("1", ValueAndTimestamp.make(1L, 2L));
            Assert.assertTrue(this.putExecuted);
            store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L));
            Assert.assertTrue(this.putIfAbsentExecuted);
            store.putAll(Collections.emptyList());
            Assert.assertTrue(this.putAllExecuted);
            store.delete("1");
            Assert.assertTrue(this.deleteExecuted);

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY));
            Assert.assertEquals(this.timestampedRangeIter, store.range("one", "two"));
            Assert.assertEquals(this.timestampedAllIter, store.all());
            Assert.assertEquals(VALUE, store.approximateNumEntries());
        });
    }

    /**
     * The window store obtained for a local store name must reject init/close, but forward
     * flush, put and fetches to the underlying store mock.
     */
    @Test
    public void localWindowStoreShouldNotAllowInitOrClose() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Each write must reach the mock, whose answer flips the matching flag.
            store.flush();
            Assert.assertTrue(this.flushExecuted);
            store.put("1", 1L, 1L);
            Assert.assertTrue(this.putExecuted);

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(this.iters.get(0), store.fetchAll(0L, 0L));
            Assert.assertEquals(this.windowStoreIter, store.fetch(KEY, 0L, 1L));
            Assert.assertEquals(this.iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
            Assert.assertEquals(Long.valueOf(VALUE), store.fetch(KEY, 1L));
            Assert.assertEquals(this.iters.get(2), store.all());
        });
    }

    /**
     * The timestamped window store obtained for a local store name must reject init/close, but
     * forward flush, both put calls and fetches to the underlying store mock.
     */
    @Test
    public void localTimestampedWindowStoreShouldNotAllowInitOrClose() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("LocalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            store.flush();
            Assert.assertTrue(this.flushExecuted);
            // The mock records two expectations for the same put signature: the first call
            // flips putExecuted, the second flips putWithTimestampExecuted.
            store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
            Assert.assertTrue(this.putExecuted);
            store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
            Assert.assertTrue(this.putWithTimestampExecuted);

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(this.timestampedIters.get(0), store.fetchAll(0L, 0L));
            Assert.assertEquals(this.windowStoreIter, store.fetch(KEY, 0L, 1L));
            Assert.assertEquals(this.timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 1L));
            Assert.assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L));
            Assert.assertEquals(this.timestampedIters.get(2), store.all());
        });
    }

    /**
     * The session store obtained for a local store name must reject init/close, but forward
     * flush, remove, put and session lookups to the underlying store mock.
     */
    @Test
    public void localSessionStoreShouldNotAllowInitOrClose() {
        // The explicit Consumer type pins doTest's type parameter; an untyped lambda would be
        // inferred as Consumer<StateStore> and the store-specific calls would not compile.
        this.doTest("LocalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
            this.verifyStoreCannotBeInitializedOrClosed(store);

            // Each write must reach the mock, whose answer flips the matching flag.
            store.flush();
            Assert.assertTrue(this.flushExecuted);
            store.remove(null);
            Assert.assertTrue(this.removeExecuted);
            store.put(null, null);
            Assert.assertTrue(this.putExecuted);

            // Reads return what the underlying mock was programmed with.
            Assert.assertEquals(this.iters.get(3), store.findSessions(KEY, 1L, 2L));
            Assert.assertEquals(this.iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
            Assert.assertEquals(this.iters.get(5), store.fetch(KEY));
            Assert.assertEquals(this.iters.get(6), store.fetch(KEY, KEY));
        });
    }

    /**
     * With the default config mock, {@code logChange} must hand the record to the collector
     * with {@code null} headers, addressed to the registered changelog partition.
     */
    @Test
    public void shouldNotSendRecordHeadersToChangelogTopic() {
        // Record the single expected send(...) call; the fourth argument (headers) is null.
        final String changelogTopic = CHANGELOG_PARTITION.topic();
        final Integer changelogPartition = Integer.valueOf(CHANGELOG_PARTITION.partition());
        this.recordCollector.send(
            changelogTopic,
            (Object) KEY_BYTES,
            (Object) VALUE_BYTES,
            null,
            changelogPartition,
            Long.valueOf(TIMESTAMP),
            (Serializer) ProcessorContextImpl.BYTES_KEY_SERIALIZER,
            (Serializer) ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER);

        final StreamTask niceTask = (StreamTask) EasyMock.createNiceMock(StreamTask.class);
        EasyMock.replay(new Object[]{this.recordCollector, niceTask});

        this.context.transitionToActive(niceTask, this.recordCollector, null);
        this.context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, TIMESTAMP, Position.emptyPosition());

        EasyMock.verify(new Object[]{this.recordCollector});
    }

    /**
     * With the consistency-enabled config mock, {@code logChange} must hand the record to the
     * collector with headers carrying the changelog version marker and the serialized position.
     */
    @Test
    public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() {
        final Position emptyPosition = Position.emptyPosition();

        // Headers the collector is expected to receive.
        final RecordHeaders expectedHeaders = new RecordHeaders();
        expectedHeaders.add((Header) ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        final byte[] serializedPosition = PositionSerde.serialize((Position) emptyPosition).array();
        expectedHeaders.add((Header) new RecordHeader("c", serializedPosition));

        // Record the single expected send(...) call.
        final String changelogTopic = CHANGELOG_PARTITION.topic();
        final Integer changelogPartition = Integer.valueOf(CHANGELOG_PARTITION.partition());
        this.recordCollector.send(
            changelogTopic,
            (Object) KEY_BYTES,
            (Object) VALUE_BYTES,
            (Headers) expectedHeaders,
            changelogPartition,
            Long.valueOf(TIMESTAMP),
            (Serializer) ProcessorContextImpl.BYTES_KEY_SERIALIZER,
            (Serializer) ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER);

        final StreamTask niceTask = (StreamTask) EasyMock.createNiceMock(StreamTask.class);
        EasyMock.replay(new Object[]{this.recordCollector, niceTask});

        // Replace the context from setup() with one built on the consistency-enabled config.
        final TaskId taskId = (TaskId) EasyMock.mock(TaskId.class);
        final StreamsConfig consistencyConfig = this.streamsConfigWithConsistencyMock();
        final StreamsMetricsImpl metrics = (StreamsMetricsImpl) EasyMock.mock(StreamsMetricsImpl.class);
        final ThreadCache cache = (ThreadCache) EasyMock.mock(ThreadCache.class);
        this.context = new ProcessorContextImpl(taskId, consistencyConfig, this.stateManager, metrics, cache);

        this.context.transitionToActive(niceTask, this.recordCollector, null);
        this.context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES, TIMESTAMP, emptyPosition);

        EasyMock.verify(new Object[]{this.recordCollector});
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code logChange}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnLogChange() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        final Bytes key = Bytes.wrap("k".getBytes());
        Assert.assertThrows(
            UnsupportedOperationException.class,
            () -> standby.logChange("Store", key, null, 0L, Position.emptyPosition()));
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code getStateStore}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(
            UnsupportedOperationException.class,
            () -> standby.getStateStore("store"));
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code forward(key, value)}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnForward() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(
            UnsupportedOperationException.class,
            () -> standby.forward(KEY, "value"));
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code forward(key, value, To)}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        final To target = To.child("child-name");
        Assert.assertThrows(
            UnsupportedOperationException.class,
            () -> standby.forward(KEY, "value", target));
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code commit()}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnCommit() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(UnsupportedOperationException.class, standby::commit);
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code schedule(...)}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnSchedule() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        final Duration interval = Duration.ofMillis(100L);
        Assert.assertThrows(
            UnsupportedOperationException.class,
            () -> standby.schedule(interval, PunctuationType.STREAM_TIME, timestamp -> { }));
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code topic()}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnTopic() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(UnsupportedOperationException.class, standby::topic);
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code partition()}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnPartition() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(UnsupportedOperationException.class, standby::partition);
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code offset()}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnOffset() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(UnsupportedOperationException.class, standby::offset);
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code timestamp()}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(UnsupportedOperationException.class, standby::timestamp);
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code currentNode()}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(UnsupportedOperationException.class, standby::currentNode);
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code setRecordContext(...)}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        final ProcessorRecordContext recordContext =
            (ProcessorRecordContext) EasyMock.mock(ProcessorRecordContext.class);
        Assert.assertThrows(
            UnsupportedOperationException.class,
            () -> standby.setRecordContext(recordContext));
    }

    /** A context whose state manager reports a STANDBY task must refuse {@code recordContext()}. */
    @Test
    public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
        final ProcessorContextImpl standby = this.getStandbyContext();
        this.context = standby;
        Assert.assertThrows(UnsupportedOperationException.class, standby::recordContext);
    }

    /** The active context must report the stream time supplied by the task mock from setup(). */
    @Test
    public void shouldMatchStreamTime() {
        final long reportedStreamTime = this.context.currentStreamTimeMs();
        Assert.assertEquals(STREAM_TIME, reportedStreamTime);
    }

    /**
     * Creates a replayed {@link KeyValueStore} mock: reads return the fixture values and
     * iterator mocks, and each write flips the corresponding "executed" flag of this test.
     *
     * @return the replayed mock
     */
    private KeyValueStore<String, Long> keyValueStoreMock() {
        final KeyValueStore store = (KeyValueStore) EasyMock.mock(KeyValueStore.class);
        this.initStateStoreMock(store);

        // Reads.
        EasyMock.expect(store.get(KEY)).andReturn(Long.valueOf(VALUE));
        EasyMock.expect(store.approximateNumEntries()).andReturn(VALUE);
        EasyMock.expect(store.range("one", "two")).andReturn(this.rangeIter);
        EasyMock.expect(store.all()).andReturn(this.allIter);

        // Writes: the answer only records that the call arrived.
        store.put(EasyMock.anyString(), EasyMock.anyLong());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putExecuted = true;
            return null;
        });

        store.putIfAbsent(EasyMock.anyString(), EasyMock.anyLong());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putIfAbsentExecuted = true;
            return null;
        });

        store.putAll((List) EasyMock.anyObject(List.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putAllExecuted = true;
            return null;
        });

        store.delete(EasyMock.anyString());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.deleteExecuted = true;
            return null;
        });

        EasyMock.replay(new Object[]{store});
        return store;
    }

    /**
     * Creates a replayed {@link TimestampedKeyValueStore} mock: reads return the timestamped
     * fixture value and iterator mocks, and each write flips the corresponding "executed" flag.
     *
     * @return the replayed mock
     */
    private TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock() {
        final TimestampedKeyValueStore store =
            (TimestampedKeyValueStore) EasyMock.mock(TimestampedKeyValueStore.class);
        this.initStateStoreMock(store);

        // Reads.
        EasyMock.expect(store.get(KEY)).andReturn(VALUE_AND_TIMESTAMP);
        EasyMock.expect(store.approximateNumEntries()).andReturn(VALUE);
        EasyMock.expect(store.range("one", "two")).andReturn(this.timestampedRangeIter);
        EasyMock.expect(store.all()).andReturn(this.timestampedAllIter);

        // Writes: the answer only records that the call arrived.
        store.put(EasyMock.anyString(), EasyMock.anyObject(ValueAndTimestamp.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putExecuted = true;
            return null;
        });

        store.putIfAbsent(EasyMock.anyString(), EasyMock.anyObject(ValueAndTimestamp.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putIfAbsentExecuted = true;
            return null;
        });

        store.putAll((List) EasyMock.anyObject(List.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putAllExecuted = true;
            return null;
        });

        store.delete(EasyMock.anyString());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.deleteExecuted = true;
            return null;
        });

        EasyMock.replay(new Object[]{store});
        return store;
    }

    /**
     * Creates a replayed {@link WindowStore} mock: fetches return {@code iters} 0-2, the window
     * iterator mock or the fixture value, and {@code put} flips {@code putExecuted}.
     *
     * @return the replayed mock
     */
    private WindowStore<String, Long> windowStoreMock() {
        final WindowStore store = (WindowStore) EasyMock.mock(WindowStore.class);
        this.initStateStoreMock(store);

        // Reads.
        EasyMock.expect(store.fetchAll(EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.iters.get(0));
        EasyMock.expect(store.fetch(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.iters.get(1));
        EasyMock.expect(store.fetch(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.windowStoreIter);
        EasyMock.expect(store.fetch(EasyMock.anyString(), EasyMock.anyLong()))
            .andReturn(Long.valueOf(VALUE));
        EasyMock.expect(store.all()).andReturn(this.iters.get(2));

        // Write: the answer only records that the call arrived.
        store.put(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putExecuted = true;
            return null;
        });

        EasyMock.replay(new Object[]{store});
        return store;
    }

    /**
     * Creates a replayed {@link TimestampedWindowStore} mock: fetches return
     * {@code timestampedIters} 0-2, the window iterator mock or the timestamped fixture value.
     * Two {@code put} expectations are recorded in sequence, flipping {@code putExecuted} and
     * {@code putWithTimestampExecuted} respectively.
     *
     * @return the replayed mock
     */
    private TimestampedWindowStore<String, Long> timestampedWindowStoreMock() {
        final TimestampedWindowStore store =
            (TimestampedWindowStore) EasyMock.mock(TimestampedWindowStore.class);
        this.initStateStoreMock(store);

        // Reads.
        EasyMock.expect(store.fetchAll(EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.timestampedIters.get(0));
        EasyMock.expect(store.fetch(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.timestampedIters.get(1));
        EasyMock.expect(store.fetch(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.windowStoreIter);
        EasyMock.expect(store.fetch(EasyMock.anyString(), EasyMock.anyLong()))
            .andReturn(VALUE_AND_TIMESTAMP);
        EasyMock.expect(store.all()).andReturn(this.timestampedIters.get(2));

        // First recorded put expectation.
        store.put(EasyMock.anyString(), EasyMock.anyObject(ValueAndTimestamp.class), EasyMock.anyLong());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putExecuted = true;
            return null;
        });

        // Second recorded put expectation with the same matchers; keep it after the first.
        store.put(EasyMock.anyString(), EasyMock.anyObject(ValueAndTimestamp.class), EasyMock.anyLong());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putWithTimestampExecuted = true;
            return null;
        });

        EasyMock.replay(new Object[]{store});
        return store;
    }

    /**
     * Creates a replayed {@link SessionStore} mock: session lookups return {@code iters} 3-6,
     * and {@code put}/{@code remove} flip {@code putExecuted}/{@code removeExecuted}.
     *
     * @return the replayed mock
     */
    private SessionStore<String, Long> sessionStoreMock() {
        final SessionStore store = (SessionStore) EasyMock.mock(SessionStore.class);
        this.initStateStoreMock(store);

        // Reads.
        EasyMock.expect(store.findSessions(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.iters.get(3));
        EasyMock.expect(store.findSessions(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong()))
            .andReturn(this.iters.get(4));
        EasyMock.expect(store.fetch(EasyMock.anyString())).andReturn(this.iters.get(5));
        EasyMock.expect(store.fetch(EasyMock.anyString(), EasyMock.anyString())).andReturn(this.iters.get(6));

        // Writes: the answer only records that the call arrived.
        store.put((Windowed) EasyMock.anyObject(Windowed.class), EasyMock.anyLong());
        EasyMock.expectLastCall().andAnswer(() -> {
            this.putExecuted = true;
            return null;
        });

        store.remove((Windowed) EasyMock.anyObject(Windowed.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            this.removeExecuted = true;
            return null;
        });

        EasyMock.replay(new Object[]{store});
        return store;
    }

    /**
     * Creates a replayed {@link StreamsConfig} mock with empty {@code originals()} and
     * {@code values()}, application id {@code "add-id"} and byte-array default serdes.
     *
     * @return the replayed mock
     */
    private StreamsConfig streamsConfigMock() {
        final StreamsConfig config = (StreamsConfig) EasyMock.mock(StreamsConfig.class);

        // All behaviour is stubbed, so call counts and ordering do not matter.
        EasyMock.expect((Object) config.getString("application.id")).andStubReturn((Object) "add-id");
        EasyMock.expect((Object) config.originals()).andStubReturn(Collections.emptyMap());
        EasyMock.expect((Object) config.values()).andStubReturn(Collections.emptyMap());
        EasyMock.expect((Object) config.defaultKeySerde()).andStubReturn((Object) Serdes.ByteArray());
        EasyMock.expect((Object) config.defaultValueSerde()).andStubReturn((Object) Serdes.ByteArray());

        EasyMock.replay(new Object[]{config});
        return config;
    }

    /**
     * Creates a replayed {@link StreamsConfig} mock like the default one, except that
     * {@code originals()} contains {@code "__iq.consistency.offset.vector.enabled__" -> true}.
     *
     * @return the replayed mock
     */
    private StreamsConfig streamsConfigWithConsistencyMock() {
        final HashMap<String, Boolean> originalsWithConsistency = new HashMap<>();
        originalsWithConsistency.put("__iq.consistency.offset.vector.enabled__", true);

        final StreamsConfig config = (StreamsConfig) EasyMock.mock(StreamsConfig.class);

        // All behaviour is stubbed, so call counts and ordering do not matter.
        EasyMock.expect((Object) config.getString("application.id")).andStubReturn((Object) "add-id");
        EasyMock.expect((Object) config.originals()).andStubReturn(originalsWithConsistency);
        EasyMock.expect((Object) config.values()).andStubReturn(Collections.emptyMap());
        EasyMock.expect((Object) config.defaultKeySerde()).andStubReturn((Object) Serdes.ByteArray());
        EasyMock.expect((Object) config.defaultValueSerde()).andStubReturn((Object) Serdes.ByteArray());

        EasyMock.replay(new Object[]{config});
        return config;
    }

    /**
     * Records the expectations shared by every store mock: the fixture name, persistent and
     * open both {@code true}, and a {@code flush()} that flips {@code flushExecuted}.
     * The mock is left in record state; the caller replays it.
     *
     * @param stateStore mock still in record state
     */
    private void initStateStoreMock(StateStore stateStore) {
        EasyMock.expect(stateStore.name()).andReturn(STORE_NAME);
        EasyMock.expect(stateStore.persistent()).andReturn(Boolean.TRUE);
        EasyMock.expect(stateStore.isOpen()).andReturn(Boolean.TRUE);

        // flush() is void, so its behaviour is attached through expectLastCall().
        stateStore.flush();
        EasyMock.expectLastCall().andAnswer(() -> {
            this.flushExecuted = true;
            return null;
        });
    }

    /**
     * Looks up the store called {@code name} through the context under test, from inside a
     * throw-away processor's {@code init}, and hands it to {@code checker}.
     *
     * @param name    store name to request from the context
     * @param checker assertions to run against the returned store
     * @param <T>     store type the checker expects; the cast to it is unchecked, so a
     *                mismatch only surfaces when the checker touches the store
     */
    @SuppressWarnings("unchecked")
    private <T extends StateStore> void doTest(final String name, final Consumer<T> checker) {
        final Processor<String, Long> processor = new Processor<String, Long>() {

            @Override
            public void init(final ProcessorContext context) {
                // Narrow the looked-up store to the checker's type; passing it as a plain
                // StateStore would not type-check against Consumer<T>.
                final T store = (T) context.getStateStore(name);
                checker.accept(store);
            }

            @Override
            public void process(final String k, final Long v) {
                // Nothing to process; only init() matters for these tests.
            }

            @Override
            public void close() {
                // Nothing to release.
            }
        };
        processor.init((ProcessorContext) this.context);
    }

    /**
     * Asserts that {@code store} reports the mocked name, persistent and open state, and that
     * {@code init} and {@code close} both throw {@link UnsupportedOperationException}.
     *
     * @param store store obtained through the context under test
     */
    private void verifyStoreCannotBeInitializedOrClosed(StateStore store) {
        final String reportedName = store.name();
        Assert.assertEquals((Object) STORE_NAME, (Object) reportedName);
        Assert.assertTrue(store.persistent());
        Assert.assertTrue(store.isOpen());

        // The cast on the first argument selects the StateStoreContext overload of init.
        this.checkThrowsUnsupportedOperation(() -> store.init((StateStoreContext) null, null), "init()");
        this.checkThrowsUnsupportedOperation(() -> store.close(), "close()");
    }

    /**
     * Runs {@code check} and fails the test unless it throws
     * {@link UnsupportedOperationException}. Any other exception propagates unchanged.
     *
     * @param check action expected to be rejected
     * @param name  label of the action, used in the failure message
     */
    private void checkThrowsUnsupportedOperation(Runnable check, String name) {
        try {
            check.run();
        } catch (final UnsupportedOperationException expected) {
            // This is the outcome the caller asked for.
            return;
        }
        Assert.fail(name + " should throw exception");
    }
}

