/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.StoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={IntegrationTest.class})
@RunWith(value=Parameterized.class)
public class IQv2StoreIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(IQv2StoreIntegrationTest.class);
    private static final long SEED = new Random().nextLong();
    private static final Random RANDOM = new Random(SEED);
    private static final int NUM_BROKERS = 1;
    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5L);
    private static int port = 0;
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final Position INPUT_POSITION = Position.emptyPosition();
    private static final String STORE_NAME = "kv-store";
    private static final long RECORD_TIME = System.currentTimeMillis();
    private static final long WINDOW_START = RECORD_TIME / WINDOW_SIZE.toMillis() * WINDOW_SIZE.toMillis();
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final Position POSITION_0 = Position.fromMap((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"input-topic", (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)0, (Object)1L)}))}));
    private final StoresToTest storeToTest;
    private final String kind;
    private final boolean cache;
    private final boolean log;
    private KafkaStreams kafkaStreams;

    @Parameterized.Parameters(name="cache={0}, log={1}, supplier={2}, kind={3}")
    public static Collection<Object[]> data() {
        LOG.info("Generating test cases according to random seed: {}", (Object)SEED);
        ArrayList<Object[]> values = new ArrayList<Object[]>();
        for (boolean cacheEnabled : Arrays.asList(true, false)) {
            for (boolean logEnabled : Arrays.asList(true, false)) {
                for (StoresToTest toTest : StoresToTest.values()) {
                    for (String kind : Arrays.asList("DSL", "PAPI")) {
                        values.add(new Object[]{cacheEnabled, logEnabled, toTest.name(), kind});
                    }
                }
            }
        }
        Collections.shuffle(values, RANDOM);
        return values;
    }

    public IQv2StoreIntegrationTest(boolean cache, boolean log, String storeToTest, String kind) {
        this.cache = cache;
        this.log = log;
        this.storeToTest = StoresToTest.valueOf(storeToTest);
        this.kind = kind;
    }

    @BeforeClass
    public static void before() throws InterruptedException, IOException, ExecutionException, TimeoutException {
        CLUSTER.start();
        CLUSTER.deleteAllTopicsAndWait(60000L);
        int partitions = 2;
        CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);
        LinkedList<Future> futures = new LinkedList<Future>();
        try (KafkaProducer producer = new KafkaProducer(producerProps);){
            for (int i = 0; i < 4; ++i) {
                Future send = producer.send(new ProducerRecord(INPUT_TOPIC_NAME, Integer.valueOf(i % 2), Long.valueOf(RECORD_TIME), (Object)i, (Object)i, null));
                futures.add(send);
                Time.SYSTEM.sleep(1L);
            }
            producer.flush();
            for (Future future : futures) {
                RecordMetadata recordMetadata = (RecordMetadata)future.get(1L, TimeUnit.MINUTES);
                MatcherAssert.assertThat((Object)recordMetadata.hasOffset(), (Matcher)Matchers.is((Object)true));
                INPUT_POSITION.withComponent(recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            }
        }
        MatcherAssert.assertThat((Object)INPUT_POSITION, (Matcher)Matchers.equalTo((Object)Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 1L).withComponent(INPUT_TOPIC_NAME, 1, 1L)));
    }

    @Before
    public void beforeTest() {
        StoreSupplier<?> supplier = this.storeToTest.supplier();
        Properties streamsConfig = IQv2StoreIntegrationTest.streamsConfiguration(this.cache, this.log, this.storeToTest.name(), this.kind);
        StreamsBuilder builder = new StreamsBuilder();
        if (Objects.equals(this.kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
            this.setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier)supplier, builder);
        } else if (Objects.equals(this.kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
            this.setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier)supplier, builder);
        } else if (Objects.equals(this.kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
            this.setUpWindowDSLTopology((WindowBytesStoreSupplier)supplier, builder);
        } else if (Objects.equals(this.kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
            this.setUpWindowPAPITopology((WindowBytesStoreSupplier)supplier, builder);
        } else if (Objects.equals(this.kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
            this.setUpSessionDSLTopology((SessionBytesStoreSupplier)supplier, builder);
        } else if (Objects.equals(this.kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
            this.setUpSessionPAPITopology((SessionBytesStoreSupplier)supplier, builder);
        } else {
            throw new AssertionError((Object)"Store supplier is an unrecognized type.");
        }
        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
    }

    private void setUpSessionDSLTopology(SessionBytesStoreSupplier supplier, StreamsBuilder builder) {
        Materialized materialized = Materialized.as((SessionBytesStoreSupplier)supplier);
        if (this.cache) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (this.log) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        builder.stream(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).groupByKey().windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)WINDOW_SIZE)).aggregate(() -> 0, (key, value, aggregate) -> aggregate + value, (aggKey, aggOne, aggTwo) -> aggOne + aggTwo, materialized);
    }

    private void setUpWindowDSLTopology(WindowBytesStoreSupplier supplier, StreamsBuilder builder) {
        Materialized materialized = Materialized.as((WindowBytesStoreSupplier)supplier);
        if (this.cache) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (this.log) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        builder.stream(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).groupByKey().windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)WINDOW_SIZE)).aggregate(() -> 0, (key, value, aggregate) -> aggregate + value, materialized);
    }

    private void setUpKeyValueDSLTopology(KeyValueBytesStoreSupplier supplier, StreamsBuilder builder) {
        Materialized materialized = Materialized.as((KeyValueBytesStoreSupplier)supplier);
        if (this.cache) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (this.log) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        if (this.storeToTest.global()) {
            builder.globalTable(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()), materialized);
        } else {
            builder.table(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()), materialized);
        }
    }

    private void setUpKeyValuePAPITopology(KeyValueBytesStoreSupplier supplier, StreamsBuilder builder) {
        ProcessorSupplier processorSupplier;
        StoreBuilder keyValueStoreStoreBuilder;
        if (this.storeToTest.timestamped()) {
            keyValueStoreStoreBuilder = Stores.timestampedKeyValueStoreBuilder((KeyValueBytesStoreSupplier)supplier, (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>(){

                public void process(Record<Integer, Integer> record) {
                    TimestampedKeyValueStore stateStore = (TimestampedKeyValueStore)this.context().getStateStore(keyValueStoreStoreBuilder.name());
                    stateStore.put(record.key(), (Object)ValueAndTimestamp.make((Object)record.value(), (long)record.timestamp()));
                }
            };
        } else {
            keyValueStoreStoreBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)supplier, (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>(){

                public void process(Record<Integer, Integer> record) {
                    KeyValueStore stateStore = (KeyValueStore)this.context().getStateStore(keyValueStoreStoreBuilder.name());
                    stateStore.put(record.key(), record.value());
                }
            };
        }
        if (this.cache) {
            keyValueStoreStoreBuilder.withCachingEnabled();
        } else {
            keyValueStoreStoreBuilder.withCachingDisabled();
        }
        if (this.log) {
            keyValueStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            keyValueStoreStoreBuilder.withLoggingDisabled();
        }
        if (this.storeToTest.global()) {
            builder.addGlobalStore(keyValueStoreStoreBuilder, INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()), processorSupplier);
        } else {
            builder.addStateStore(keyValueStoreStoreBuilder);
            builder.stream(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).process(processorSupplier, new String[]{keyValueStoreStoreBuilder.name()});
        }
    }

    private void setUpWindowPAPITopology(WindowBytesStoreSupplier supplier, StreamsBuilder builder) {
        ProcessorSupplier processorSupplier;
        StoreBuilder windowStoreStoreBuilder;
        if (this.storeToTest.timestamped()) {
            windowStoreStoreBuilder = Stores.timestampedWindowStoreBuilder((WindowBytesStoreSupplier)supplier, (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>(){

                public void process(Record<Integer, Integer> record) {
                    TimestampedWindowStore stateStore = (TimestampedWindowStore)this.context().getStateStore(windowStoreStoreBuilder.name());
                    stateStore.put(record.key(), (Object)ValueAndTimestamp.make((Object)record.value(), (long)record.timestamp()), WINDOW_START);
                }
            };
        } else {
            windowStoreStoreBuilder = Stores.windowStoreBuilder((WindowBytesStoreSupplier)supplier, (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>(){

                public void process(Record<Integer, Integer> record) {
                    WindowStore stateStore = (WindowStore)this.context().getStateStore(windowStoreStoreBuilder.name());
                    stateStore.put(record.key(), record.value(), WINDOW_START);
                }
            };
        }
        if (this.cache) {
            windowStoreStoreBuilder.withCachingEnabled();
        } else {
            windowStoreStoreBuilder.withCachingDisabled();
        }
        if (this.log) {
            windowStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            windowStoreStoreBuilder.withLoggingDisabled();
        }
        if (this.storeToTest.global()) {
            builder.addGlobalStore(windowStoreStoreBuilder, INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()), processorSupplier);
        } else {
            builder.addStateStore(windowStoreStoreBuilder);
            builder.stream(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).process(processorSupplier, new String[]{windowStoreStoreBuilder.name()});
        }
    }

    private void setUpSessionPAPITopology(SessionBytesStoreSupplier supplier, StreamsBuilder builder) {
        final StoreBuilder sessionStoreStoreBuilder = Stores.sessionStoreBuilder((SessionBytesStoreSupplier)supplier, (Serde)Serdes.Integer(), (Serde)Serdes.Integer());
        ProcessorSupplier processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>(){

            public void process(Record<Integer, Integer> record) {
                SessionStore stateStore = (SessionStore)this.context().getStateStore(sessionStoreStoreBuilder.name());
                stateStore.put(new Windowed(record.key(), (Window)new SessionWindow(WINDOW_START, WINDOW_START)), record.value());
            }
        };
        if (this.cache) {
            sessionStoreStoreBuilder.withCachingEnabled();
        } else {
            sessionStoreStoreBuilder.withCachingDisabled();
        }
        if (this.log) {
            sessionStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            sessionStoreStoreBuilder.withLoggingDisabled();
        }
        if (this.storeToTest.global()) {
            builder.addGlobalStore(sessionStoreStoreBuilder, INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()), processorSupplier);
        } else {
            builder.addStateStore(sessionStoreStoreBuilder);
            builder.stream(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).process(processorSupplier, new String[]{sessionStoreStoreBuilder.name()});
        }
    }

    @After
    public void afterTest() {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
            this.kafkaStreams.cleanUp();
        }
    }

    @AfterClass
    public static void after() {
        CLUSTER.stop();
    }

    @Test
    public void verifyStore() {
        try {
            if (this.storeToTest.global()) {
                this.globalShouldRejectAllQueries();
            } else {
                Function<Object, Integer> valueExtractor;
                this.shouldRejectUnknownQuery();
                this.shouldCollectExecutionInfo();
                this.shouldCollectExecutionInfoUnderFailure();
                if (this.storeToTest.keyValue()) {
                    if (this.storeToTest.timestamped()) {
                        valueExtractor = ValueAndTimestamp::value;
                        this.shouldHandleKeyQuery(2, valueExtractor, 2);
                        this.shouldHandleRangeQueries(valueExtractor);
                    } else {
                        valueExtractor = Function.identity();
                        this.shouldHandleKeyQuery(2, valueExtractor, 2);
                        this.shouldHandleRangeQueries(valueExtractor);
                    }
                }
                if (this.storeToTest.isWindowed()) {
                    if (this.storeToTest.timestamped()) {
                        valueExtractor = ValueAndTimestamp::value;
                        this.shouldHandleWindowKeyQueries(valueExtractor);
                        this.shouldHandleWindowRangeQueries(valueExtractor);
                    } else {
                        valueExtractor = Function.identity();
                        this.shouldHandleWindowKeyQueries(valueExtractor);
                        this.shouldHandleWindowRangeQueries(valueExtractor);
                    }
                }
                if (this.storeToTest.isSession()) {
                    this.shouldHandleSessionKeyQueries();
                }
            }
        }
        catch (AssertionError e) {
            LOG.error("Failed assertion", (Throwable)((Object)e));
            throw e;
        }
    }

    private <T> void shouldHandleRangeQueries(Function<T, Integer> extractor) {
        this.shouldHandleRangeQuery(Optional.of(1), Optional.of(3), extractor, Utils.mkSet((Object[])new Integer[]{1, 2, 3}));
        this.shouldHandleRangeQuery(Optional.of(1), Optional.empty(), extractor, Utils.mkSet((Object[])new Integer[]{1, 2, 3}));
        this.shouldHandleRangeQuery(Optional.empty(), Optional.of(1), extractor, Utils.mkSet((Object[])new Integer[]{0, 1}));
        this.shouldHandleRangeQuery(Optional.empty(), Optional.empty(), extractor, Utils.mkSet((Object[])new Integer[]{0, 1, 2, 3}));
    }

    private <T> void shouldHandleWindowKeyQueries(Function<T, Integer> extractor) {
        this.shouldHandleWindowKeyQuery(2, Instant.ofEpochMilli(WINDOW_START), Instant.ofEpochMilli(WINDOW_START), extractor, Utils.mkSet((Object[])new Integer[]{2}));
        this.shouldHandleWindowKeyQuery(2, Instant.ofEpochMilli(WINDOW_START - 1L), Instant.ofEpochMilli(WINDOW_START - 1L), extractor, Utils.mkSet((Object[])new Integer[0]));
        this.shouldHandleWindowKeyQuery(999, Instant.ofEpochMilli(WINDOW_START), Instant.ofEpochMilli(WINDOW_START), extractor, Utils.mkSet((Object[])new Integer[0]));
        this.shouldHandleWindowKeyQuery(999, Instant.ofEpochMilli(WINDOW_START - 1L), Instant.ofEpochMilli(WINDOW_START - 1L), extractor, Utils.mkSet((Object[])new Integer[0]));
    }

    private <T> void shouldHandleWindowRangeQueries(Function<T, Integer> extractor) {
        long windowSize = WINDOW_SIZE.toMillis();
        long windowStart = RECORD_TIME / windowSize * windowSize;
        this.shouldHandleWindowRangeQuery(Instant.ofEpochMilli(windowStart), Instant.ofEpochMilli(windowStart), extractor, Utils.mkSet((Object[])new Integer[]{0, 1, 2, 3}));
        this.shouldHandleWindowRangeQuery(Instant.ofEpochMilli(windowStart - 1L), Instant.ofEpochMilli(windowStart - 1L), extractor, Utils.mkSet((Object[])new Integer[0]));
        WindowRangeQuery query = WindowRangeQuery.withKey((Object)2);
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).withPartitions(Utils.mkSet((Object[])new Integer[]{0, 1})).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        if (result.getGlobalResult() != null) {
            Assert.fail((String)"global tables aren't implemented");
        } else {
            Map queryResult = result.getPartitionResults();
            Iterator iterator = queryResult.keySet().iterator();
            while (iterator.hasNext()) {
                int partition = (Integer)iterator.next();
                QueryResult partitionResult = (QueryResult)queryResult.get(partition);
                boolean failure = partitionResult.isFailure();
                if (!failure) {
                    throw new AssertionError((Object)queryResult.toString());
                }
                MatcherAssert.assertThat((Object)partitionResult.getFailureReason(), (Matcher)Matchers.is((Object)FailureReason.UNKNOWN_QUERY_TYPE));
                MatcherAssert.assertThat((Object)partitionResult.getFailureMessage(), (Matcher)Matchers.matchesPattern((String)"This store \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\) doesn't know how to execute the given query \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\) because WindowStores only supports WindowRangeQuery.withWindowStartRange\\. Contact the store maintainer if you need support for a new query type\\."));
            }
        }
    }

    private <T> void shouldHandleSessionKeyQueries() {
        this.shouldHandleSessionRangeQuery(2, Utils.mkSet((Object[])new Integer[]{2}));
        this.shouldHandleSessionRangeQuery(999, Utils.mkSet((Object[])new Integer[0]));
        WindowRangeQuery query = WindowRangeQuery.withWindowStartRange((Instant)Instant.ofEpochMilli(0L), (Instant)Instant.ofEpochMilli(0L));
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).withPartitions(Utils.mkSet((Object[])new Integer[]{0, 1})).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        if (result.getGlobalResult() != null) {
            Assert.fail((String)"global tables aren't implemented");
        } else {
            Map queryResult = result.getPartitionResults();
            Iterator iterator = queryResult.keySet().iterator();
            while (iterator.hasNext()) {
                int partition = (Integer)iterator.next();
                QueryResult partitionResult = (QueryResult)queryResult.get(partition);
                boolean failure = partitionResult.isFailure();
                if (!failure) {
                    throw new AssertionError((Object)queryResult.toString());
                }
                MatcherAssert.assertThat((Object)partitionResult.getFailureReason(), (Matcher)Matchers.is((Object)FailureReason.UNKNOWN_QUERY_TYPE));
                MatcherAssert.assertThat((Object)partitionResult.getFailureMessage(), (Matcher)Matchers.is((Object)"This store (class org.apache.kafka.streams.state.internals.MeteredSessionStore) doesn't know how to execute the given query (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]}) because SessionStores only support WindowRangeQuery.withKey. Contact the store maintainer if you need support for a new query type."));
            }
        }
    }

    private void globalShouldRejectAllQueries() {
        KeyQuery query = KeyQuery.withKey((Object)1);
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query);
        StateQueryResult result = this.kafkaStreams.query(request);
        MatcherAssert.assertThat((Object)result.getGlobalResult().isFailure(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)result.getGlobalResult().getFailureReason(), (Matcher)Matchers.is((Object)FailureReason.UNKNOWN_QUERY_TYPE));
        MatcherAssert.assertThat((Object)result.getGlobalResult().getFailureMessage(), (Matcher)Matchers.is((Object)"Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead."));
    }

    public void shouldRejectUnknownQuery() {
        UnknownQuery query = new UnknownQuery();
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query);
        Set partitions = Utils.mkSet((Object[])new Integer[]{0, 1});
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForPartitions(this.kafkaStreams, request, partitions);
        this.makeAssertions(partitions, result, queryResult -> {
            MatcherAssert.assertThat((Object)queryResult.isFailure(), (Matcher)Matchers.is((Object)true));
            MatcherAssert.assertThat((Object)queryResult.isSuccess(), (Matcher)Matchers.is((Object)false));
            MatcherAssert.assertThat((Object)queryResult.getFailureReason(), (Matcher)Matchers.is((Object)FailureReason.UNKNOWN_QUERY_TYPE));
            MatcherAssert.assertThat((Object)queryResult.getFailureMessage(), (Matcher)Matchers.matchesPattern((String)"This store (.*) doesn't know how to execute the given query (.*). Contact the store maintainer if you need support for a new query type."));
            Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)queryResult).getResult());
            MatcherAssert.assertThat((Object)queryResult.getExecutionInfo(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
        });
    }

    public <V> void shouldHandleKeyQuery(Integer key, Function<V, Integer> valueExtactor, Integer expectedValue) {
        KeyQuery query = KeyQuery.withKey((Object)key);
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).withPartitions(Utils.mkSet((Object[])new Integer[]{0, 1})).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        QueryResult queryResult = result.getOnlyPartitionResult();
        boolean failure = queryResult.isFailure();
        if (failure) {
            throw new AssertionError((Object)queryResult.toString());
        }
        MatcherAssert.assertThat((Object)queryResult.isSuccess(), (Matcher)Matchers.is((Object)true));
        Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)queryResult).getFailureReason());
        Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)queryResult).getFailureMessage());
        Object result1 = queryResult.getResult();
        Integer integer = valueExtactor.apply(result1);
        MatcherAssert.assertThat((Object)integer, (Matcher)Matchers.is((Object)expectedValue));
        MatcherAssert.assertThat((Object)queryResult.getExecutionInfo(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
        MatcherAssert.assertThat((Object)queryResult.getPosition(), (Matcher)Matchers.is((Object)POSITION_0));
    }

    public <V> void shouldHandleRangeQuery(Optional<Integer> lower, Optional<Integer> upper, Function<V, Integer> valueExtactor, Set<Integer> expectedValue) {
        RangeQuery query = lower.isPresent() && upper.isPresent() ? RangeQuery.withRange((Object)lower.get(), (Object)upper.get()) : (lower.isPresent() ? RangeQuery.withLowerBound((Object)lower.get()) : (upper.isPresent() ? RangeQuery.withUpperBound((Object)upper.get()) : RangeQuery.withNoBounds()));
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).withPartitions(Utils.mkSet((Object[])new Integer[]{0, 1})).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        if (result.getGlobalResult() != null) {
            Assert.fail((String)"global tables aren't implemented");
        } else {
            HashSet<Integer> actualValue = new HashSet<Integer>();
            Map queryResult = result.getPartitionResults();
            Iterator iterator = queryResult.keySet().iterator();
            while (iterator.hasNext()) {
                int partition = (Integer)iterator.next();
                boolean failure = ((QueryResult)queryResult.get(partition)).isFailure();
                if (failure) {
                    throw new AssertionError((Object)queryResult.toString());
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).isSuccess(), (Matcher)Matchers.is((Object)true));
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureReason());
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureMessage());
                try (KeyValueIterator iterator2 = (KeyValueIterator)((QueryResult)queryResult.get(partition)).getResult();){
                    while (iterator2.hasNext()) {
                        actualValue.add(valueExtactor.apply(((KeyValue)iterator2.next()).value));
                    }
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).getExecutionInfo(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
            }
            MatcherAssert.assertThat((String)("Result:" + result), actualValue, (Matcher)Matchers.is(expectedValue));
            MatcherAssert.assertThat((String)("Result:" + result), (Object)result.getPosition(), (Matcher)Matchers.is((Object)INPUT_POSITION));
        }
    }

    public <V> void shouldHandleWindowKeyQuery(Integer key, Instant timeFrom, Instant timeTo, Function<V, Integer> valueExtactor, Set<Integer> expectedValue) {
        WindowKeyQuery query = WindowKeyQuery.withKeyAndWindowStartRange((Object)key, (Instant)timeFrom, (Instant)timeTo);
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).withPartitions(Utils.mkSet((Object[])new Integer[]{0, 1})).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        if (result.getGlobalResult() != null) {
            Assert.fail((String)"global tables aren't implemented");
        } else {
            HashSet<Integer> actualValue = new HashSet<Integer>();
            Map queryResult = result.getPartitionResults();
            Iterator iterator = queryResult.keySet().iterator();
            while (iterator.hasNext()) {
                int partition = (Integer)iterator.next();
                boolean failure = ((QueryResult)queryResult.get(partition)).isFailure();
                if (failure) {
                    throw new AssertionError((Object)queryResult.toString());
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).isSuccess(), (Matcher)Matchers.is((Object)true));
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureReason());
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureMessage());
                try (WindowStoreIterator iterator2 = (WindowStoreIterator)((QueryResult)queryResult.get(partition)).getResult();){
                    while (iterator2.hasNext()) {
                        actualValue.add(valueExtactor.apply(((KeyValue)iterator2.next()).value));
                    }
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).getExecutionInfo(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
            }
            MatcherAssert.assertThat((String)("Result:" + result), actualValue, (Matcher)Matchers.is(expectedValue));
            MatcherAssert.assertThat((String)("Result:" + result), (Object)result.getPosition(), (Matcher)Matchers.is((Object)INPUT_POSITION));
        }
    }

    public <V> void shouldHandleWindowRangeQuery(Instant timeFrom, Instant timeTo, Function<V, Integer> valueExtactor, Set<Integer> expectedValue) {
        WindowRangeQuery query = WindowRangeQuery.withWindowStartRange((Instant)timeFrom, (Instant)timeTo);
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).withPartitions(Utils.mkSet((Object[])new Integer[]{0, 1})).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        if (result.getGlobalResult() != null) {
            Assert.fail((String)"global tables aren't implemented");
        } else {
            HashSet<Integer> actualValue = new HashSet<Integer>();
            Map queryResult = result.getPartitionResults();
            Iterator iterator = queryResult.keySet().iterator();
            while (iterator.hasNext()) {
                int partition = (Integer)iterator.next();
                boolean failure = ((QueryResult)queryResult.get(partition)).isFailure();
                if (failure) {
                    throw new AssertionError((Object)queryResult.toString());
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).isSuccess(), (Matcher)Matchers.is((Object)true));
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureReason());
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureMessage());
                try (KeyValueIterator iterator2 = (KeyValueIterator)((QueryResult)queryResult.get(partition)).getResult();){
                    while (iterator2.hasNext()) {
                        actualValue.add(valueExtactor.apply(((KeyValue)iterator2.next()).value));
                    }
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).getExecutionInfo(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
            }
            MatcherAssert.assertThat((String)("Result:" + result), actualValue, (Matcher)Matchers.is(expectedValue));
            MatcherAssert.assertThat((String)("Result:" + result), (Object)result.getPosition(), (Matcher)Matchers.is((Object)INPUT_POSITION));
        }
    }

    public <V> void shouldHandleSessionRangeQuery(Integer key, Set<Integer> expectedValue) {
        WindowRangeQuery query = WindowRangeQuery.withKey((Object)key);
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).withPartitions(Utils.mkSet((Object[])new Integer[]{0, 1})).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        if (result.getGlobalResult() != null) {
            Assert.fail((String)"global tables aren't implemented");
        } else {
            HashSet<Integer> actualValue = new HashSet<Integer>();
            Map queryResult = result.getPartitionResults();
            Iterator iterator = queryResult.keySet().iterator();
            while (iterator.hasNext()) {
                int partition = (Integer)iterator.next();
                boolean failure = ((QueryResult)queryResult.get(partition)).isFailure();
                if (failure) {
                    throw new AssertionError((Object)queryResult.toString());
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).isSuccess(), (Matcher)Matchers.is((Object)true));
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureReason());
                Assertions.assertThrows(IllegalArgumentException.class, () -> ((QueryResult)((QueryResult)queryResult.get(partition))).getFailureMessage());
                try (KeyValueIterator iterator2 = (KeyValueIterator)((QueryResult)queryResult.get(partition)).getResult();){
                    while (iterator2.hasNext()) {
                        actualValue.add((Integer)((KeyValue)iterator2.next()).value);
                    }
                }
                MatcherAssert.assertThat((Object)((QueryResult)queryResult.get(partition)).getExecutionInfo(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
            }
            MatcherAssert.assertThat((String)("Result:" + result), actualValue, (Matcher)Matchers.is(expectedValue));
            MatcherAssert.assertThat((String)("Result:" + result), (Object)result.getPosition(), (Matcher)Matchers.is((Object)INPUT_POSITION));
        }
    }

    public void shouldCollectExecutionInfo() {
        KeyQuery query = KeyQuery.withKey((Object)1);
        Set partitions = Utils.mkSet((Object[])new Integer[]{0, 1});
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).enableExecutionInfo().withPartitions(partitions).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        this.makeAssertions(partitions, result, queryResult -> MatcherAssert.assertThat((Object)queryResult.getExecutionInfo(), (Matcher)Matchers.not((Matcher)Matchers.empty())));
    }

    public void shouldCollectExecutionInfoUnderFailure() {
        UnknownQuery query = new UnknownQuery();
        Set partitions = Utils.mkSet((Object[])new Integer[]{0, 1});
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)query).enableExecutionInfo().withPartitions(partitions).withPositionBound(PositionBound.at((Position)INPUT_POSITION));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        this.makeAssertions(partitions, result, queryResult -> MatcherAssert.assertThat((Object)queryResult.getExecutionInfo(), (Matcher)Matchers.not((Matcher)Matchers.empty())));
    }

    private <R> void makeAssertions(Set<Integer> partitions, StateQueryResult<R> result, Consumer<QueryResult<R>> assertion) {
        if (result.getGlobalResult() != null) {
            assertion.accept(result.getGlobalResult());
        } else {
            MatcherAssert.assertThat(result.getPartitionResults().keySet(), (Matcher)Matchers.is(partitions));
            for (Integer partition : partitions) {
                assertion.accept((QueryResult<R>)result.getPartitionResults().get(partition));
            }
        }
    }

    private static Properties streamsConfiguration(boolean cache, boolean log, String supplier, String kind) {
        String safeTestName = IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier + "-" + kind + "-" + RANDOM.nextInt();
        Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "app-" + safeTestName);
        config.put("application.server", "localhost:" + ++port);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        config.put("num.standby.replicas", (Object)1);
        config.put("max.poll.records", (Object)100);
        config.put("heartbeat.interval.ms", (Object)200);
        config.put("session.timeout.ms", (Object)1000);
        config.put("commit.interval.ms", (Object)100L);
        config.put("num.stream.threads", (Object)1);
        return config;
    }

    public static enum StoresToTest {
        GLOBAL_IN_MEMORY_KV{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryKeyValueStore((String)IQv2StoreIntegrationTest.STORE_NAME);
            }

            @Override
            public boolean global() {
                return true;
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        GLOBAL_IN_MEMORY_LRU{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.lruMap((String)IQv2StoreIntegrationTest.STORE_NAME, (int)100);
            }

            @Override
            public boolean global() {
                return true;
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        GLOBAL_ROCKS_KV{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentKeyValueStore((String)IQv2StoreIntegrationTest.STORE_NAME);
            }

            @Override
            public boolean timestamped() {
                return false;
            }

            @Override
            public boolean global() {
                return true;
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        GLOBAL_TIME_ROCKS_KV{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedKeyValueStore((String)IQv2StoreIntegrationTest.STORE_NAME);
            }

            @Override
            public boolean global() {
                return true;
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        IN_MEMORY_KV{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryKeyValueStore((String)IQv2StoreIntegrationTest.STORE_NAME);
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        IN_MEMORY_LRU{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.lruMap((String)IQv2StoreIntegrationTest.STORE_NAME, (int)100);
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        ROCKS_KV{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentKeyValueStore((String)IQv2StoreIntegrationTest.STORE_NAME);
            }

            @Override
            public boolean timestamped() {
                return false;
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        TIME_ROCKS_KV{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedKeyValueStore((String)IQv2StoreIntegrationTest.STORE_NAME);
            }

            @Override
            public boolean keyValue() {
                return true;
            }
        }
        ,
        IN_MEMORY_WINDOW{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryWindowStore((String)IQv2StoreIntegrationTest.STORE_NAME, (Duration)Duration.ofDays(1L), (Duration)WINDOW_SIZE, (boolean)false);
            }

            @Override
            public boolean isWindowed() {
                return true;
            }
        }
        ,
        ROCKS_WINDOW{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentWindowStore((String)IQv2StoreIntegrationTest.STORE_NAME, (Duration)Duration.ofDays(1L), (Duration)WINDOW_SIZE, (boolean)false);
            }

            @Override
            public boolean isWindowed() {
                return true;
            }

            @Override
            public boolean timestamped() {
                return false;
            }
        }
        ,
        TIME_ROCKS_WINDOW{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedWindowStore((String)IQv2StoreIntegrationTest.STORE_NAME, (Duration)Duration.ofDays(1L), (Duration)WINDOW_SIZE, (boolean)false);
            }

            @Override
            public boolean isWindowed() {
                return true;
            }
        }
        ,
        IN_MEMORY_SESSION{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemorySessionStore((String)IQv2StoreIntegrationTest.STORE_NAME, (Duration)Duration.ofDays(1L));
            }

            @Override
            public boolean isSession() {
                return true;
            }
        }
        ,
        ROCKS_SESSION{

            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentSessionStore((String)IQv2StoreIntegrationTest.STORE_NAME, (Duration)Duration.ofDays(1L));
            }

            @Override
            public boolean isSession() {
                return true;
            }
        };


        public abstract StoreSupplier<?> supplier();

        public boolean timestamped() {
            return true;
        }

        public boolean global() {
            return false;
        }

        public boolean keyValue() {
            return false;
        }

        public boolean isWindowed() {
            return false;
        }

        public boolean isSession() {
            return false;
        }
    }

    public static class UnknownQuery
    implements Query<Void> {
    }
}

