/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.internals.LogicalSegmentIterator;
import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

/**
 * Unit tests for {@link MeteredVersionedKeyValueStore}.
 *
 * <p>The store under test wraps a Mockito mock of {@link VersionedBytesStore} and is initialised
 * in {@link #setUp()} against a mocked {@link InternalProcessorContext} whose metrics are backed
 * by a real {@link Metrics} registry recording at DEBUG level. Tests then check that calls are
 * forwarded to the inner store and that the store-level metrics registered under
 * {@code stream-state-metrics} change as expected.
 */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class MeteredVersionedKeyValueStoreTest {
    private static final String STORE_NAME = "versioned_store";
    private static final Serde<String> STRING_SERDE = new Serdes.StringSerde();
    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE =
        new ValueAndTimestampSerde<>(STRING_SERDE);
    private static final String METRICS_SCOPE = "scope";
    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
    private static final String APPLICATION_ID = "test-app";
    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
    private static final String KEY = "k";
    private static final String VALUE = "v";
    private static final long TIMESTAMP = 10L;
    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
    private static final byte[] RAW_VALUE = STRING_SERDE.serializer().serialize(null, VALUE);
    private static final byte[] RAW_VALUE_AND_TIMESTAMP =
        VALUE_AND_TIMESTAMP_SERDE.serializer().serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));

    private final VersionedBytesStore inner = Mockito.mock(VersionedBytesStore.class);
    private final Metrics metrics = new Metrics();
    private final Time mockTime = new MockTime();
    private final String threadId = Thread.currentThread().getName();
    private InternalProcessorContext context = Mockito.mock(InternalProcessorContext.class);
    private Map<String, String> tags;
    private MeteredVersionedKeyValueStore<String, String> store;

    /**
     * Stubs the mocked context and inner store, builds the metric tags used for lookups, and
     * creates and initialises the metered store under test.
     */
    @BeforeEach
    public void setUp() {
        Mockito.when(inner.name()).thenReturn(STORE_NAME);
        Mockito.when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", "latest", mockTime));
        Mockito.when(context.applicationId()).thenReturn(APPLICATION_ID);
        Mockito.when(context.taskId()).thenReturn(TASK_ID);

        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        tags = Utils.mkMap(
            Utils.mkEntry("thread-id", threadId),
            Utils.mkEntry("task-id", TASK_ID.toString()),
            // Evaluates to "scope-state-id"; derived from the scope passed to the store's constructor.
            Utils.mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
        );

        store = newMeteredStore(inner);
        store.init((StateStoreContext) context, store);
    }

    /** Creates a metered store over {@code inner} using String serdes for both key and value. */
    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(VersionedBytesStore inner) {
        return new MeteredVersionedKeyValueStore<>(inner, METRICS_SCOPE, mockTime, STRING_SERDE, STRING_SERDE);
    }

    @SuppressWarnings("deprecation")
    @Test
    public void shouldDelegateDeprecatedInit() {
        // Recreate the store over a fresh mock so only the ProcessorContext-based init is observed.
        store.close();
        VersionedBytesStore mockInner = Mockito.mock(VersionedBytesStore.class);
        store = newMeteredStore(mockInner);

        store.init((ProcessorContext) context, store);

        Mockito.verify(mockInner).init((ProcessorContext) context, store);
    }

    @Test
    public void shouldDelegateInit() {
        // init() was already invoked in setUp().
        Mockito.verify(inner).init((StateStoreContext) context, store);
    }

    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        String changelogTopicName = "changelog-topic";
        Mockito.when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
    }

    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        String defaultChangelogTopicName =
            ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
        Mockito.when(context.changelogFor(STORE_NAME)).thenReturn(null);
        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
    }

    /**
     * Rebuilds the store with mocked serdes, performs a put, and verifies that both the key and
     * value serializers were invoked with {@code changelogTopicName} as the topic argument.
     */
    @SuppressWarnings("unchecked")
    private void doShouldPassChangelogTopicNameToStateStoreSerde(String changelogTopicName) {
        Serde<String> keySerde = Mockito.mock(Serde.class);
        Serializer<String> keySerializer = Mockito.mock(Serializer.class);
        Serde<String> valueSerde = Mockito.mock(Serde.class);
        Serializer<String> valueSerializer = Mockito.mock(Serializer.class);
        Deserializer<String> valueDeserializer = Mockito.mock(Deserializer.class);
        Mockito.when(keySerde.serializer()).thenReturn(keySerializer);
        Mockito.when(valueSerde.serializer()).thenReturn(valueSerializer);
        Mockito.when(valueSerde.deserializer()).thenReturn(valueDeserializer);

        store.close();
        store = new MeteredVersionedKeyValueStore<>(inner, METRICS_SCOPE, mockTime, keySerde, valueSerde);
        store.init((StateStoreContext) context, store);

        store.put(KEY, VALUE, TIMESTAMP);

        Mockito.verify(keySerializer).serialize(changelogTopicName, KEY);
        Mockito.verify(valueSerializer).serialize(changelogTopicName, VALUE);
    }

    @Test
    public void shouldRecordMetricsOnInit() {
        // init() was already invoked in setUp().
        MatcherAssert.assertThat((Double) getMetric("restore-rate").metricValue(), Matchers.greaterThan(0.0));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnPut() {
        Mockito.when(inner.put(RAW_KEY, RAW_VALUE, TIMESTAMP)).thenReturn(-1L);

        long validTo = store.put(KEY, VALUE, TIMESTAMP);

        MatcherAssert.assertThat(validTo, Matchers.is(-1L));
        MatcherAssert.assertThat((Double) getMetric("put-rate").metricValue(), Matchers.greaterThan(0.0));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnDelete() {
        Mockito.when(inner.delete(RAW_KEY, TIMESTAMP)).thenReturn(RAW_VALUE_AND_TIMESTAMP);

        VersionedRecord<String> result = store.delete(KEY, TIMESTAMP);

        MatcherAssert.assertThat(result, Matchers.is(new VersionedRecord<>(VALUE, TIMESTAMP)));
        MatcherAssert.assertThat((Double) getMetric("delete-rate").metricValue(), Matchers.greaterThan(0.0));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnGet() {
        Mockito.when(inner.get(RAW_KEY)).thenReturn(RAW_VALUE_AND_TIMESTAMP);

        VersionedRecord<String> result = store.get(KEY);

        MatcherAssert.assertThat(result, Matchers.is(new VersionedRecord<>(VALUE, TIMESTAMP)));
        MatcherAssert.assertThat((Double) getMetric("get-rate").metricValue(), Matchers.greaterThan(0.0));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
        Mockito.when(inner.get(RAW_KEY, TIMESTAMP)).thenReturn(RAW_VALUE_AND_TIMESTAMP);

        VersionedRecord<String> result = store.get(KEY, TIMESTAMP);

        MatcherAssert.assertThat(result, Matchers.is(new VersionedRecord<>(VALUE, TIMESTAMP)));
        MatcherAssert.assertThat((Double) getMetric("get-rate").metricValue(), Matchers.greaterThan(0.0));
    }

    @Test
    public void shouldDelegateAndRecordMetricsOnFlush() {
        store.flush();

        Mockito.verify(inner).flush();
        MatcherAssert.assertThat((Double) getMetric("flush-rate").metricValue(), Matchers.greaterThan(0.0));
    }

    @Test
    public void shouldDelegateAndRemoveMetricsOnClose() {
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));

        store.close();

        Mockito.verify(inner).close();
        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
    }

    @Test
    public void shouldRemoveMetricsOnCloseEvenIfInnerThrows() {
        Mockito.doThrow(new RuntimeException("uh oh")).when(inner).close();
        MatcherAssert.assertThat(storeMetrics(), Matchers.not(Matchers.empty()));

        Assertions.assertThrows(RuntimeException.class, () -> store.close());

        MatcherAssert.assertThat(storeMetrics(), Matchers.empty());
    }

    @Test
    public void shouldNotSetFlushListenerIfInnerIsNotCaching() {
        MatcherAssert.assertThat(store.setFlushListener(null, false), Matchers.is(false));
    }

    @Test
    public void shouldThrowNullPointerOnPutIfKeyIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> store.put(null, VALUE, TIMESTAMP));
    }

    @Test
    public void shouldThrowNullPointerOnDeleteIfKeyIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> store.delete(null, TIMESTAMP));
    }

    @Test
    public void shouldThrowNullPointerOnGetIfKeyIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> store.get(null));
    }

    @Test
    public void shouldThrowNullPointerOnGetWithTimestampIfKeyIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> store.get(null, TIMESTAMP));
    }

    @SuppressWarnings("unchecked")
    @Test
    public void shouldThrowOnIQv2RangeQuery() {
        Assertions.assertThrows(
            UnsupportedOperationException.class,
            () -> store.query(Mockito.mock(RangeQuery.class), null, null));
    }

    @SuppressWarnings("unchecked")
    @Test
    public void shouldThrowOnIQv2KeyQuery() {
        Assertions.assertThrows(
            UnsupportedOperationException.class,
            () -> store.query(Mockito.mock(KeyQuery.class), null, null));
    }

    @Test
    public void shouldThrowOnMultiVersionedKeyQueryInvalidTimeRange() {
        Instant fromTime = Instant.now();
        // toTime deliberately lies 100 ms before fromTime to form an invalid range.
        Instant toTime = Instant.ofEpochMilli(fromTime.toEpochMilli() - 100L);
        MultiVersionedKeyQuery<String, Object> query =
            MultiVersionedKeyQuery.<String, Object>withKey("key").fromTime(fromTime).toTime(toTime);

        Exception exception = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> store.query(query, null, null));

        Assertions.assertEquals(
            "The `fromTime` timestamp must be smaller than the `toTime` timestamp.",
            exception.getMessage());
    }

    @SuppressWarnings("unchecked")
    @Test
    public void shouldDelegateAndAddExecutionInfoOnCustomQuery() {
        Query<String> query = Mockito.mock(Query.class);
        PositionBound positionBound = Mockito.mock(PositionBound.class);
        QueryConfig queryConfig = Mockito.mock(QueryConfig.class);
        QueryResult<String> result = Mockito.mock(QueryResult.class);
        Mockito.when(inner.query(query, positionBound, queryConfig)).thenReturn(result);
        Mockito.when(queryConfig.isCollectExecutionInfo()).thenReturn(true);

        MatcherAssert.assertThat(store.query(query, positionBound, queryConfig), Matchers.is(result));
        Mockito.verify(result).addExecutionInfo(ArgumentMatchers.anyString());
    }

    @Test
    public void shouldDelegateName() {
        Mockito.when(inner.name()).thenReturn(STORE_NAME);

        MatcherAssert.assertThat(store.name(), Matchers.is(STORE_NAME));
    }

    @Test
    public void shouldDelegatePersistent() {
        // Check both values so a hard-coded return in the wrapper would be caught.
        Mockito.when(inner.persistent()).thenReturn(true);
        MatcherAssert.assertThat(store.persistent(), Matchers.is(true));

        Mockito.when(inner.persistent()).thenReturn(false);
        MatcherAssert.assertThat(store.persistent(), Matchers.is(false));
    }

    @Test
    public void shouldDelegateIsOpen() {
        // Check both values so a hard-coded return in the wrapper would be caught.
        Mockito.when(inner.isOpen()).thenReturn(true);
        MatcherAssert.assertThat(store.isOpen(), Matchers.is(true));

        Mockito.when(inner.isOpen()).thenReturn(false);
        MatcherAssert.assertThat(store.isOpen(), Matchers.is(false));
    }

    @Test
    public void shouldDelegateGetPosition() {
        Position position = Mockito.mock(Position.class);
        Mockito.when(inner.getPosition()).thenReturn(position);

        MatcherAssert.assertThat(store.getPosition(), Matchers.is(position));
    }

    @Test
    public void shouldTrackOpenIteratorsMetric() {
        MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey(KEY);
        PositionBound bound = PositionBound.unbounded();
        QueryConfig config = new QueryConfig(false);
        Mockito.when(inner.query(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
            .thenReturn(QueryResult.forResult(
                new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));

        KafkaMetric openIteratorsMetric = getMetric("num-open-iterators");
        MatcherAssert.assertThat(openIteratorsMetric, Matchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat((Long) openIteratorsMetric.metricValue(), CoreMatchers.equalTo(0L));

        QueryResult<VersionedRecordIterator<String>> result = store.query(query, bound, config);
        try (VersionedRecordIterator<String> iterator = result.getResult()) {
            MatcherAssert.assertThat((Long) openIteratorsMetric.metricValue(), CoreMatchers.equalTo(1L));
        }

        MatcherAssert.assertThat((Long) openIteratorsMetric.metricValue(), CoreMatchers.equalTo(0L));
    }

    @Test
    public void shouldTimeIteratorDuration() {
        MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey(KEY);
        PositionBound bound = PositionBound.unbounded();
        QueryConfig config = new QueryConfig(false);
        Mockito.when(inner.query(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
            .thenReturn(QueryResult.forResult(
                new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));

        KafkaMetric iteratorDurationAvgMetric = getMetric("iterator-duration-avg");
        KafkaMetric iteratorDurationMaxMetric = getMetric("iterator-duration-max");
        MatcherAssert.assertThat(iteratorDurationAvgMetric, Matchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(iteratorDurationMaxMetric, Matchers.not(CoreMatchers.nullValue()));

        // No iterator has been closed yet, so both metrics report NaN.
        MatcherAssert.assertThat((Double) iteratorDurationAvgMetric.metricValue(), CoreMatchers.equalTo(Double.NaN));
        MatcherAssert.assertThat((Double) iteratorDurationMaxMetric.metricValue(), CoreMatchers.equalTo(Double.NaN));

        QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config);
        try (VersionedRecordIterator<String> iterator = first.getResult()) {
            // Advance the mock clock by 2 ms while the iterator is open.
            mockTime.sleep(2L);
        }

        MatcherAssert.assertThat(
            (Double) iteratorDurationAvgMetric.metricValue(),
            CoreMatchers.equalTo(2.0 * TimeUnit.MILLISECONDS.toNanos(1L)));
        MatcherAssert.assertThat(
            (Double) iteratorDurationMaxMetric.metricValue(),
            CoreMatchers.equalTo(2.0 * TimeUnit.MILLISECONDS.toNanos(1L)));

        QueryResult<VersionedRecordIterator<String>> second = store.query(query, bound, config);
        try (VersionedRecordIterator<String> iterator = second.getResult()) {
            // Advance the mock clock by 3 ms while the iterator is open.
            mockTime.sleep(3L);
        }

        // Durations of 2 ms and 3 ms: average 2.5 ms, maximum 3 ms (expressed in nanoseconds).
        MatcherAssert.assertThat(
            (Double) iteratorDurationAvgMetric.metricValue(),
            CoreMatchers.equalTo(2.5 * TimeUnit.MILLISECONDS.toNanos(1L)));
        MatcherAssert.assertThat(
            (Double) iteratorDurationMaxMetric.metricValue(),
            CoreMatchers.equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1L)));
    }

    @Test
    public void shouldTrackOldestOpenIteratorTimestamp() {
        MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey(KEY);
        PositionBound bound = PositionBound.unbounded();
        QueryConfig config = new QueryConfig(false);
        Mockito.when(inner.query(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
            .thenReturn(QueryResult.forResult(
                new LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, ResultOrder.ANY)));

        KafkaMetric oldestIteratorTimestampMetric = getMetric("oldest-iterator-open-since-ms");
        MatcherAssert.assertThat(oldestIteratorTimestampMetric, Matchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(oldestIteratorTimestampMetric.metricValue(), CoreMatchers.nullValue());

        QueryResult<VersionedRecordIterator<String>> first = store.query(query, bound, config);
        // The second iterator must outlive the first, so it cannot be a try-with-resources
        // variable (those are implicitly final); it is closed in the finally block instead.
        VersionedRecordIterator<String> secondIterator = null;
        long secondTime;
        try {
            try (VersionedRecordIterator<String> iterator = first.getResult()) {
                long oldestTimestamp = mockTime.milliseconds();
                MatcherAssert.assertThat(
                    (Long) oldestIteratorTimestampMetric.metricValue(),
                    CoreMatchers.equalTo(oldestTimestamp));
                mockTime.sleep(100L);

                // Open a second iterator before closing the first: the metric must still
                // report the first iterator's timestamp.
                QueryResult<VersionedRecordIterator<String>> second = store.query(query, bound, config);
                secondIterator = second.getResult();
                secondTime = mockTime.milliseconds();
                MatcherAssert.assertThat(
                    (Long) oldestIteratorTimestampMetric.metricValue(),
                    CoreMatchers.equalTo(oldestTimestamp));
                mockTime.sleep(100L);
            }

            // The first iterator is closed; the metric must now report the second one's timestamp.
            MatcherAssert.assertThat(
                (Long) oldestIteratorTimestampMetric.metricValue(),
                CoreMatchers.equalTo(secondTime));
        } finally {
            if (secondIterator != null) {
                secondIterator.close();
            }
        }

        // No cast here: an unexpected non-null value should fail the assertion, not throw ClassCastException.
        MatcherAssert.assertThat(oldestIteratorTimestampMetric.metricValue(), CoreMatchers.nullValue());
    }

    /** Looks up the store-level metric called {@code name} using the tags built in {@link #setUp()}. */
    private KafkaMetric getMetric(String name) {
        return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags));
    }

    /** Returns the names of all registered metrics in the store-level group that carry this test's tags. */
    private List<MetricName> storeMetrics() {
        return metrics.metrics()
            .keySet()
            .stream()
            .filter(name -> name.group().equals(STORE_LEVEL_GROUP) && name.tags().equals(tags))
            .collect(Collectors.toList());
    }
}

