/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import kafka.utils.MockTime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Integration test for the lag information returned by
 * {@code KafkaStreams#allLocalStorePartitionLags()}.
 *
 * <p>Each test produces five records to a per-test input topic, materializes that topic as a
 * table backed by a named state store, and then checks the reported current offset, end offset
 * and offset lag of the store's partition 0 at different points of an instance's life cycle.
 */
@Category({IntegrationTest.class})
public class LagFetchIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    /** Upper bound for waiting on the output topic to contain the expected records. */
    private static final long CONSUMER_TIMEOUT_MS = 60000L;
    /** Upper bound for waiting on latches, barriers and instance start-up/shutdown. */
    private static final long WAIT_TIMEOUT_MS = 60000L;
    /** Number of records produced to the input topic by {@link #produceInputRecords()}. */
    private static final int NUM_INPUT_RECORDS = 5;

    private final MockTime mockTime = CLUSTER.time;
    private Properties streamsConfiguration;
    private Properties consumerConfiguration;
    private String inputTopicName;
    private String outputTopicName;
    private String stateStoreName;

    @Rule
    public TestName name = new TestName();

    /**
     * Derives per-test topic, store and application names from the running test method and
     * builds the base Streams and consumer configurations used by the tests.
     */
    @Before
    public void before() {
        final String methodName = name.getMethodName();
        inputTopicName = "input-topic-" + methodName;
        outputTopicName = "output-topic-" + methodName;
        stateStoreName = "lagfetch-test-store" + methodName;

        streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "lag-fetch-" + methodName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("commit.interval.ms", 100);

        consumerConfiguration = new Properties();
        consumerConfiguration.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfiguration.setProperty("group.id", methodName + "-consumer");
        consumerConfiguration.setProperty("auto.offset.reset", "earliest");
        consumerConfiguration.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerConfiguration.setProperty("value.deserializer", LongDeserializer.class.getName());
    }

    /**
     * Purges the local Streams state that belongs to the base Streams configuration.
     *
     * @throws Exception if purging the state fails
     */
    @After
    public void shutdown() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Synchronously produces the records {@code k1=1 .. k5=5} to the input topic.
     *
     * @throws Exception if producing the records fails
     */
    private void produceInputRecords() throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            inputTopicName,
            Utils.mkSet(
                new KeyValue<>("k1", 1L),
                new KeyValue<>("k2", 2L),
                new KeyValue<>("k3", 3L),
                new KeyValue<>("k4", 4L),
                new KeyValue<>("k5", 5L)),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                StringSerializer.class,
                LongSerializer.class,
                new Properties()),
            mockTime);
    }

    /**
     * Asserts the three values exposed by a single {@link LagInfo}.
     *
     * @param lagInfo               the lag information to check
     * @param expectedCurrentOffset expected {@code currentOffsetPosition()}
     * @param expectedEndOffset     expected {@code endOffsetPosition()}
     * @param expectedLag           expected {@code offsetLag()}
     */
    private static void assertLagInfo(final LagInfo lagInfo,
                                      final long expectedCurrentOffset,
                                      final long expectedEndOffset,
                                      final long expectedLag) {
        MatcherAssert.assertThat(lagInfo.currentOffsetPosition(), IsEqual.equalTo(expectedCurrentOffset));
        MatcherAssert.assertThat(lagInfo.endOffsetPosition(), IsEqual.equalTo(expectedEndOffset));
        MatcherAssert.assertThat(lagInfo.offsetLag(), IsEqual.equalTo(expectedLag));
    }

    /**
     * Asserts that {@code storeLags} contains exactly this test's store with exactly one
     * partition, and that the lag of partition 0 matches the expected values.
     *
     * @param storeLags             lag information keyed by store name, then by partition
     * @param expectedCurrentOffset expected {@code currentOffsetPosition()}
     * @param expectedEndOffset     expected {@code endOffsetPosition()}
     * @param expectedLag           expected {@code offsetLag()}
     * @return the verified lag information of partition 0
     */
    private LagInfo assertSingleStoreLag(final Map<String, Map<Integer, LagInfo>> storeLags,
                                         final long expectedCurrentOffset,
                                         final long expectedEndOffset,
                                         final long expectedLag) {
        MatcherAssert.assertThat(storeLags.size(), IsEqual.equalTo(1));
        MatcherAssert.assertThat(storeLags.keySet(), IsEqual.equalTo(Utils.mkSet(stateStoreName)));
        final Map<Integer, LagInfo> partitionLags = storeLags.get(stateStoreName);
        MatcherAssert.assertThat(partitionLags.size(), IsEqual.equalTo(1));
        final LagInfo lagInfo = partitionLags.get(0);
        assertLagInfo(lagInfo, expectedCurrentOffset, expectedEndOffset, expectedLag);
        return lagInfo;
    }

    /**
     * Starts two instances configured with one standby replica and verifies the lag reported by
     * the first-started instance once it has processed all input, the lag reported by the
     * second instance right after its stream thread moved from PARTITIONS_ASSIGNED to RUNNING,
     * and finally that the second instance's lag drops to zero.
     *
     * @param optimization value for the {@code topology.optimization} config
     * @throws Exception if any step fails or times out
     */
    private void shouldFetchLagsDuringRebalancing(final String optimization) throws Exception {
        final CountDownLatch latchTillActiveIsRunning = new CountDownLatch(1);
        final CountDownLatch latchTillStandbyHasPartitionsAssigned = new CountDownLatch(1);
        final CyclicBarrier lagCheckBarrier = new CyclicBarrier(2);
        final List<KafkaStreamsWrapper> streamsList = new ArrayList<>();

        produceInputRecords();

        for (int i = 0; i < 2; ++i) {
            final Properties props = (Properties) streamsConfiguration.clone();
            props.put("application.server", "localhost:" + i);
            props.put("client.id", "instance-" + i);
            props.put("topology.optimization", optimization);
            props.put("num.standby.replicas", 1);
            props.put("state.dir", TestUtils.tempDirectory(stateStoreName + i).getAbsolutePath());

            final StreamsBuilder builder = new StreamsBuilder();
            final KTable<String, Long> table = builder.table(inputTopicName, Materialized.as(stateStoreName));
            table.toStream().to(outputTopicName);
            streamsList.add(new KafkaStreamsWrapper(builder.build(props), props));
        }

        final KafkaStreamsWrapper activeStreams = streamsList.get(0);
        final KafkaStreamsWrapper standbyStreams = streamsList.get(1);

        activeStreams.setStreamThreadStateListener((thread, newState, oldState) -> {
            if (newState == StreamThread.State.RUNNING) {
                latchTillActiveIsRunning.countDown();
            }
        });
        standbyStreams.setStreamThreadStateListener((thread, newState, oldState) -> {
            if (oldState == StreamThread.State.PARTITIONS_ASSIGNED && newState == StreamThread.State.RUNNING) {
                latchTillStandbyHasPartitionsAssigned.countDown();
                try {
                    // Park the callback until the test thread has read the standby's lag;
                    // presumably this keeps the standby from catching up before the check.
                    lagCheckBarrier.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                } catch (final Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });

        try {
            // Nothing is reported before any instance has been started.
            MatcherAssert.assertThat(activeStreams.allLocalStorePartitionLags().size(), IsEqual.equalTo(0));

            activeStreams.start();
            Assert.assertTrue(
                "Active instance did not reach RUNNING within timeout",
                latchTillActiveIsRunning.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
                consumerConfiguration, outputTopicName, NUM_INPUT_RECORDS, CONSUMER_TIMEOUT_MS);
            assertSingleStoreLag(activeStreams.allLocalStorePartitionLags(), 5L, 5L, 0L);

            standbyStreams.start();
            Assert.assertTrue(
                "Standby instance did not move from PARTITIONS_ASSIGNED to RUNNING within timeout",
                latchTillStandbyHasPartitionsAssigned.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
            assertSingleStoreLag(standbyStreams.allLocalStorePartitionLags(), 0L, 5L, 5L);

            // Release the standby's listener callback now that its initial lag has been checked.
            lagCheckBarrier.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

            TestUtils.waitForCondition(
                () -> standbyStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0L,
                "Standby should eventually catchup and have zero lag.");
        } finally {
            for (final KafkaStreams streams : streamsList) {
                streams.close();
            }
        }
    }

    @Test
    public void shouldFetchLagsDuringRebalancingWithOptimization() throws Exception {
        shouldFetchLagsDuringRebalancing("all");
    }

    @Test
    public void shouldFetchLagsDuringRebalancingWithNoOptimization() throws Exception {
        shouldFetchLagsDuringRebalancing("none");
    }

    /**
     * Runs a single instance until it has processed all input, closes it, deletes its state
     * directory, and starts a second instance on the same configuration. The lag captured when
     * restoration starts must show a full lag, and the lag captured when restoration ends must
     * equal the zero-lag information observed on the first instance.
     *
     * @throws Exception if any step fails or times out
     */
    @Test
    public void shouldFetchLagsDuringRestoration() throws Exception {
        produceInputRecords();

        final Properties props = (Properties) streamsConfiguration.clone();
        final File stateDir = TestUtils.tempDirectory(stateStoreName + "0");
        props.put("application.server", "localhost:0");
        props.put("client.id", "instance-0");
        props.put("state.dir", stateDir.getAbsolutePath());

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> table = builder.table(inputTopicName, Materialized.as(stateStoreName));
        table.toStream().to(outputTopicName);

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            // Nothing is reported before the instance has been started.
            MatcherAssert.assertThat(streams.allLocalStorePartitionLags().size(), IsEqual.equalTo(0));

            IntegrationTestUtils.startApplicationAndWaitUntilRunning(
                Collections.singletonList(streams), Duration.ofMillis(WAIT_TIMEOUT_MS));
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
                consumerConfiguration, outputTopicName, NUM_INPUT_RECORDS, CONSUMER_TIMEOUT_MS);
            final LagInfo zeroLagInfo = assertSingleStoreLag(streams.allLocalStorePartitionLags(), 5L, 5L, 0L);

            // Stop the instance and wipe its state so that the next start has to restore.
            MatcherAssert.assertThat(
                "Streams instance did not close within timeout",
                streams.close(Duration.ofMillis(WAIT_TIMEOUT_MS)));
            IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
            // Files.walk holds open directory handles; close the stream once deletion is done.
            try (Stream<Path> statePaths = Files.walk(stateDir.toPath())) {
                // Reverse order visits children before their parent directories.
                statePaths.sorted(Comparator.reverseOrder())
                    .map(Path::toFile)
                    .forEach(f -> Assert.assertTrue("Some state " + f + " could not be deleted", f.delete()));
            }

            try (KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props)) {
                final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo = new HashMap<>();
                final Map<String, Map<Integer, LagInfo>> restoreEndLagInfo = new HashMap<>();
                // Counted down after the end snapshot is stored; awaiting it guarantees both
                // snapshots exist and are visible to the test thread before they are read.
                final CountDownLatch restorationEndLatch = new CountDownLatch(1);

                restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
                    @Override
                    public void onRestoreStart(final TopicPartition topicPartition,
                                               final String storeName,
                                               final long startingOffset,
                                               final long endingOffset) {
                        restoreStartLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
                    }

                    @Override
                    public void onBatchRestored(final TopicPartition topicPartition,
                                                final String storeName,
                                                final long batchEndOffset,
                                                final long numRestored) {
                    }

                    @Override
                    public void onRestoreEnd(final TopicPartition topicPartition,
                                             final String storeName,
                                             final long totalRestored) {
                        restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
                        restorationEndLatch.countDown();
                    }
                });

                restartedStreams.start();
                Assert.assertTrue(
                    "State restoration did not finish within timeout",
                    restorationEndLatch.await(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
                TestUtils.waitForCondition(
                    () -> restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0L,
                    "Standby should eventually catchup and have zero lag.");

                assertLagInfo(restoreStartLagInfo.get(stateStoreName).get(0), 0L, 5L, 5L);
                MatcherAssert.assertThat(
                    zeroLagInfo,
                    IsEqual.equalTo(restoreEndLagInfo.get(stateStoreName).get(0)));
            }
        }
    }
}

