/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
import org.apache.kafka.streams.state.internals.SerdeThatDoesntHandleNull;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Tests for {@link MeteredTimestampedWindowStore} that wrap a Mockito-mocked inner
 * {@link WindowStore} and check that calls are delegated to it and that the configured
 * serdes are applied with the expected changelog topic name.
 *
 * <p>All byte fixtures are encoded explicitly as UTF-8 so that they do not depend on the
 * platform default charset of the JVM running the tests.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class MeteredTimestampedWindowStoreTest {
    private static final String STORE_NAME = "mocked-store";
    private static final String STORE_TYPE = "scope";
    private static final String CHANGELOG_TOPIC = "changelog-topic";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes(StandardCharsets.UTF_8));
    private static final long TIMESTAMP = 97L;
    private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make("value", TIMESTAMP);
    // Seven zero bytes followed by 'a' (0x61 == 97) form TIMESTAMP as a big-endian long; the
    // remaining bytes spell "value". Presumably this mirrors the ValueAndTimestampSerde layout,
    // but the tests below only use it together with mocked serdes.
    private static final byte[] VALUE_AND_TIMESTAMP_BYTES =
        "\u0000\u0000\u0000\u0000\u0000\u0000\u0000avalue".getBytes(StandardCharsets.UTF_8);
    private static final int WINDOW_SIZE_MS = 10;

    private InternalMockProcessorContext context;
    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
    @Mock
    private WindowStore<Bytes, byte[]> innerStoreMock;
    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
    private MeteredTimestampedWindowStore<String, String> store;

    /**
     * Builds a fresh processor context (String key serde, Long value serde), stubs the inner
     * store's name and creates the default store under test around {@link #innerStoreMock}.
     */
    @Before
    public void setUp() {
        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "latest", new MockTime());
        context = new InternalMockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
            streamsMetrics,
            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0L, streamsMetrics),
            Time.SYSTEM,
            taskId);
        Mockito.when(innerStoreMock.name()).thenReturn(STORE_NAME);
        store = newStringStore(innerStoreMock);
    }

    /**
     * Creates a metered store around {@code inner} with a String key serde and a
     * {@link ValueAndTimestampSerde} wrapping {@link SerdeThatDoesntHandleNull}.
     *
     * @param inner the store to wrap
     * @return a new, not yet initialised, metered store
     */
    private static MeteredTimestampedWindowStore<String, String> newStringStore(final WindowStore<Bytes, byte[]> inner) {
        return new MeteredTimestampedWindowStore<>(
            inner,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull()));
    }

    /** Initialising through the {@link ProcessorContext} overload must reach the inner store. */
    @SuppressWarnings("deprecation")
    @Test
    public void shouldDelegateDeprecatedInit() {
        @SuppressWarnings("unchecked") // Mockito can only hand back the raw mocked type
        final WindowStore<Bytes, byte[]> inner = Mockito.mock(WindowStore.class);
        final MeteredTimestampedWindowStore<String, String> outer = newStringStore(inner);
        Mockito.when(inner.name()).thenReturn("store");

        // The cast selects the ProcessorContext overload; the mock context implements both context types.
        outer.init((ProcessorContext) context, outer);

        Mockito.verify(inner).init((ProcessorContext) context, outer);
    }

    /** Initialising through the {@link StateStoreContext} overload must reach the inner store. */
    @Test
    public void shouldDelegateInit() {
        @SuppressWarnings("unchecked") // Mockito can only hand back the raw mocked type
        final WindowStore<Bytes, byte[]> inner = Mockito.mock(WindowStore.class);
        final MeteredTimestampedWindowStore<String, String> outer = newStringStore(inner);
        Mockito.when(inner.name()).thenReturn("store");

        // The cast selects the StateStoreContext overload; the mock context implements both context types.
        outer.init((StateStoreContext) context, outer);

        Mockito.verify(inner).init((StateStoreContext) context, outer);
    }

    /** With a changelog registered for the store, the serdes must be called with that topic name. */
    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
        doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
    }

    /** Without a registered changelog, the serdes must be called with the default changelog topic name. */
    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        final String defaultChangelogTopicName =
            ProcessorStateManager.storeChangelogTopic(context.applicationId(), STORE_NAME, taskId.topologyName());
        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
    }

    /**
     * Replaces the store under test with one backed by mocked serdes that are stubbed only for
     * {@code topic}, then performs a fetch and a put and verifies the inner store saw the
     * serialized key and value.
     *
     * @param topic the topic name the serializers and deserializer are stubbed for
     */
    @SuppressWarnings("unchecked") // Mockito.mock(Class) returns raw Serde/Serializer/Deserializer types
    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
        final Serde<String> keySerde = Mockito.mock(Serde.class);
        final Serializer<String> keySerializer = Mockito.mock(Serializer.class);
        final Serde<ValueAndTimestamp<String>> valueSerde = Mockito.mock(Serde.class);
        final Deserializer<ValueAndTimestamp<String>> valueDeserializer = Mockito.mock(Deserializer.class);
        final Serializer<ValueAndTimestamp<String>> valueSerializer = Mockito.mock(Serializer.class);
        Mockito.when(keySerde.serializer()).thenReturn(keySerializer);
        Mockito.when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes(StandardCharsets.UTF_8));
        Mockito.when(valueSerde.deserializer()).thenReturn(valueDeserializer);
        Mockito.when(valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).thenReturn(VALUE_AND_TIMESTAMP);
        Mockito.when(valueSerde.serializer()).thenReturn(valueSerializer);
        Mockito.when(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
        Mockito.when(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
        store = new MeteredTimestampedWindowStore<>(
            innerStoreMock, WINDOW_SIZE_MS, STORE_TYPE, new MockTime(), keySerde, valueSerde);
        store.init((StateStoreContext) context, store);

        store.fetch(KEY, TIMESTAMP);
        store.put(KEY, VALUE_AND_TIMESTAMP, TIMESTAMP);

        Mockito.verify(innerStoreMock).fetch(KEY_BYTES, TIMESTAMP);
        Mockito.verify(innerStoreMock).put(KEY_BYTES, VALUE_AND_TIMESTAMP_BYTES, TIMESTAMP);
    }

    /** Closing the metered store must close the wrapped store. */
    @Test
    public void shouldCloseUnderlyingStore() {
        store.init((StateStoreContext) context, store);
        store.close();
        Mockito.verify(innerStoreMock).close();
    }

    /** A {@code null} result from the inner store's fetch must surface as {@code null}, not as an exception. */
    @Test
    public void shouldNotExceptionIfFetchReturnsNull() {
        Mockito.when(innerStoreMock.fetch(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)), 0L)).thenReturn(null);
        store.init((StateStoreContext) context, store);
        Assert.assertNull(store.fetch("a", 0L));
    }

    /**
     * With {@code null} serdes passed to the constructor, putting a Long value must not fail
     * with a {@link ClassCastException}-caused {@link StreamsException}.
     */
    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
        Mockito.when(innerStoreMock.name()).thenReturn(STORE_NAME);
        final MeteredTimestampedWindowStore<String, Long> storeWithoutSerdes = new MeteredTimestampedWindowStore<>(
            innerStoreMock, WINDOW_SIZE_MS, STORE_TYPE, new MockTime(), null, null);
        storeWithoutSerdes.init((StateStoreContext) context, innerStoreMock);

        assertPutDoesNotFailWithClassCast(storeWithoutSerdes, "Serdes are not correctly set from processor context.");
    }

    /**
     * With explicit String/Long serdes passed to the constructor, putting a Long value must not
     * fail with a {@link ClassCastException}-caused {@link StreamsException}.
     */
    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
        Mockito.when(innerStoreMock.name()).thenReturn(STORE_NAME);
        final MeteredTimestampedWindowStore<String, Long> storeWithSerdes = new MeteredTimestampedWindowStore<>(
            innerStoreMock,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(Serdes.Long()));
        storeWithSerdes.init((StateStoreContext) context, innerStoreMock);

        assertPutDoesNotFailWithClassCast(storeWithSerdes, "Serdes are not correctly set from constructor parameters.");
    }

    /**
     * Puts a Long value into {@code target} and converts a {@link StreamsException} caused by a
     * {@link ClassCastException} into a test failure carrying {@code failureMessage}. Any
     * {@link StreamsException} that reaches the end of the catch block is rethrown unchanged.
     *
     * @param target         the initialised store to write to
     * @param failureMessage the message reported when a serde mismatch is detected
     */
    private static void assertPutDoesNotFailWithClassCast(final MeteredTimestampedWindowStore<String, Long> target,
                                                          final String failureMessage) {
        try {
            target.put(KEY, ValueAndTimestamp.make(42L, 60000L), 60000L);
        } catch (final StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail(failureMessage);
            }
            throw exception;
        }
    }
}

