/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsStoppedException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category(value={IntegrationTest.class})
public class IQv2IntegrationTest {
    /** JUnit rule that bounds the run time of each test to 600 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    // Not referenced anywhere else in this file; CLUSTER below is constructed with the literal 1.
    private static final int NUM_BROKERS = 1;
    // Not referenced anywhere else in this file.
    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5L);
    // Pre-incremented by streamsConfiguration() each time it builds the "application.server" value.
    private static int port = 0;
    // Topic created and seeded with records by before().
    private static final String INPUT_TOPIC_NAME = "input-topic";
    // before() calls withComponent(topic, partition, offset) on this for every record it produces
    // and then asserts its final contents.
    private static final Position INPUT_POSITION = Position.emptyPosition();
    // Name under which beforeTest() materializes the table built from the input topic.
    private static final String STORE_NAME = "kv-store";
    // Embedded cluster shared by all tests: started in before(), stopped in after().
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Instance under test; assigned in beforeTest(), closed and cleaned up in afterTest().
    private KafkaStreams kafkaStreams;
    /** JUnit rule exposing the running test; passed to safeUniqueTestName() in streamsConfiguration(). */
    @Rule
    public TestName testName = new TestName();

    /**
     * Starts the embedded cluster, creates the input topic and seeds it with three records.
     *
     * <p>Record {@code i} (for i = 0, 1, 2) uses {@code i} as both key and value and is sent
     * explicitly to partition {@code i % partitions}. Once every send has been acknowledged, the
     * reported topic/partition/offset is merged into {@link #INPUT_POSITION}, which is finally
     * checked against the expected end state: offset 1 on partition 0 and offset 0 on partition 1.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for an acknowledgement
     * @throws IOException          declared for the cluster/topic set-up calls
     * @throws ExecutionException   if a send completed exceptionally
     * @throws TimeoutException     if a send is not acknowledged within one minute
     */
    @BeforeClass
    public static void before() throws InterruptedException, IOException, ExecutionException, TimeoutException {
        CLUSTER.start();

        // Single source of truth for the partition count: used for topic creation and record routing.
        int partitions = 2;
        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        List<Future<RecordMetadata>> futures = new LinkedList<>();
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < 3; ++i) {
                Future<RecordMetadata> send = producer.send(
                    new ProducerRecord<>(INPUT_TOPIC_NAME, i % partitions, Time.SYSTEM.milliseconds(), i, i, null));
                futures.add(send);
                // NOTE(review): presumably spaces the records out so their timestamps differ - confirm.
                Time.SYSTEM.sleep(1L);
            }
            producer.flush();

            for (Future<RecordMetadata> future : futures) {
                RecordMetadata recordMetadata = future.get(1L, TimeUnit.MINUTES);
                MatcherAssert.assertThat(recordMetadata.hasOffset(), Matchers.is(true));
                // The return value is intentionally ignored: the assertion below relies on
                // INPUT_POSITION itself having been updated by this call.
                INPUT_POSITION.withComponent(recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            }
        }

        MatcherAssert.assertThat(
            INPUT_POSITION,
            Matchers.equalTo(
                Position.emptyPosition()
                    .withComponent(INPUT_TOPIC_NAME, 0, 1L)
                    .withComponent(INPUT_TOPIC_NAME, 1, 0L)));
    }

    /**
     * Creates the {@link KafkaStreams} instance used by the test that is about to run.
     *
     * <p>The topology is a single table read from {@link #INPUT_TOPIC_NAME} with Integer serdes and
     * materialized as {@link #STORE_NAME}. The instance is not started here; only its local state
     * is cleaned up.
     */
    @Before
    public void beforeTest() {
        StreamsBuilder topology = new StreamsBuilder();
        Consumed<Integer, Integer> integerSerdes = Consumed.with(Serdes.Integer(), Serdes.Integer());
        topology.table(INPUT_TOPIC_NAME, integerSerdes, Materialized.as(STORE_NAME));

        kafkaStreams = new KafkaStreams(topology.build(), streamsConfiguration());
        kafkaStreams.cleanUp();
    }

    /**
     * Closes the {@link KafkaStreams} instance of the finished test and wipes its local state.
     *
     * <p>JUnit still invokes {@code @After} methods when a {@code @Before} method threw, so the
     * field can be {@code null} here if {@link #beforeTest()} failed before assigning it. In that
     * case there is nothing to release, and skipping avoids piling a NullPointerException on top
     * of the real set-up failure.
     */
    @After
    public void afterTest() {
        if (kafkaStreams == null) {
            return;
        }
        kafkaStreams.close();
        kafkaStreams.cleanUp();
    }

    /** Stops the embedded Kafka cluster that {@link #before()} started. */
    @AfterClass
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * A query that names a store other than {@link #STORE_NAME} must be rejected with
     * {@link UnknownStateStoreException}. The instance is not started by this test.
     */
    @Test
    public void shouldFailUnknownStore() {
        KeyQuery<Integer, ?> keyOne = KeyQuery.withKey(1);
        StateQueryRequest<?> unknownStoreRequest = StateQueryRequest.inStore("unknown-store").withQuery(keyOne);

        Assertions.assertThrows(
            UnknownStateStoreException.class,
            () -> kafkaStreams.query(unknownStoreRequest));
    }

    /**
     * Querying the existing store on an instance that was never started must be rejected with
     * {@link StreamsNotStartedException}.
     */
    @Test
    public void shouldFailNotStarted() {
        KeyQuery<Integer, ?> keyOne = KeyQuery.withKey(1);
        StateQueryRequest<?> storeRequest = StateQueryRequest.inStore(STORE_NAME).withQuery(keyOne);

        Assertions.assertThrows(
            StreamsNotStartedException.class,
            () -> kafkaStreams.query(storeRequest));
    }

    /**
     * Querying an instance that was started and then closed (with a zero timeout) must be rejected
     * with {@link StreamsStoppedException}.
     */
    @Test
    public void shouldFailStopped() {
        KeyQuery<Integer, ?> keyOne = KeyQuery.withKey(1);
        StateQueryRequest<?> storeRequest = StateQueryRequest.inStore(STORE_NAME).withQuery(keyOne);

        kafkaStreams.start();
        kafkaStreams.close(Duration.ZERO);

        Assertions.assertThrows(
            StreamsStoppedException.class,
            () -> kafkaStreams.query(storeRequest));
    }

    /**
     * A request built with {@code requireActive()} must fail on every partition with
     * {@link FailureReason#NOT_ACTIVE} while the stream thread is not in the RUNNING state.
     *
     * <p>The test uses reflection to get at the internals it needs:
     * <ol>
     *   <li>the {@code threads} list of {@link KafkaStreams}, from which it takes the first thread
     *       (the configuration in this class asks for exactly one stream thread);</li>
     *   <li>that thread's {@code stateLock} object;</li>
     *   <li>that thread's {@code state} field, which is overwritten with
     *       {@code PARTITIONS_ASSIGNED} while {@code stateLock} is held.</li>
     * </ol>
     * Before touching the state it waits, with a request that does not require an active task,
     * until both partitions answer.
     *
     * @throws NoSuchFieldException   if one of the reflected fields does not exist
     * @throws IllegalAccessException if a reflected field cannot be read or written
     */
    @Test
    public void shouldRejectNonRunningActive() throws NoSuchFieldException, IllegalAccessException {
        KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        StateQueryRequest<?> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).requireActive();
        Set<Integer> partitions = Utils.mkSet(0, 1);

        kafkaStreams.start();

        Field threadsField = KafkaStreams.class.getDeclaredField("threads");
        threadsField.setAccessible(true);
        List<?> threads = (List<?>) threadsField.get(kafkaStreams);
        StreamThread streamThread = (StreamThread) threads.get(0);

        Field stateLock = StreamThread.class.getDeclaredField("stateLock");
        stateLock.setAccessible(true);
        Object lock = stateLock.get(streamThread);

        // Wait until both partitions can be queried at all before tampering with the thread state.
        IntegrationTestUtils.iqv2WaitForPartitions(
            kafkaStreams,
            StateQueryRequest.inStore(STORE_NAME).withQuery(query),
            partitions);

        // Hold the thread's own state lock while overwriting the state and asserting on the result.
        synchronized (lock) {
            Field stateField = StreamThread.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(streamThread, StreamThread.State.PARTITIONS_ASSIGNED);

            StateQueryResult<?> result = IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, request, partitions);
            MatcherAssert.assertThat(result.getPartitionResults().keySet(), Matchers.is(partitions));

            for (Integer partition : partitions) {
                QueryResult<?> partitionResult = result.getPartitionResults().get(partition);
                MatcherAssert.assertThat(partitionResult.isFailure(), Matchers.is(true));
                MatcherAssert.assertThat(partitionResult.getFailureReason(), Matchers.is(FailureReason.NOT_ACTIVE));
                MatcherAssert.assertThat(
                    partitionResult.getFailureMessage(),
                    Matchers.is("Query requires a running active task, but partition was in state PARTITIONS_ASSIGNED and was active."));
            }
        }
    }

    /**
     * A request restricted to a single partition must return a result for exactly that partition.
     */
    @Test
    public void shouldFetchFromPartition() {
        KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        int partition = 1;
        Set<Integer> partitions = Collections.singleton(partition);
        StateQueryRequest<?> request =
            StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPartitions(partitions);

        kafkaStreams.start();

        StateQueryResult<?> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
        MatcherAssert.assertThat(result.getPartitionResults().keySet(), Matchers.equalTo(partitions));
    }

    /**
     * A request built with {@code withAllPartitions()} must return results keyed by both
     * partitions (0 and 1) of the input topic.
     */
    @Test
    public void shouldFetchExplicitlyFromAllPartitions() {
        KeyQuery<Integer, ?> keyOne = KeyQuery.withKey(1);
        Set<Integer> expectedPartitions = Utils.mkSet(0, 1);
        StateQueryRequest<?> allPartitionsRequest =
            StateQueryRequest.inStore(STORE_NAME).withQuery(keyOne).withAllPartitions();

        kafkaStreams.start();

        StateQueryResult<?> answer =
            IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, allPartitionsRequest, expectedPartitions);
        Set<Integer> answeredPartitions = answer.getPartitionResults().keySet();
        MatcherAssert.assertThat(answeredPartitions, Matchers.equalTo(expectedPartitions));
    }

    /**
     * A store that implements nothing query-specific must still be usable in a topology: querying
     * it yields a failed {@link QueryResult} with {@link FailureReason#UNKNOWN_QUERY_TYPE} and an
     * explanatory message.
     *
     * <p>This test does not use the instance prepared by {@link #beforeTest()}. It builds its own
     * topology whose table is backed by {@link #nonQueryableStoreSupplier()} and stores the
     * resulting instance in {@code kafkaStreams} so that {@link #afterTest()} closes it.
     */
    @Test
    public void shouldNotRequireQueryHandler() {
        KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        int partition = 1;
        Set<Integer> partitions = Collections.singleton(partition);
        StateQueryRequest<?> request =
            StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPartitions(partitions);

        StreamsBuilder builder = new StreamsBuilder();
        builder.table(
            INPUT_TOPIC_NAME,
            Consumed.with(Serdes.Integer(), Serdes.Integer()),
            Materialized.as(nonQueryableStoreSupplier()));

        // The instance created by beforeTest() is about to be replaced. afterTest() only closes
        // whatever the field references at that point, so close the old instance here; an instance
        // has to be closed even if it was never started, otherwise its resources leak.
        kafkaStreams.close();

        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
        kafkaStreams.cleanUp();
        kafkaStreams.start();

        StateQueryResult<?> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
        QueryResult<?> queryResult = result.getPartitionResults().get(partition);

        MatcherAssert.assertThat(queryResult.isFailure(), Matchers.is(true));
        MatcherAssert.assertThat(queryResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
        MatcherAssert.assertThat(
            queryResult.getFailureMessage(),
            Matchers.matchesPattern("This store (.*) doesn't know how to execute the given query (.*). Contact the store maintainer if you need support for a new query type."));
    }

    /**
     * Supplies a minimal map-backed key-value store named {@link #STORE_NAME} that provides no
     * query handling of its own; range and full scans are unsupported.
     */
    private static KeyValueBytesStoreSupplier nonQueryableStoreSupplier() {
        return new KeyValueBytesStoreSupplier() {

            @Override
            public String name() {
                return STORE_NAME;
            }

            @Override
            public KeyValueStore<Bytes, byte[]> get() {
                return new KeyValueStore<Bytes, byte[]>() {
                    private boolean open = false;
                    private final Map<Bytes, byte[]> map = new HashMap<>();
                    // Both are assigned in init(StateStoreContext, StateStore).
                    private Position position;
                    private StateStoreContext context;

                    @Override
                    public void put(Bytes key, byte[] value) {
                        map.put(key, value);
                        StoreQueryUtils.updatePosition(position, context);
                    }

                    @Override
                    public byte[] putIfAbsent(Bytes key, byte[] value) {
                        StoreQueryUtils.updatePosition(position, context);
                        return map.putIfAbsent(key, value);
                    }

                    @Override
                    public void putAll(List<KeyValue<Bytes, byte[]>> entries) {
                        StoreQueryUtils.updatePosition(position, context);
                        for (KeyValue<Bytes, byte[]> entry : entries) {
                            map.put(entry.key, entry.value);
                        }
                    }

                    @Override
                    public byte[] delete(Bytes key) {
                        StoreQueryUtils.updatePosition(position, context);
                        return map.remove(key);
                    }

                    @Override
                    public String name() {
                        return STORE_NAME;
                    }

                    @Deprecated
                    @Override
                    public void init(ProcessorContext context, StateStore root) {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public void init(StateStoreContext context, StateStore root) {
                        // Restored records are written through put(), like regular writes.
                        context.register(root, (key, value) -> put(Bytes.wrap(key), value));
                        this.open = true;
                        this.position = Position.emptyPosition();
                        this.context = context;
                    }

                    @Override
                    public void flush() {
                        // Nothing to flush: all data lives in the in-memory map.
                    }

                    @Override
                    public void close() {
                        open = false;
                        map.clear();
                    }

                    @Override
                    public boolean persistent() {
                        return false;
                    }

                    @Override
                    public boolean isOpen() {
                        return open;
                    }

                    @Override
                    public Position getPosition() {
                        return position;
                    }

                    @Override
                    public byte[] get(Bytes key) {
                        return map.get(key);
                    }

                    @Override
                    public KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public KeyValueIterator<Bytes, byte[]> all() {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public long approximateNumEntries() {
                        return map.size();
                    }
                };
            }

            @Override
            public String metricsScope() {
                return "nonquery";
            }
        };
    }

    /**
     * Assembles the Streams configuration for the currently running test.
     *
     * <p>The application id is derived from the test class and test name, and every call bumps the
     * shared {@code port} counter so that each configuration advertises a different
     * {@code application.server}. The state directory comes from {@code TestUtils.tempDirectory()}.
     *
     * @return a new {@link Properties} object holding the configuration
     */
    private Properties streamsConfiguration() {
        String uniqueName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        port++;

        Properties props = new Properties();

        // Identity and connectivity.
        props.put("application.id", "app-" + uniqueName);
        props.put("application.server", "localhost:" + port);
        props.put("bootstrap.servers", CLUSTER.bootstrapServers());
        props.put("state.dir", TestUtils.tempDirectory().getPath());

        // Topology and serdes.
        props.put("topology.optimization", "all");
        props.put("default.key.serde", Serdes.Integer().getClass());
        props.put("default.value.serde", Serdes.Integer().getClass());

        // Sizing and timing.
        props.put("num.stream.threads", 1);
        props.put("num.standby.replicas", 1);
        props.put("max.poll.records", 100);
        props.put("heartbeat.interval.ms", 200);
        props.put("session.timeout.ms", 1000);
        props.put("commit.interval.ms", 100L);

        return props;
    }
}

