/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class OptimizedKTableIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static int port = 0;
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final String TABLE_NAME = "source-table";
    @Rule
    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1);
    private final List<KafkaStreams> streamsToCleanup = new ArrayList<KafkaStreams>();
    private final MockTime mockTime;

    public OptimizedKTableIntegrationTest() {
        this.mockTime = this.cluster.time;
    }

    @Before
    public void before() throws InterruptedException {
        this.cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
    }

    @After
    public void after() {
        for (KafkaStreams kafkaStreams : this.streamsToCleanup) {
            kafkaStreams.close();
        }
    }

    @Test
    public void standbyShouldNotPerformRestoreAtStartup() throws Exception {
        int numMessages = 10;
        boolean key = true;
        Semaphore semaphore = new Semaphore(0);
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()), Materialized.as((String)TABLE_NAME).withCachingDisabled()).toStream().peek((k, v) -> semaphore.release());
        KafkaStreams kafkaStreams1 = this.createKafkaStreams(builder, this.streamsConfiguration());
        KafkaStreams kafkaStreams2 = this.createKafkaStreams(builder, this.streamsConfiguration());
        List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
        this.produceValueRange(1, 0, 10);
        AtomicLong restoreStartOffset = new AtomicLong(-1L);
        kafkaStreamsList.forEach(kafkaStreams -> kafkaStreams.setGlobalStateRestoreListener(this.createTrackingRestoreListener(restoreStartOffset, new AtomicLong())));
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60L));
        MatcherAssert.assertThat((Object)semaphore.tryAcquire(10, 60L, TimeUnit.SECONDS), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)true)));
        MatcherAssert.assertThat((Object)restoreStartOffset.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)-1L)));
    }

    @Test
    public void shouldApplyUpdatesToStandbyStore() throws Exception {
        boolean kafkaStreams1WasFirstActive;
        int batch1NumMessages = 100;
        int batch2NumMessages = 100;
        boolean key = true;
        Semaphore semaphore = new Semaphore(0);
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(INPUT_TOPIC_NAME, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()), Materialized.as((String)TABLE_NAME).withCachingDisabled()).toStream().peek((k, v) -> semaphore.release());
        KafkaStreams kafkaStreams1 = this.createKafkaStreams(builder, this.streamsConfiguration());
        KafkaStreams kafkaStreams2 = this.createKafkaStreams(builder, this.streamsConfiguration());
        List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
        AtomicLong restoreStartOffset = new AtomicLong(-1L);
        AtomicLong restoreEndOffset = new AtomicLong(-1L);
        kafkaStreamsList.forEach(kafkaStreams -> kafkaStreams.setGlobalStateRestoreListener(this.createTrackingRestoreListener(restoreStartOffset, restoreEndOffset)));
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60L));
        this.produceValueRange(1, 0, 100);
        MatcherAssert.assertThat((Object)semaphore.tryAcquire(100, 60L, TimeUnit.SECONDS), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)true)));
        ReadOnlyKeyValueStore store1 = (ReadOnlyKeyValueStore)kafkaStreams1.store(StoreQueryParameters.fromNameAndType((String)TABLE_NAME, (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
        ReadOnlyKeyValueStore store2 = (ReadOnlyKeyValueStore)kafkaStreams2.store(StoreQueryParameters.fromNameAndType((String)TABLE_NAME, (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
        KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, (Object)1, (topic, somekey, value, numPartitions) -> 0);
        if (keyQueryMetadata.getActiveHost().port() % 2 == 1) {
            kafkaStreams1WasFirstActive = true;
        } else {
            MatcherAssert.assertThat((Object)store2.get((Object)1), (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
            kafkaStreams1WasFirstActive = false;
        }
        MatcherAssert.assertThat((Object)restoreStartOffset.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)-1L)));
        MatcherAssert.assertThat((Object)(kafkaStreams1WasFirstActive ? (Integer)store1.get((Object)1) : (Integer)store2.get((Object)1)), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)99)));
        if (kafkaStreams1WasFirstActive) {
            kafkaStreams1.close();
        } else {
            kafkaStreams2.close();
        }
        ReadOnlyKeyValueStore newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
        TestUtils.retryOnExceptionWithTimeout((long)100L, (long)60000L, () -> MatcherAssert.assertThat((Object)newActiveStore.get((Object)1), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)99))));
        int totalNumMessages = 200;
        this.produceValueRange(1, 100, 200);
        MatcherAssert.assertThat((Object)semaphore.tryAcquire(100, 60L, TimeUnit.SECONDS), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)true)));
        MatcherAssert.assertThat((Object)restoreStartOffset.get(), (Matcher)Matchers.is((Matcher)Matchers.anyOf((Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)), (Matcher)Matchers.equalTo((Object)-1L))));
        MatcherAssert.assertThat((Object)restoreEndOffset.get(), (Matcher)Matchers.is((Matcher)Matchers.anyOf((Matcher)Matchers.equalTo((Object)99L), (Matcher)Matchers.equalTo((Object)-1L))));
        MatcherAssert.assertThat((Object)newActiveStore.get((Object)1), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)199)));
    }

    private void produceValueRange(int key, int start, int endExclusive) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", this.cluster.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC_NAME, IntStream.range(start, endExclusive).mapToObj(i -> KeyValue.pair((Object)key, (Object)i)).collect(Collectors.toList()), producerProps, (Time)this.mockTime);
    }

    private KafkaStreams createKafkaStreams(StreamsBuilder builder, Properties config) {
        KafkaStreams streams = new KafkaStreams(builder.build(config), config);
        this.streamsToCleanup.add(streams);
        return streams;
    }

    private StateRestoreListener createTrackingRestoreListener(final AtomicLong restoreStartOffset, final AtomicLong restoreEndOffset) {
        return new StateRestoreListener(){

            public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
                restoreStartOffset.set(startingOffset);
                restoreEndOffset.set(endingOffset);
            }

            public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
            }

            public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            }
        };
    }

    private Properties streamsConfiguration() {
        String applicationId = "streamsApp";
        Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "streamsApp");
        config.put("application.server", "localhost:" + String.valueOf(++port));
        config.put("bootstrap.servers", this.cluster.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory((String)"streamsApp").getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        config.put("num.standby.replicas", (Object)1);
        config.put("max.poll.records", (Object)100);
        config.put("heartbeat.interval.ms", (Object)200);
        config.put("session.timeout.ms", (Object)1000);
        config.put("commit.interval.ms", (Object)100);
        return config;
    }
}

