/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={IntegrationTest.class})
public class SelfJoinUpgradeIntegrationTest {
    public static final String INPUT_TOPIC = "selfjoin-input";
    public static final String OUTPUT_TOPIC = "selfjoin-output";
    private String inputTopic;
    private String outputTopic;
    private KafkaStreams kafkaStreams;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    @Rule
    public TestName testName = new TestName();
    private String safeTestName;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void createTopics() throws Exception {
        this.safeTestName = IntegrationTestUtils.safeUniqueTestName(this.testName);
        this.inputTopic = INPUT_TOPIC + this.safeTestName;
        this.outputTopic = OUTPUT_TOPIC + this.safeTestName;
        CLUSTER.createTopic(this.inputTopic);
        CLUSTER.createTopic(this.outputTopic);
    }

    private Properties props() {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "app-" + this.safeTestName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("statestore.cache.max.bytes", (Object)0);
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        streamsConfiguration.put("commit.interval.ms", (Object)1000L);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }

    @After
    public void shutdown() {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close(Duration.ofSeconds(30L));
            this.kafkaStreams.cleanUp();
        }
    }

    @Test
    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
        StreamsBuilder streamsBuilderOld = new StreamsBuilder();
        KStream leftOld = streamsBuilderOld.stream(this.inputTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        ValueJoiner valueJoiner = (v, v2) -> v + v2;
        KStream joinedOld = leftOld.join(leftOld, valueJoiner, JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMinutes(100L)));
        joinedOld.to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.testName);
        Properties props = this.props();
        props.put("topology.optimization", "none");
        this.kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
        this.kafkaStreams.start();
        long currentTime = SelfJoinUpgradeIntegrationTest.CLUSTER.time.milliseconds();
        this.processKeyValueAndVerifyCount("1", "A", currentTime + 42L, Arrays.asList(new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
        this.processKeyValueAndVerifyCount("1", "B", currentTime + 43L, Arrays.asList(new KeyValueTimestamp<String, String>("1", "BA", currentTime + 43L), new KeyValueTimestamp<String, String>("1", "AB", currentTime + 43L), new KeyValueTimestamp<String, String>("1", "BB", currentTime + 43L)));
        this.kafkaStreams.close();
        this.kafkaStreams = null;
        props.put("topology.optimization", "all");
        this.kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
        this.kafkaStreams.start();
        long currentTimeNew = SelfJoinUpgradeIntegrationTest.CLUSTER.time.milliseconds();
        this.processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, Arrays.asList(new KeyValueTimestamp<String, String>("1", "CA", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "CB", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "AC", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "BC", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "CC", currentTimeNew + 44L)));
        this.kafkaStreams.close();
    }

    @Test
    public void shouldRestartWithTopologyOptimizationOn() throws Exception {
        StreamsBuilder streamsBuilderOld = new StreamsBuilder();
        KStream leftOld = streamsBuilderOld.stream(this.inputTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        ValueJoiner valueJoiner = (v, v2) -> v + v2;
        KStream joinedOld = leftOld.join(leftOld, valueJoiner, JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMinutes(100L)));
        joinedOld.to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        Properties props = this.props();
        props.put("topology.optimization", "all");
        this.kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
        this.kafkaStreams.start();
        long currentTime = SelfJoinUpgradeIntegrationTest.CLUSTER.time.milliseconds();
        this.processKeyValueAndVerifyCount("1", "A", currentTime + 42L, Arrays.asList(new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
        this.processKeyValueAndVerifyCount("1", "B", currentTime + 43L, Arrays.asList(new KeyValueTimestamp<String, String>("1", "BA", currentTime + 43L), new KeyValueTimestamp<String, String>("1", "AB", currentTime + 43L), new KeyValueTimestamp<String, String>("1", "BB", currentTime + 43L)));
        this.kafkaStreams.close();
        this.kafkaStreams = null;
        props.put("topology.optimization", "all");
        this.kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
        this.kafkaStreams.start();
        long currentTimeNew = SelfJoinUpgradeIntegrationTest.CLUSTER.time.milliseconds();
        this.processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, Arrays.asList(new KeyValueTimestamp<String, String>("1", "CA", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "CB", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "AC", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "BC", currentTimeNew + 44L), new KeyValueTimestamp<String, String>("1", "CC", currentTimeNew + 44L)));
        this.kafkaStreams.close();
    }

    private <K, V> boolean processKeyValueAndVerifyCount(K key, V value, long timestamp, List<KeyValueTimestamp<K, V>> expected) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(KeyValue.pair(key, value)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), timestamp);
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "group-" + this.safeTestName);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerProperties.setProperty("value.deserializer", StringDeserializer.class.getName());
        consumerProperties.put("window.size.ms", (Object)500L);
        List actual = IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(consumerProperties, this.outputTopic, expected.size(), 60000L);
        MatcherAssert.assertThat(actual, (Matcher)Is.is(expected));
        return actual.equals(expected);
    }
}

