/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.easymock.EasyMock;
import org.easymock.EasyMockRule;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class MeteredTimestampedKeyValueStoreTest {
    @Rule
    public EasyMockRule rule = new EasyMockRule((Object)this);
    private static final String STORE_TYPE = "scope";
    private static final String STORE_LEVEL_GROUP_FROM_0100_TO_24 = "stream-scope-state-metrics";
    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String THREAD_ID_TAG_KEY_FROM_0100_TO_24 = "client-id";
    private static final String THREAD_ID_TAG_KEY = "thread-id";
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    @Mock(type=MockType.NICE)
    private KeyValueStore<Bytes, byte[]> inner;
    @Mock(type=MockType.NICE)
    private InternalProcessorContext context;
    private MeteredTimestampedKeyValueStore<String, String> metered;
    private final String key = "key";
    private final Bytes keyBytes = Bytes.wrap((byte[])"key".getBytes());
    private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make((Object)"value", (long)97L);
    private final byte[] valueAndTimestampBytes = "\u0000\u0000\u0000\u0000\u0000\u0000\u0000avalue".getBytes();
    private final KeyValue<Bytes, byte[]> byteKeyValueTimestampPair = KeyValue.pair((Object)this.keyBytes, (Object)this.valueAndTimestampBytes);
    private final Metrics metrics = new Metrics();
    private String storeLevelGroup;
    private String threadIdTagKey;
    private Map<String, String> tags;
    @Parameterized.Parameter
    public String builtInMetricsVersion;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"latest"}, {"0.10.0-2.4"});
    }

    @Before
    public void before() {
        this.metered = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.String()));
        this.metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        EasyMock.expect((Object)this.context.metrics()).andReturn((Object)new StreamsMetricsImpl(this.metrics, "test", this.builtInMetricsVersion)).anyTimes();
        EasyMock.expect((Object)this.context.taskId()).andReturn((Object)this.taskId).anyTimes();
        EasyMock.expect((Object)this.inner.name()).andReturn((Object)"metered").anyTimes();
        this.storeLevelGroup = "0.10.0-2.4".equals(this.builtInMetricsVersion) ? STORE_LEVEL_GROUP_FROM_0100_TO_24 : STORE_LEVEL_GROUP;
        this.threadIdTagKey = "0.10.0-2.4".equals(this.builtInMetricsVersion) ? THREAD_ID_TAG_KEY_FROM_0100_TO_24 : THREAD_ID_TAG_KEY;
        this.tags = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.threadIdTagKey, (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)this.taskId.toString()), Utils.mkEntry((Object)"scope-state-id", (Object)"metered")});
    }

    private void init() {
        EasyMock.replay((Object[])new Object[]{this.inner, this.context});
        this.metered.init((ProcessorContext)this.context, this.metered);
    }

    @Test
    public void testMetrics() {
        this.init();
        JmxReporter reporter = new JmxReporter();
        KafkaMetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
        reporter.contextChange((MetricsContext)metricsContext);
        this.metrics.addReporter((MetricsReporter)reporter);
        Assert.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", this.storeLevelGroup, this.threadIdTagKey, this.threadId, this.taskId.toString(), STORE_TYPE, "metered")));
        if ("0.10.0-2.4".equals(this.builtInMetricsVersion)) {
            Assert.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s", this.storeLevelGroup, this.threadIdTagKey, this.threadId, this.taskId.toString(), STORE_TYPE, "all")));
        }
    }

    @Test
    public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
        this.inner.put(EasyMock.eq((Object)this.keyBytes), (Object)EasyMock.aryEq((byte[])this.valueAndTimestampBytes));
        EasyMock.expectLastCall();
        this.init();
        this.metered.put((Object)"key", this.valueAndTimestamp);
        KafkaMetric metric = this.metric("put-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldGetWithBinary() {
        EasyMock.expect((Object)this.inner.get((Object)this.keyBytes)).andReturn((Object)this.valueAndTimestampBytes);
        this.inner.put(EasyMock.eq((Object)this.keyBytes), (Object)EasyMock.aryEq((byte[])this.valueAndTimestampBytes));
        EasyMock.expectLastCall();
        this.init();
        MeteredTimestampedKeyValueStore.RawAndDeserializedValue valueWithBinary = this.metered.getWithBinary((Object)"key");
        Assert.assertEquals((Object)valueWithBinary.value, this.valueAndTimestamp);
        Assert.assertEquals((Object)valueWithBinary.serializedValue, (Object)this.valueAndTimestampBytes);
    }

    @Test
    public void shouldNotPutIfSameValuesAndGreaterTimestamp() {
        this.init();
        this.metered.put((Object)"key", this.valueAndTimestamp);
        ValueAndTimestampSerde stringSerde = new ValueAndTimestampSerde(Serdes.String());
        byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", this.valueAndTimestamp);
        ValueAndTimestamp newValueAndTimestamp = ValueAndTimestamp.make((Object)"value", (long)98L);
        Assert.assertFalse((boolean)this.metered.putIfDifferentValues((Object)"key", newValueAndTimestamp, encodedOldValue));
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldPutIfOutOfOrder() {
        this.inner.put(EasyMock.eq((Object)this.keyBytes), (Object)EasyMock.aryEq((byte[])this.valueAndTimestampBytes));
        EasyMock.expectLastCall();
        this.init();
        this.metered.put((Object)"key", this.valueAndTimestamp);
        ValueAndTimestampSerde stringSerde = new ValueAndTimestampSerde(Serdes.String());
        byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", this.valueAndTimestamp);
        ValueAndTimestamp outOfOrderValueAndTimestamp = ValueAndTimestamp.make((Object)"value", (long)95L);
        Assert.assertTrue((boolean)this.metered.putIfDifferentValues((Object)"key", outOfOrderValueAndTimestamp, encodedOldValue));
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
        EasyMock.expect((Object)this.inner.get((Object)this.keyBytes)).andReturn((Object)this.valueAndTimestampBytes);
        this.init();
        MatcherAssert.assertThat((Object)this.metered.get((Object)"key"), (Matcher)CoreMatchers.equalTo(this.valueAndTimestamp));
        KafkaMetric metric = this.metric("get-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() {
        EasyMock.expect((Object)this.inner.putIfAbsent(EasyMock.eq((Object)this.keyBytes), (Object)EasyMock.aryEq((byte[])this.valueAndTimestampBytes))).andReturn(null);
        this.init();
        this.metered.putIfAbsent((Object)"key", this.valueAndTimestamp);
        KafkaMetric metric = this.metric("put-if-absent-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    private KafkaMetric metric(String name) {
        return this.metrics.metric(new MetricName(name, this.storeLevelGroup, "", this.tags));
    }

    @Test
    public void shouldPutAllToInnerStoreAndRecordPutAllMetric() {
        this.inner.putAll((List)EasyMock.anyObject(List.class));
        EasyMock.expectLastCall();
        this.init();
        this.metered.putAll(Collections.singletonList(KeyValue.pair((Object)"key", this.valueAndTimestamp)));
        KafkaMetric metric = this.metric("put-all-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldDeleteFromInnerStoreAndRecordDeleteMetric() {
        EasyMock.expect((Object)this.inner.delete((Object)this.keyBytes)).andReturn((Object)this.valueAndTimestampBytes);
        this.init();
        this.metered.delete((Object)"key");
        KafkaMetric metric = this.metric("delete-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldGetRangeFromInnerStoreAndRecordRangeMetric() {
        EasyMock.expect((Object)this.inner.range((Object)this.keyBytes, (Object)this.keyBytes)).andReturn(new KeyValueIteratorStub(Collections.singletonList(this.byteKeyValueTimestampPair).iterator()));
        this.init();
        KeyValueIterator iterator = this.metered.range((Object)"key", (Object)"key");
        MatcherAssert.assertThat((Object)((KeyValue)iterator.next()).value, (Matcher)CoreMatchers.equalTo(this.valueAndTimestamp));
        Assert.assertFalse((boolean)iterator.hasNext());
        iterator.close();
        KafkaMetric metric = this.metric("range-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldGetAllFromInnerStoreAndRecordAllMetric() {
        EasyMock.expect((Object)this.inner.all()).andReturn(new KeyValueIteratorStub(Collections.singletonList(this.byteKeyValueTimestampPair).iterator()));
        this.init();
        KeyValueIterator iterator = this.metered.all();
        MatcherAssert.assertThat((Object)((KeyValue)iterator.next()).value, (Matcher)CoreMatchers.equalTo(this.valueAndTimestamp));
        Assert.assertFalse((boolean)iterator.hasNext());
        iterator.close();
        KafkaMetric metric = this.metric(new MetricName("all-rate", this.storeLevelGroup, "", this.tags));
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldFlushInnerWhenFlushTimeRecords() {
        this.inner.flush();
        EasyMock.expectLastCall().once();
        this.init();
        this.metered.flush();
        KafkaMetric metric = this.metric("flush-rate");
        Assert.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
        EasyMock.verify((Object[])new Object[]{this.inner});
    }

    @Test
    public void shouldSetFlushListenerOnWrappedCachingStore() {
        CachedKeyValueStore cachedKeyValueStore = (CachedKeyValueStore)EasyMock.mock(CachedKeyValueStore.class);
        EasyMock.expect((Object)cachedKeyValueStore.setFlushListener((CacheFlushListener)EasyMock.anyObject(CacheFlushListener.class), EasyMock.eq((boolean)false))).andReturn((Object)true);
        EasyMock.replay((Object[])new Object[]{cachedKeyValueStore});
        this.metered = new MeteredTimestampedKeyValueStore((KeyValueStore)cachedKeyValueStore, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.String()));
        Assert.assertTrue((boolean)this.metered.setFlushListener(null, false));
        EasyMock.verify((Object[])new Object[]{cachedKeyValueStore});
    }

    @Test
    public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
        Assert.assertFalse((boolean)this.metered.setFlushListener(null, false));
    }

    private KafkaMetric metric(MetricName metricName) {
        return this.metrics.metric(metricName);
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
        EasyMock.expect((Object)this.context.keySerde()).andStubReturn((Object)Serdes.String());
        EasyMock.expect((Object)this.context.valueSerde()).andStubReturn((Object)Serdes.Long());
        MeteredTimestampedKeyValueStore store = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), null, null);
        EasyMock.replay((Object[])new Object[]{this.inner, this.context});
        store.init((ProcessorContext)this.context, this.inner);
        try {
            store.put((Object)"key", (Object)ValueAndTimestamp.make((Object)42L, (long)60000L));
        }
        catch (StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail((String)"Serdes are not correctly set from processor context.");
            }
            throw exception;
        }
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
        EasyMock.expect((Object)this.context.keySerde()).andStubReturn((Object)Serdes.String());
        EasyMock.expect((Object)this.context.valueSerde()).andStubReturn((Object)Serdes.Long());
        MeteredTimestampedKeyValueStore store = new MeteredTimestampedKeyValueStore(this.inner, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.Long()));
        EasyMock.replay((Object[])new Object[]{this.inner, this.context});
        store.init((ProcessorContext)this.context, this.inner);
        try {
            store.put((Object)"key", (Object)ValueAndTimestamp.make((Object)42L, (long)60000L));
        }
        catch (StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail((String)"Serdes are not correctly set from constructor parameters.");
            }
            throw exception;
        }
    }

    private static interface CachedKeyValueStore
    extends KeyValueStore<Bytes, byte[]>,
    CachedStateStore<byte[], byte[]> {
    }
}

