/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class MeteredTimestampedKeyValueStoreTest {
    private static final String APPLICATION_ID = "test-app";
    private static final String STORE_NAME = "store-name";
    private static final String STORE_TYPE = "scope";
    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String CHANGELOG_TOPIC = "changelog-topic-name";
    private static final String THREAD_ID_TAG_KEY = "thread-id";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap((byte[])"key".getBytes());
    private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make((Object)"value", (long)97L);
    private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\u0000\u0000\u0000\u0000\u0000\u0000\u0000avalue".getBytes();
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
    @Mock
    private KeyValueStore<Bytes, byte[]> inner;
    @Mock
    private InternalProcessorContext context;
    private static final Map<String, Object> CONFIGS = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"__internal.override.topic.prefix__", (Object)"test-app")});
    private MeteredTimestampedKeyValueStore<String, String> metered;
    private final KeyValue<Bytes, byte[]> byteKeyValueTimestampPair = KeyValue.pair((Object)KEY_BYTES, (Object)VALUE_AND_TIMESTAMP_BYTES);
    private final Metrics metrics = new Metrics();
    private Map<String, String> tags;

    @Before
    public void before() {
        MockTime mockTime = new MockTime();
        this.metered = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)mockTime, Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.String()));
        this.metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        Mockito.when((Object)this.context.applicationId()).thenReturn((Object)APPLICATION_ID);
        Mockito.when((Object)this.context.metrics()).thenReturn((Object)new StreamsMetricsImpl(this.metrics, "test", "latest", (Time)mockTime));
        Mockito.when((Object)this.context.taskId()).thenReturn((Object)this.taskId);
        Mockito.when((Object)this.context.changelogFor(STORE_NAME)).thenReturn((Object)CHANGELOG_TOPIC);
        this.expectSerdes();
        Mockito.when((Object)this.inner.name()).thenReturn((Object)STORE_NAME);
        Mockito.when((Object)this.context.appConfigs()).thenReturn(CONFIGS);
        this.tags = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)THREAD_ID_TAG_KEY, (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)this.taskId.toString()), Utils.mkEntry((Object)"scope-state-id", (Object)STORE_NAME)});
    }

    private void expectSerdes() {
        Mockito.when((Object)this.context.keySerde()).thenReturn((Object)Serdes.String());
        Mockito.when((Object)this.context.valueSerde()).thenReturn((Object)Serdes.Long());
    }

    private void init() {
        this.metered.init((StateStoreContext)this.context, this.metered);
    }

    @Test
    public void shouldDelegateDeprecatedInit() {
        MeteredTimestampedKeyValueStore outer = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.String()));
        ((KeyValueStore)Mockito.doNothing().when(this.inner)).init((ProcessorContext)this.context, (StateStore)outer);
        outer.init((ProcessorContext)this.context, (StateStore)outer);
    }

    @Test
    public void shouldDelegateInit() {
        MeteredTimestampedKeyValueStore outer = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.String()));
        ((KeyValueStore)Mockito.doNothing().when(this.inner)).init((StateStoreContext)this.context, (StateStore)outer);
        outer.init((StateStoreContext)this.context, (StateStore)outer);
    }

    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        this.doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
    }

    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic((String)APPLICATION_ID, (String)STORE_NAME, (String)this.taskId.topologyName());
        Mockito.when((Object)this.context.changelogFor(STORE_NAME)).thenReturn(null);
        this.doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
    }

    private void doShouldPassChangelogTopicNameToStateStoreSerde(String topic) {
        Serde keySerde = (Serde)Mockito.mock(Serde.class);
        Serializer keySerializer = (Serializer)Mockito.mock(Serializer.class);
        Serde valueSerde = (Serde)Mockito.mock(Serde.class);
        Deserializer valueDeserializer = (Deserializer)Mockito.mock(Deserializer.class);
        Serializer valueSerializer = (Serializer)Mockito.mock(Serializer.class);
        Mockito.when((Object)keySerde.serializer()).thenReturn((Object)keySerializer);
        Mockito.when((Object)keySerializer.serialize(topic, (Object)KEY)).thenReturn((Object)KEY.getBytes());
        Mockito.when((Object)valueSerde.deserializer()).thenReturn((Object)valueDeserializer);
        Mockito.when((Object)valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).thenReturn(VALUE_AND_TIMESTAMP);
        Mockito.when((Object)valueSerde.serializer()).thenReturn((Object)valueSerializer);
        Mockito.when((Object)valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).thenReturn((Object)VALUE_AND_TIMESTAMP_BYTES);
        Mockito.when((Object)this.inner.get((Object)KEY_BYTES)).thenReturn((Object)VALUE_AND_TIMESTAMP_BYTES);
        this.metered = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), keySerde, valueSerde);
        this.metered.init((StateStoreContext)this.context, this.metered);
        this.metered.get((Object)KEY);
        this.metered.put((Object)KEY, VALUE_AND_TIMESTAMP);
    }

    @Test
    public void testMetrics() {
        this.init();
        JmxReporter reporter = new JmxReporter();
        KafkaMetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
        reporter.contextChange((MetricsContext)metricsContext);
        this.metrics.addReporter((MetricsReporter)reporter);
        Assert.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", STORE_LEVEL_GROUP, THREAD_ID_TAG_KEY, this.threadId, this.taskId, STORE_TYPE, STORE_NAME)));
    }

    @Test
    public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
        ((KeyValueStore)Mockito.doNothing().when(this.inner)).put((Object)KEY_BYTES, (Object)VALUE_AND_TIMESTAMP_BYTES);
        this.init();
        this.metered.put((Object)KEY, VALUE_AND_TIMESTAMP);
        KafkaMetric metric = this.metric("put-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldGetWithBinary() {
        Mockito.when((Object)this.inner.get((Object)KEY_BYTES)).thenReturn((Object)VALUE_AND_TIMESTAMP_BYTES);
        this.init();
        MeteredTimestampedKeyValueStore.RawAndDeserializedValue valueWithBinary = this.metered.getWithBinary((Object)KEY);
        Assert.assertEquals((Object)valueWithBinary.value, VALUE_AND_TIMESTAMP);
        Assert.assertArrayEquals((byte[])valueWithBinary.serializedValue, (byte[])VALUE_AND_TIMESTAMP_BYTES);
    }

    @Test
    public void shouldNotPutIfSameValuesAndGreaterTimestamp() {
        this.init();
        this.metered.put((Object)KEY, VALUE_AND_TIMESTAMP);
        ValueAndTimestampSerde stringSerde = new ValueAndTimestampSerde(Serdes.String());
        byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", VALUE_AND_TIMESTAMP);
        ValueAndTimestamp newValueAndTimestamp = ValueAndTimestamp.make((Object)"value", (long)98L);
        Assert.assertFalse((boolean)this.metered.putIfDifferentValues((Object)KEY, newValueAndTimestamp, encodedOldValue));
    }

    @Test
    public void shouldPutIfOutOfOrder() {
        ((KeyValueStore)Mockito.doNothing().when(this.inner)).put((Object)KEY_BYTES, (Object)VALUE_AND_TIMESTAMP_BYTES);
        this.init();
        this.metered.put((Object)KEY, VALUE_AND_TIMESTAMP);
        ValueAndTimestampSerde stringSerde = new ValueAndTimestampSerde(Serdes.String());
        byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", VALUE_AND_TIMESTAMP);
        ValueAndTimestamp outOfOrderValueAndTimestamp = ValueAndTimestamp.make((Object)"value", (long)95L);
        Assert.assertTrue((boolean)this.metered.putIfDifferentValues((Object)KEY, outOfOrderValueAndTimestamp, encodedOldValue));
    }

    @Test
    public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
        Mockito.when((Object)this.inner.get((Object)KEY_BYTES)).thenReturn((Object)VALUE_AND_TIMESTAMP_BYTES);
        this.init();
        MatcherAssert.assertThat((Object)this.metered.get((Object)KEY), (Matcher)CoreMatchers.equalTo(VALUE_AND_TIMESTAMP));
        KafkaMetric metric = this.metric("get-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
        Mockito.when((Object)this.inner.putIfAbsent((Object)KEY_BYTES, (Object)VALUE_AND_TIMESTAMP_BYTES)).thenReturn(null);
        this.init();
        this.metered.putIfAbsent((Object)KEY, VALUE_AND_TIMESTAMP);
        KafkaMetric metric = this.metric("put-if-absent-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    private KafkaMetric metric(String name) {
        return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", this.tags));
    }

    @Test
    public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
        ((KeyValueStore)Mockito.doNothing().when(this.inner)).putAll((List)ArgumentMatchers.any(List.class));
        this.init();
        this.metered.putAll(Collections.singletonList(KeyValue.pair((Object)KEY, VALUE_AND_TIMESTAMP)));
        KafkaMetric metric = this.metric("put-all-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
        Mockito.when((Object)this.inner.delete((Object)KEY_BYTES)).thenReturn((Object)VALUE_AND_TIMESTAMP_BYTES);
        this.init();
        this.metered.delete((Object)KEY);
        KafkaMetric metric = this.metric("delete-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
        Mockito.when((Object)this.inner.range((Object)KEY_BYTES, (Object)KEY_BYTES)).thenReturn(new KeyValueIteratorStub(Collections.singletonList(this.byteKeyValueTimestampPair).iterator()));
        this.init();
        KeyValueIterator iterator = this.metered.range((Object)KEY, (Object)KEY);
        MatcherAssert.assertThat((Object)((KeyValue)iterator.next()).value, (Matcher)CoreMatchers.equalTo(VALUE_AND_TIMESTAMP));
        Assert.assertFalse((boolean)iterator.hasNext());
        iterator.close();
        KafkaMetric metric = this.metric("range-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
        Mockito.when((Object)this.inner.all()).thenReturn(new KeyValueIteratorStub(Collections.singletonList(this.byteKeyValueTimestampPair).iterator()));
        this.init();
        KeyValueIterator iterator = this.metered.all();
        MatcherAssert.assertThat((Object)((KeyValue)iterator.next()).value, (Matcher)CoreMatchers.equalTo(VALUE_AND_TIMESTAMP));
        Assert.assertFalse((boolean)iterator.hasNext());
        iterator.close();
        KafkaMetric metric = this.metric(new MetricName("all-rate", STORE_LEVEL_GROUP, "", this.tags));
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldFlushInnerWhenFlushTimeRecords() {
        ((KeyValueStore)Mockito.doNothing().when(this.inner)).flush();
        this.init();
        this.metered.flush();
        KafkaMetric metric = this.metric("flush-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void shouldSetFlushListenerOnWrappedCachingStore() {
        CachedKeyValueStore cachedKeyValueStore = (CachedKeyValueStore)Mockito.mock(CachedKeyValueStore.class);
        Mockito.when((Object)cachedKeyValueStore.setFlushListener((CacheFlushListener)ArgumentMatchers.any(CacheFlushListener.class), ArgumentMatchers.eq((boolean)false))).thenReturn((Object)true);
        this.metered = new MeteredTimestampedKeyValueStore((KeyValueStore)cachedKeyValueStore, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.String()));
        Assert.assertTrue((boolean)this.metered.setFlushListener(null, false));
    }

    @Test
    public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
        Assert.assertFalse((boolean)this.metered.setFlushListener(null, false));
    }

    private KafkaMetric metric(MetricName metricName) {
        return this.metrics.metric(metricName);
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
        MeteredTimestampedKeyValueStore store = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), null, null);
        store.init((StateStoreContext)this.context, this.inner);
        try {
            store.put((Object)KEY, (Object)ValueAndTimestamp.make((Object)42L, (long)60000L));
        }
        catch (StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                throw new AssertionError("Serdes are not correctly set from processor context.", exception);
            }
            throw exception;
        }
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
        MeteredTimestampedKeyValueStore store = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.Long()));
        store.init((StateStoreContext)this.context, this.inner);
        try {
            store.put((Object)KEY, (Object)ValueAndTimestamp.make((Object)42L, (long)60000L));
        }
        catch (StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail((String)"Serdes are not correctly set from constructor parameters.");
            }
            throw exception;
        }
    }

    private static interface CachedKeyValueStore
    extends KeyValueStore<Bytes, byte[]>,
    CachedStateStore<byte[], byte[]> {
    }
}

