/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoField;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.VersionedKeyQuery;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(value="integration")
public class IQv2VersionedStoreIntegrationTest {
    // ---- Cluster and topology settings ----
    private static final int NUM_BROKERS = 1;
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final String STORE_NAME = "versioned-store";
    private static final Duration HISTORY_RETENTION = Duration.ofDays(1);
    private static final Duration SEGMENT_INTERVAL = Duration.ofHours(1);

    // ---- Test data ----
    // Key under which every seeded record is written, and a key that is never written.
    private static final int RECORD_KEY = 2;
    private static final int NON_EXISTING_KEY = 3;
    private static final Instant BASE_TIMESTAMP = Instant.parse("2023-01-01T10:00:00.00Z");
    // NOTE: this is the epoch-SECONDS value of BASE_TIMESTAMP, yet the tests in this class
    // convert RECORD_TIMESTAMPS entries with Instant.ofEpochMilli(...). The values are used
    // consistently, so only their absolute meaning (not their ordering) is affected.
    private static final Long BASE_TIMESTAMP_LONG = BASE_TIMESTAMP.getEpochSecond();
    // RECORD_VALUES[i] is written with timestamp RECORD_TIMESTAMPS[i]; timestamps are 10 apart.
    private static final Integer[] RECORD_VALUES = {2, 20, 200, 2000};
    private static final Long[] RECORD_TIMESTAMPS = {
        BASE_TIMESTAMP_LONG,
        BASE_TIMESTAMP_LONG + 10L,
        BASE_TIMESTAMP_LONG + 20L,
        BASE_TIMESTAMP_LONG + 30L
    };
    private static final int RECORD_NUMBER = RECORD_VALUES.length;
    private static final int LAST_INDEX = RECORD_NUMBER - 1;

    // Shared, mutable position used as the bound of every query issued by this class.
    private static final Position INPUT_POSITION = Position.emptyPosition();

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
        NUM_BROKERS,
        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));

    // Streams instance under test; created in beforeTest() and closed in afterTest().
    private KafkaStreams kafkaStreams;

    /**
     * Starts the embedded cluster and seeds partition 0 of the input topic with one record per
     * entry of {@code RECORD_VALUES}, all under {@code RECORD_KEY}, using the matching entry of
     * {@code RECORD_TIMESTAMPS} as the record timestamp.
     *
     * @throws Exception if the cluster cannot be started or a send does not complete
     */
    @BeforeAll
    public static void before() throws Exception {
        CLUSTER.start();

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < RECORD_NUMBER; i++) {
                // Wait on each send's Future before issuing the next one so that a failed
                // send fails the setup right here.
                producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[i], RECORD_KEY, RECORD_VALUES[i])).get();
            }
        }

        // Bound later queries at index LAST_INDEX (3) of partition 0, i.e. the position of the
        // last record written above (assuming the topic was empty before seeding).
        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, LAST_INDEX);
    }

    /**
     * Builds a topology that materializes the input topic as a table backed by a persistent
     * versioned key-value store, then starts a {@link KafkaStreams} instance for it.
     */
    @BeforeEach
    public void beforeTest() {
        StreamsBuilder topologyBuilder = new StreamsBuilder();
        KeyValueBytesStoreSupplier versionedStoreSupplier =
            Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION, SEGMENT_INTERVAL);
        topologyBuilder.table(INPUT_TOPIC_NAME, Materialized.as(versionedStoreSupplier));

        Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", "app");
        streamsConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        // Keys and values on the input topic are both integers.
        String integerSerde = Serdes.IntegerSerde.class.getName();
        streamsConfig.put("default.key.serde", integerSerde);
        streamsConfig.put("default.value.serde", integerSerde);

        // NOTE(review): the meaning of the trailing 'true' flag is defined by
        // IntegrationTestUtils.getStartedStreams; confirm there.
        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, topologyBuilder, true);
    }

    /**
     * Closes the Streams instance started for the test and then calls its {@code cleanUp()}.
     * Does nothing if no instance was created.
     */
    @AfterEach
    public void afterTest() {
        if (kafkaStreams == null) {
            return;
        }
        kafkaStreams.close();
        kafkaStreams.cleanUp();
    }

    /**
     * Stops the shared embedded Kafka cluster after all tests in this class have run.
     */
    @AfterAll
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * Runs every query scenario against the seeded versioned store, in a fixed order.
     * The race-condition scenario runs last because it writes an extra record and moves
     * the shared query position.
     */
    @Test
    public void verifyStore() {
        final Integer latestValue = RECORD_VALUES[LAST_INDEX];
        final Long latestTimestamp = RECORD_TIMESTAMPS[LAST_INDEX];
        final Long firstTimestamp = RECORD_TIMESTAMPS[0];
        final Optional<Long> openEnded = Optional.empty();
        final Optional<Instant> unbounded = Optional.empty();
        // A time range that lies entirely before the first record.
        final Optional<Instant> rangeStartBeforeData = Optional.of(Instant.ofEpochMilli(firstTimestamp - 100L));
        final Optional<Instant> rangeEndBeforeData = Optional.of(Instant.ofEpochMilli(firstTimestamp - 50L));

        // Single-version lookups: no timestamp, "now", exact latest timestamp, exact first timestamp.
        shouldHandleVersionedKeyQuery(Optional.empty(), latestValue, latestTimestamp, openEnded);
        shouldHandleVersionedKeyQuery(Optional.of(Instant.now()), latestValue, latestTimestamp, openEnded);
        shouldHandleVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(latestTimestamp)), latestValue, latestTimestamp, openEnded);
        shouldHandleVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(firstTimestamp)), RECORD_VALUES[0], firstTimestamp, Optional.of(RECORD_TIMESTAMPS[1]));

        // Single-version lookups expected to yield nothing: before the first record, and an unknown key.
        shouldVerifyGetNullForVersionedKeyQuery(RECORD_KEY, Instant.ofEpochMilli(firstTimestamp - 50L));
        shouldVerifyGetNullForVersionedKeyQuery(NON_EXISTING_KEY, Instant.now());

        // Multi-version lookups that return records.
        shouldHandleMultiVersionedKeyQuery(unbounded, unbounded, ResultOrder.ANY, 0, LAST_INDEX);
        shouldHandleMultiVersionedKeyQuery(unbounded, unbounded, ResultOrder.ASCENDING, 0, LAST_INDEX);
        shouldHandleMultiVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[1] + 5L)), Optional.of(Instant.now()), ResultOrder.ANY, 1, LAST_INDEX);

        // Multi-version lookups expected to be empty: range before the data, and an unknown key.
        shouldVerifyGetNullForMultiVersionedKeyQuery(RECORD_KEY, rangeStartBeforeData, rangeEndBeforeData, ResultOrder.ANY);
        shouldVerifyGetNullForMultiVersionedKeyQuery(RECORD_KEY, rangeStartBeforeData, rangeEndBeforeData, ResultOrder.ASCENDING);
        shouldVerifyGetNullForMultiVersionedKeyQuery(NON_EXISTING_KEY, unbounded, unbounded, ResultOrder.ANY);
        shouldVerifyGetNullForMultiVersionedKeyQuery(NON_EXISTING_KEY, unbounded, unbounded, ResultOrder.ASCENDING);

        shouldHandleRaceCondition();
    }

    /**
     * Issues a single-version query for {@code RECORD_KEY} and checks the returned record.
     *
     * @param queryTimestamp      as-of time for the query; empty to query without one
     * @param expectedValue       value the returned record must carry
     * @param expectedTimestamp   timestamp the returned record must carry
     * @param expectedValidToTime expected {@code validTo} of the returned record
     */
    private void shouldHandleVersionedKeyQuery(Optional<Instant> queryTimestamp, Integer expectedValue, Long expectedTimestamp, Optional<Long> expectedValidToTime) {
        final VersionedKeyQuery<Integer, Integer> keyQuery = defineQuery(RECORD_KEY, queryTimestamp);
        final QueryResult<VersionedRecord<Integer>> outcome = sendRequestAndReceiveResults(keyQuery, this.kafkaStreams);

        // Order matters: the failure check must precede getResult().
        if (outcome == null) {
            throw new AssertionError("The query returned null.");
        }
        if (outcome.isFailure()) {
            throw new AssertionError(outcome.toString());
        }
        if (outcome.getResult() == null) {
            throw new AssertionError("The query returned null.");
        }

        MatcherAssert.assertThat(outcome.isSuccess(), Matchers.is(true));

        final VersionedRecord<Integer> versionedRecord = outcome.getResult();
        MatcherAssert.assertThat(versionedRecord.value(), Matchers.is(expectedValue));
        MatcherAssert.assertThat(versionedRecord.timestamp(), Matchers.is(expectedTimestamp));
        MatcherAssert.assertThat(versionedRecord.validTo(), Matchers.is(expectedValidToTime));

        MatcherAssert.assertThat(outcome.getExecutionInfo(), Matchers.is(Matchers.empty()));
    }

    /**
     * Asserts that a single-version query for {@code key} as of {@code queryTimestamp}
     * produces a {@code null} partition result.
     *
     * @param key            key to look up
     * @param queryTimestamp as-of time for the query
     */
    private void shouldVerifyGetNullForVersionedKeyQuery(Integer key, Instant queryTimestamp) {
        final VersionedKeyQuery<Integer, Integer> keyQuery = defineQuery(key, Optional.of(queryTimestamp));
        final QueryResult<VersionedRecord<Integer>> outcome = sendRequestAndReceiveResults(keyQuery, this.kafkaStreams);
        MatcherAssert.assertThat(outcome, CoreMatchers.nullValue());
    }

    /**
     * Issues a multi-version query for {@code RECORD_KEY} and checks that every partition
     * returns exactly the records {@code expectedArrayLowerBound..expectedArrayUpperBound}
     * (indices into {@code RECORD_VALUES}/{@code RECORD_TIMESTAMPS}) in the requested order.
     *
     * @param fromTime                lower time bound of the query; empty for none
     * @param toTime                  upper time bound of the query; empty for none
     * @param order                   {@code ASCENDING} to expect oldest-first; any other value
     *                                is checked newest-first
     * @param expectedArrayLowerBound index of the oldest record expected in the result
     * @param expectedArrayUpperBound index of the newest record expected in the result
     */
    private void shouldHandleMultiVersionedKeyQuery(Optional<Instant> fromTime, Optional<Instant> toTime, ResultOrder order, int expectedArrayLowerBound, int expectedArrayUpperBound) {
        MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, fromTime, toTime, order);
        Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, this.kafkaStreams);
        boolean ascending = order.equals(ResultOrder.ASCENDING);

        for (QueryResult<VersionedRecordIterator<Integer>> partitionResult : partitionResults.values()) {
            verifyPartitionResult(partitionResult);

            // try-with-resources closes the iterator AND lets assertion failures propagate.
            // The previous hand-rolled finally block used 'continue', which discarded any
            // exception thrown by the assertions below.
            try (VersionedRecordIterator<Integer> iterator = partitionResult.getResult()) {
                // Start from the end of the expected window that matches the iteration order.
                int i = ascending ? expectedArrayLowerBound : expectedArrayUpperBound;
                int iteratorSize = 0;
                while (iterator.hasNext()) {
                    VersionedRecord<Integer> record = iterator.next();
                    // A version is superseded by the next seeded record; only the very last
                    // seeded record has no validTo.
                    Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
                    MatcherAssert.assertThat(record.value(), Matchers.is(RECORD_VALUES[i]));
                    MatcherAssert.assertThat(record.timestamp(), Matchers.is(RECORD_TIMESTAMPS[i]));
                    MatcherAssert.assertThat(record.validTo(), Matchers.is(expectedValidTo));
                    i = ascending ? i + 1 : i - 1;
                    iteratorSize++;
                }
                MatcherAssert.assertThat(iteratorSize, Matchers.equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1));
            }
        }
    }

    /**
     * Issues a multi-version query and asserts that every partition's iterator is empty.
     *
     * @param key      key to look up
     * @param fromTime lower time bound of the query; empty for none
     * @param toTime   upper time bound of the query; empty for none
     * @param order    requested result order
     */
    private void shouldVerifyGetNullForMultiVersionedKeyQuery(Integer key, Optional<Instant> fromTime, Optional<Instant> toTime, ResultOrder order) {
        MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(key, fromTime, toTime, order);
        Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, this.kafkaStreams);

        for (QueryResult<VersionedRecordIterator<Integer>> partitionResult : partitionResults.values()) {
            // try-with-resources closes the iterator and lets a failed assertion propagate.
            // The previous hand-rolled finally block used 'continue', which discarded it.
            try (VersionedRecordIterator<Integer> iterator = partitionResult.getResult()) {
                Assertions.assertFalse(iterator.hasNext());
            }
        }
    }

    /**
     * Checks that an iterator which is already open keeps returning the originally seeded
     * versions even when a new record for the same key is written while it is being consumed.
     *
     * <p>The iterator is read newest-first until the index reaches {@code resumeIndex},
     * {@link #updateRecordValue()} is then invoked, and the remaining records are read and
     * compared against the seeded data.
     */
    private void shouldHandleRaceCondition() {
        // Index at which iteration is paused to inject the concurrent update.
        final int resumeIndex = 2;

        MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY);
        Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, this.kafkaStreams);

        for (QueryResult<VersionedRecordIterator<Integer>> partitionResult : partitionResults.values()) {
            // try-with-resources closes the iterator and lets assertion failures propagate.
            // The previous hand-rolled finally block used 'continue', which discarded them.
            try (VersionedRecordIterator<Integer> iterator = partitionResult.getResult()) {
                int i = LAST_INDEX;
                int iteratorSize = 0;

                // Phase 1: consume from the newest record down to (but not including) resumeIndex.
                while (iterator.hasNext()) {
                    VersionedRecord<Integer> record = iterator.next();
                    Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
                    MatcherAssert.assertThat(record.value(), Matchers.is(RECORD_VALUES[i]));
                    MatcherAssert.assertThat(record.timestamp(), Matchers.is(RECORD_TIMESTAMPS[i]));
                    MatcherAssert.assertThat(record.validTo(), Matchers.is(expectedValidTo));
                    i--;
                    iteratorSize++;
                    if (i == resumeIndex) {
                        break;
                    }
                }

                // Write a new value for the same key while the iterator is still open.
                updateRecordValue();

                // Phase 2: the remaining records must still match the seeded data. None of
                // them is the newest version, so each has a validTo.
                while (iterator.hasNext()) {
                    VersionedRecord<Integer> record = iterator.next();
                    Optional<Long> expectedValidTo = Optional.of(RECORD_TIMESTAMPS[i + 1]);
                    MatcherAssert.assertThat(record.value(), Matchers.is(RECORD_VALUES[i]));
                    MatcherAssert.assertThat(record.timestamp(), Matchers.is(RECORD_TIMESTAMPS[i]));
                    MatcherAssert.assertThat(record.validTo(), Matchers.is(expectedValidTo));
                    i--;
                    iteratorSize++;
                }

                MatcherAssert.assertThat(iteratorSize, Matchers.equalTo(RECORD_NUMBER));
            }
        }
    }

    /**
     * Builds a single-version query for {@code key}.
     *
     * @param key            key to look up
     * @param queryTimestamp if present, applied to the query via {@code asOf}
     * @return the query, with the as-of time applied only when one was supplied
     */
    private static VersionedKeyQuery<Integer, Integer> defineQuery(Integer key, Optional<Instant> queryTimestamp) {
        final VersionedKeyQuery<Integer, Integer> byKey = VersionedKeyQuery.withKey(key);
        return queryTimestamp.map(byKey::asOf).orElse(byKey);
    }

    /**
     * Builds a multi-version query for {@code key}.
     *
     * @param key      key to look up
     * @param fromTime if present, applied via {@code fromTime}
     * @param toTime   if present, applied via {@code toTime}
     * @param order    {@code ASCENDING} requests ascending timestamps; any other value leaves
     *                 the query's ordering untouched
     * @return the configured query
     */
    private static MultiVersionedKeyQuery<Integer, Integer> defineQuery(Integer key, Optional<Instant> fromTime, Optional<Instant> toTime, ResultOrder order) {
        final MultiVersionedKeyQuery<Integer, Integer> byKey = MultiVersionedKeyQuery.withKey(key);
        final MultiVersionedKeyQuery<Integer, Integer> withLowerBound = fromTime.map(byKey::fromTime).orElse(byKey);
        final MultiVersionedKeyQuery<Integer, Integer> withBothBounds = toTime.map(withLowerBound::toTime).orElse(withLowerBound);
        return order.equals(ResultOrder.ASCENDING)
            ? withBothBounds.withAscendingTimestamps()
            : withBothBounds;
    }

    /**
     * Runs a multi-version query against the versioned store, bounded at the shared input
     * position, and returns the per-partition results.
     *
     * @param query        query to execute
     * @param kafkaStreams Streams instance to query
     * @return results keyed by partition
     */
    private static Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> sendRequestAndReceiveResults(MultiVersionedKeyQuery<Integer, Integer> query, KafkaStreams kafkaStreams) {
        final StateQueryRequest<VersionedRecordIterator<Integer>> boundedRequest = StateQueryRequest
            .inStore(STORE_NAME)
            .withQuery(query)
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<VersionedRecordIterator<Integer>> queryOutcome =
            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, boundedRequest);
        return queryOutcome.getPartitionResults();
    }

    /**
     * Runs a single-version query against the versioned store, bounded at the shared input
     * position, and returns the result of the only partition.
     *
     * @param query        query to execute
     * @param kafkaStreams Streams instance to query
     * @return whatever {@code getOnlyPartitionResult()} yields for the query
     */
    private static QueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(VersionedKeyQuery<Integer, Integer> query, KafkaStreams kafkaStreams) {
        final StateQueryRequest<VersionedRecord<Integer>> boundedRequest = StateQueryRequest
            .inStore(STORE_NAME)
            .withQuery(query)
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<VersionedRecord<Integer>> queryOutcome =
            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, boundedRequest);
        return queryOutcome.getOnlyPartitionResult();
    }

    /**
     * Asserts that a partition result is a clean success: no execution info, not a failure,
     * and both failure accessors reject the call with {@link IllegalArgumentException}.
     *
     * @param result partition result to check
     */
    private static void verifyPartitionResult(QueryResult<VersionedRecordIterator<Integer>> result) {
        MatcherAssert.assertThat(result.getExecutionInfo(), Matchers.is(Matchers.empty()));

        if (result.isFailure()) {
            throw new AssertionError(result.toString());
        }
        MatcherAssert.assertThat(result.isSuccess(), Matchers.is(true));

        // The failure accessors are expected to throw when called on this result.
        Assertions.assertThrows(IllegalArgumentException.class, result::getFailureReason);
        Assertions.assertThrows(IllegalArgumentException.class, result::getFailureMessage);
    }

    /**
     * Writes one additional record for {@code RECORD_KEY} (value 999999, timestamp of the first
     * seeded record) to partition 0 of the input topic, advances the shared query position to
     * that record, and waits until a fresh consumer has read all {@code RECORD_NUMBER + 1}
     * records from the topic.
     *
     * @throws RuntimeException if the send fails or is interrupted, or if waiting for the
     *                          records fails
     */
    private static void updateRecordValue() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
            // Await the send, as before() does, so a failed write surfaces here instead of
            // as a timeout in the wait below.
            producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (java.util.concurrent.ExecutionException e) {
            throw new RuntimeException(e);
        }

        // The new record follows the RECORD_NUMBER seeded ones, so the position component is
        // set to RECORD_NUMBER (4).
        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, RECORD_NUMBER);
        MatcherAssert.assertThat(INPUT_POSITION, Matchers.equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, RECORD_NUMBER)));

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProps.setProperty("key.deserializer", IntegerDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", IntegerDeserializer.class.getName());
        consumerProps.setProperty("group.id", "foo");
        consumerProps.setProperty("auto.offset.reset", "earliest");
        try {
            IntegrationTestUtils.waitUntilMinRecordsReceived(consumerProps, INPUT_TOPIC_NAME, RECORD_NUMBER + 1);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

