/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class RestoreIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String APPID = "restore-test";
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String INPUT_STREAM = "input-stream";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private final int numberOfKeys = 10000;
    private KafkaStreams kafkaStreams;

    @BeforeClass
    public static void createTopics() throws InterruptedException {
        CLUSTER.createTopic(INPUT_STREAM, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic("restore-test-store-changelog", 2, 1);
    }

    private Properties props(String applicationId) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", applicationId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory((String)applicationId).getPath());
        streamsConfiguration.put("key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("commit.interval.ms", (Object)1000);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }

    @After
    public void shutdown() {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close(30L, TimeUnit.SECONDS);
        }
    }

    @Test
    public void shouldRestoreStateFromSourceTopic() throws Exception {
        final AtomicInteger numReceived = new AtomicInteger(0);
        KStreamBuilder builder = new KStreamBuilder();
        Properties props = this.props(APPID);
        int offsetLimitDelta = 1000;
        int offsetCheckpointed = 1000;
        this.createStateForRestoration(INPUT_STREAM);
        this.setCommittedOffset(INPUT_STREAM, 1000);
        StateDirectory stateDirectory = new StateDirectory(APPID, props.getProperty("state.dir"));
        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint")).write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), 1000L));
        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint")).write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 1), 1000L));
        final CountDownLatch startupLatch = new CountDownLatch(1);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        builder.table(Serdes.Integer(), Serdes.Integer(), INPUT_STREAM, "store").toStream().foreach((ForeachAction)new ForeachAction<Integer, Integer>(){

            public void apply(Integer key, Integer value) {
                if (numReceived.incrementAndGet() == 2000) {
                    shutdownLatch.countDown();
                }
            }
        });
        this.kafkaStreams = new KafkaStreams((TopologyBuilder)builder, props);
        this.kafkaStreams.setStateListener(new KafkaStreams.StateListener(){

            public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    startupLatch.countDown();
                }
            }
        });
        this.kafkaStreams.start();
        Assert.assertTrue((boolean)startupLatch.await(30L, TimeUnit.SECONDS));
        ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)this.kafkaStreams.store("store", QueryableStoreTypes.keyValueStore());
        MatcherAssert.assertThat((Object)store.approximateNumEntries(), (Matcher)IsEqual.equalTo((Object)6000L));
        Assert.assertTrue((boolean)shutdownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat((Object)numReceived.get(), (Matcher)IsEqual.equalTo((Object)2000));
        MatcherAssert.assertThat((Object)store.approximateNumEntries(), (Matcher)IsEqual.equalTo((Object)8000L));
    }

    private void createStateForRestoration(String changelogTopic) {
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        try (KafkaProducer producer = new KafkaProducer(producerConfig, (Serializer)new IntegerSerializer(), (Serializer)new IntegerSerializer());){
            for (int i = 0; i < 10000; ++i) {
                producer.send(new ProducerRecord(changelogTopic, (Object)i, (Object)i));
            }
        }
    }

    private void setCommittedOffset(String topic, int limitDelta) {
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.put("group.id", APPID);
        consumerConfig.put("client.id", "commit-consumer");
        consumerConfig.put("key.deserializer", IntegerDeserializer.class);
        consumerConfig.put("value.deserializer", IntegerDeserializer.class);
        KafkaConsumer consumer = new KafkaConsumer(consumerConfig);
        List<TopicPartition> partitions = Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1));
        consumer.assign(partitions);
        consumer.seekToEnd(partitions);
        for (TopicPartition partition : partitions) {
            long position = consumer.position(partition);
            consumer.seek(partition, position - (long)limitDelta);
        }
        consumer.commitSync();
        consumer.close();
    }
}

