/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="integration")
@Timeout(value=600L)
public class KStreamRepartitionIntegrationTest {
    /** Number of brokers in the embedded cluster shared by all tests of this class. */
    private static final int NUM_BROKERS = 1;
    /** Embedded Kafka cluster; started in {@link #startCluster()} and stopped in {@link #closeCluster()}. */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    /** Name of the second join input topic; only named in {@code before}, created by the join tests themselves. */
    private String topicB;
    /** Name of the per-test input topic (created with 4 partitions in {@code before}). */
    private String inputTopic;
    /** Name of the per-test output topic (created with 1 partition in {@code before}). */
    private String outputTopic;
    /** Streams application id; also the prefix of every repartition topic name. */
    private String applicationId;
    /** Unique, topic-name-safe identifier derived from the current test's {@link TestInfo}. */
    private String safeTestName;
    /** Every {@link KafkaStreams} instance started by the current test; closed in {@code whenShuttingDown}. */
    private List<KafkaStreams> kafkaStreamsInstances;
    /** State directory handed to Streams via {@code state.dir}; deleted after each test. */
    private final File testFolder = TestUtils.tempDirectory();

    /**
     * Starts the shared embedded Kafka cluster once, before any test of this class runs.
     *
     * @throws IOException if starting the cluster fails with an I/O error
     */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /**
     * Stops the shared embedded Kafka cluster after all tests of this class have run.
     */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Resets per-test state: derives unique topic and application names from the running test
     * and creates the input topic (4 partitions) and the output topic (1 partition).
     *
     * @param testInfo JUnit description of the running test, used to build unique names
     * @throws InterruptedException if topic creation is interrupted
     */
    @BeforeEach
    public void before(TestInfo testInfo) throws InterruptedException {
        // Created before anything that can fail so the list is never null afterwards.
        kafkaStreamsInstances = new ArrayList<>();

        final String uniqueName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        safeTestName = uniqueName;
        topicB = "topic-b-" + uniqueName;
        inputTopic = "input-topic-" + uniqueName;
        outputTopic = "output-topic-" + uniqueName;
        applicationId = "app-" + uniqueName;

        CLUSTER.createTopic(inputTopic, 4, 1);
        CLUSTER.createTopic(outputTopic, 1, 1);
    }

    /**
     * Builds the Streams configuration used by every test of this class.
     *
     * @param topologyOptimization value stored under {@code topology.optimization}
     * @return a fresh {@link Properties} instance holding the test configuration
     */
    private Properties createStreamsConfig(String topologyOptimization) {
        final Properties config = new Properties();

        // Identity and connectivity.
        config.put("application.id", applicationId);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", testFolder.getPath());

        // Cache size 0 and a 100 ms commit interval.
        config.put("statestore.cache.max.bytes", 0);
        config.put("commit.interval.ms", 100L);

        // Default serdes: Integer keys, String values.
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.String().getClass());

        config.put("topology.optimization", topologyOptimization);
        return config;
    }

    /**
     * Closes every {@link KafkaStreams} instance started by the test and removes the state directory.
     *
     * <p>A failure while closing one instance no longer prevents the remaining instances from being
     * closed, and the state directory is deleted even when closing fails. The first close failure is
     * rethrown afterwards, with later ones attached as suppressed exceptions.
     *
     * @throws IOException if deleting the state directory fails
     */
    @AfterEach
    public void whenShuttingDown() throws IOException {
        try {
            RuntimeException closeFailure = null;
            for (final KafkaStreams streams : kafkaStreamsInstances) {
                if (streams == null) {
                    continue;
                }
                try {
                    streams.close();
                } catch (final RuntimeException e) {
                    if (closeFailure == null) {
                        closeFailure = e;
                    } else {
                        closeFailure.addSuppressed(e);
                    }
                }
            }
            if (closeFailure != null) {
                throw closeFailure;
            }
        } finally {
            // Always remove the state directory, even when closing an instance failed.
            Utils.delete(testFolder);
        }
    }

    /**
     * Joins an input stream repartitioned to 2 partitions with a 6-partition topic and expects
     * Streams to fail with a partition-count mismatch error.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws InterruptedException if topic creation or the wait for the state change is interrupted
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining(String topologyOptimization) throws InterruptedException {
        final int topicBNumberOfPartitions = 6;
        final String inputTopicRepartitionName = "join-repartition-test";
        final int inputTopicRepartitionedNumOfPartitions = 2;
        final AtomicReference<Throwable> expectedThrowable = new AtomicReference<>();

        CLUSTER.createTopic(topicB, topicBNumberOfPartitions, 1);

        final StreamsBuilder builder = new StreamsBuilder();
        final Repartitioned<Integer, String> inputTopicRepartitioned = Repartitioned
            .<Integer, String>as(inputTopicRepartitionName)
            .withNumberOfPartitions(inputTopicRepartitionedNumOfPartitions);
        final KStream<Integer, String> topicBStream =
            builder.stream(topicB, Consumed.with(Serdes.Integer(), Serdes.String()));

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(inputTopicRepartitioned)
            .join(topicBStream, (value1, value2) -> value2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10L)))
            .to(outputTopic);

        final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
        builder.build(streamsConfiguration);

        startStreams(builder, KafkaStreams.State.REBALANCING, KafkaStreams.State.ERROR, streamsConfiguration,
            (t, e) -> expectedThrowable.set(e));

        final String expectedMsg = String.format(
            "Number of partitions [%s] of repartition topic [%s] doesn't match number of partitions [%s] of the source topic.",
            inputTopicRepartitionedNumOfPartitions,
            toRepartitionTopicName(inputTopicRepartitionName),
            topicBNumberOfPartitions);

        final Throwable thrown = expectedThrowable.get();
        Assertions.assertNotNull(thrown, "Expected the uncaught exception handler to receive an exception");
        final String actualMsg = thrown.getMessage();
        // Guard against a null message so a mismatch is reported as an assertion failure, not an NPE.
        Assertions.assertTrue(actualMsg != null && actualMsg.contains(expectedMsg),
            "Unexpected exception message: " + actualMsg);
    }

    /**
     * Joins an input stream repartitioned to 3 partitions with a key-changed (mapped) stream read
     * from a 6-partition topic, then checks that both repartition topics have 3 partitions and that
     * the join produces the expected records.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldDeductNumberOfPartitionsFromRepartitionOperation(String topologyOptimization) throws Exception {
        final String topicBMapperName = "topic-b-mapper";
        final int topicBNumberOfPartitions = 6;
        final String inputTopicRepartitionName = "join-repartition-test";
        final int inputTopicRepartitionedNumOfPartitions = 3;
        final long timestamp = System.currentTimeMillis();

        CLUSTER.createTopic(topicB, topicBNumberOfPartitions, 1);

        final List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"));
        sendEvents(timestamp, expectedRecords);
        sendEvents(topicB, timestamp, expectedRecords);

        final StreamsBuilder builder = new StreamsBuilder();
        final Repartitioned<Integer, String> inputTopicRepartitioned = Repartitioned
            .<Integer, String>as(inputTopicRepartitionName)
            .withNumberOfPartitions(inputTopicRepartitionedNumOfPartitions);
        final KStream<Integer, String> topicBStream = builder
            .stream(topicB, Consumed.with(Serdes.Integer(), Serdes.String()))
            .map(KeyValue::new, Named.as(topicBMapperName));

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(inputTopicRepartitioned)
            .join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10L)))
            .to(outputTopic);

        final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
        builder.build(streamsConfiguration);
        startStreams(builder, streamsConfiguration);

        Assertions.assertEquals(inputTopicRepartitionedNumOfPartitions,
            getNumberOfPartitionsForTopic(toRepartitionTopicName(inputTopicRepartitionName)));
        Assertions.assertEquals(inputTopicRepartitionedNumOfPartitions,
            getNumberOfPartitionsForTopic(toRepartitionTopicName(topicBMapperName)));
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), expectedRecords);
    }

    /**
     * Repartitions both join inputs to 4 partitions and checks the repartition topics' partition
     * counts and the join output. The record sent with a {@code null} key is not part of the
     * expected output.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartitionOperation(String topologyOptimization) throws Exception {
        final String topicBRepartitionedName = "topic-b-scale-up";
        final String inputTopicRepartitionedName = "input-topic-scale-up";
        final int repartitionedNumOfPartitions = 4;
        final long timestamp = System.currentTimeMillis();

        CLUSTER.createTopic(topicB, 1, 1);

        final List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"));
        final List<KeyValue<Integer, String>> recordsToSend = new ArrayList<>(expectedRecords);
        recordsToSend.add(new KeyValue<>(null, "C"));
        sendEvents(timestamp, recordsToSend);
        sendEvents(topicB, timestamp, recordsToSend);

        final StreamsBuilder builder = new StreamsBuilder();
        final Repartitioned<Integer, String> inputTopicRepartitioned = Repartitioned
            .<Integer, String>as(inputTopicRepartitionedName)
            .withNumberOfPartitions(repartitionedNumOfPartitions);
        final Repartitioned<Integer, String> topicBRepartitioned = Repartitioned
            .<Integer, String>as(topicBRepartitionedName)
            .withNumberOfPartitions(repartitionedNumOfPartitions);
        final KStream<Integer, String> topicBStream = builder
            .stream(topicB, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(topicBRepartitioned);

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(inputTopicRepartitioned)
            .join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10L)))
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        Assertions.assertEquals(repartitionedNumOfPartitions,
            getNumberOfPartitionsForTopic(toRepartitionTopicName(topicBRepartitionedName)));
        Assertions.assertEquals(repartitionedNumOfPartitions,
            getNumberOfPartitionsForTopic(toRepartitionTopicName(inputTopicRepartitionedName)));
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), expectedRecords);
    }

    /**
     * Repartitions with a partitioner that targets every partition and checks that each of the two
     * input records shows up four times on both the repartition topic and the 4-partition output
     * topic, while the partitioner is invoked once per input record.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldRepartitionToMultiplePartitions(String topologyOptimization) throws Exception {
        final String repartitionName = "broadcasting-partitioner-test";
        final long timestamp = System.currentTimeMillis();
        final AtomicInteger partitionerInvocation = new AtomicInteger(0);
        final String broadcastingOutputTopic = "broadcast-output-topic-" + safeTestName;

        CLUSTER.createTopic(broadcastingOutputTopic, 4, 1);

        final List<KeyValue<Integer, String>> expectedRecordsOnRepartition = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(1, "A"),
            new KeyValue<>(1, "A"),
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(2, "B"));
        // Indices 3 and 4 are one (1, "A") and one (2, "B"): the two distinct records actually produced.
        final List<KeyValue<Integer, String>> expectedRecords = expectedRecordsOnRepartition.subList(3, 5);
        sendEvents(timestamp, expectedRecords);

        final StreamsBuilder builder = new StreamsBuilder();

        /** Partitioner that sends every record to all partitions of the target topic. */
        class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
            @Override
            @Deprecated
            public Integer partition(String topic, Integer key, String value, int numPartitions) {
                return null;
            }

            @Override
            public Optional<Set<Integer>> partitions(String topic, Integer key, String value, int numPartitions) {
                partitionerInvocation.incrementAndGet();
                return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
            }
        }

        final Repartitioned<Integer, String> repartitioned = Repartitioned
            .<Integer, String>as(repartitionName)
            .withStreamPartitioner(new BroadcastingPartitioner());

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(repartitioned)
            .to(broadcastingOutputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        final String topic = toRepartitionTopicName(repartitionName);
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), expectedRecordsOnRepartition, topic);
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), expectedRecordsOnRepartition, broadcastingOutputTopic);
        Assertions.assertTrue(topicExists(topic));
        Assertions.assertEquals(expectedRecords.size(), partitionerInvocation.get());
    }

    /**
     * Repartitions with a custom partitioner that always picks partition 1 and checks that the
     * repartition topic exists, the records arrive, and the partitioner ran once per record.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldUseStreamPartitionerForRepartitionOperation(String topologyOptimization) throws Exception {
        // Fixed target partition returned by the custom partitioner.
        final int partition = 1;
        final String repartitionName = "partitioner-test";
        final long timestamp = System.currentTimeMillis();
        final AtomicInteger partitionerInvocation = new AtomicInteger(0);

        final List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"));
        sendEvents(timestamp, expectedRecords);

        final StreamsBuilder builder = new StreamsBuilder();
        final Repartitioned<Integer, String> repartitioned = Repartitioned
            .<Integer, String>as(repartitionName)
            .withStreamPartitioner((topic, key, value, numPartitions) -> {
                partitionerInvocation.incrementAndGet();
                return partition;
            });

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(repartitioned)
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        final String repartitionTopic = toRepartitionTopicName(repartitionName);
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), expectedRecords);
        Assertions.assertTrue(topicExists(repartitionTopic));
        Assertions.assertEquals(expectedRecords.size(), partitionerInvocation.get());
    }

    /**
     * Re-keys each record by the integer parsed from its value, repartitions without explicit
     * options, and checks the output records and that the topology has exactly one repartition sink.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldPerformSelectKeyWithRepartitionOperation(String topologyOptimization) throws Exception {
        final long timestamp = System.currentTimeMillis();
        sendEvents(timestamp, Arrays.asList(
            new KeyValue<>(1, "10"),
            new KeyValue<>(2, "20")));

        final StreamsBuilder builder = new StreamsBuilder();
        // The stream must be typed: with a raw KStream the lambda's value is an Object and
        // Integer.valueOf(value) does not compile.
        final KStream<Integer, String> input =
            builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()));
        input.selectKey((key, value) -> Integer.valueOf(value))
            .repartition()
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), Arrays.asList(
            new KeyValue<>(10, "10"),
            new KeyValue<>(20, "20")));

        final String topology = builder.build().describe().toString();
        Assertions.assertEquals(1, countOccurrencesInTopology(topology, "Sink: .*-repartition.*"));
    }

    /**
     * Calls {@code repartition()} without any preceding key-changing operation and checks that the
     * named repartition topic is still created and appears exactly once as a sink in the topology.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed(String topologyOptimization) throws Exception {
        final String repartitionName = "dummy";
        final long timestamp = System.currentTimeMillis();
        final List<KeyValue<Integer, String>> records = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"));
        sendEvents(timestamp, records);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(Repartitioned.as(repartitionName))
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B")));

        final String topology = builder.build().describe().toString();
        Assertions.assertTrue(topicExists(toRepartitionTopicName(repartitionName)));
        Assertions.assertEquals(1, countOccurrencesInTopology(topology, "Sink: .*dummy-repartition.*"));
    }

    /**
     * Re-keys records to their key's string form with a named {@code selectKey}, repartitions under
     * the same name, counts per key, and checks the counts, the repartition topic, and that both the
     * repartition sink and the named processor appear exactly once in the topology description.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKeySelector(String topologyOptimization) throws Exception {
        final String repartitionedName = "new-key";
        final long timestamp = System.currentTimeMillis();
        sendEvents(timestamp, Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B")));

        final StreamsBuilder builder = new StreamsBuilder();
        final Repartitioned<String, String> repartitioned = Repartitioned
            .<String, String>as(repartitionedName)
            .withKeySerde(Serdes.String());

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .selectKey((key, value) -> key.toString(), Named.as(repartitionedName))
            .repartition(repartitioned)
            .groupByKey()
            .count()
            .toStream()
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(
            new KeyValue<>("1", 1L),
            new KeyValue<>("2", 1L)));

        final String topology = builder.build().describe().toString();
        final String repartitionTopicName = toRepartitionTopicName(repartitionedName);
        Assertions.assertTrue(topicExists(repartitionTopicName));
        Assertions.assertEquals(1, countOccurrencesInTopology(topology, "Sink: .*new-key-repartition.*"));
        Assertions.assertEquals(1, countOccurrencesInTopology(topology, "<-- new-key\n"));
    }

    /**
     * Repartitions to an explicit partition count of 2, counts per key, and checks the counts and
     * that the repartition topic exists with exactly 2 partitions.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions(String topologyOptimization) throws Exception {
        final String repartitionName = "new-partitions";
        final int numberOfPartitions = 2;
        final long timestamp = System.currentTimeMillis();
        sendEvents(timestamp, Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B")));

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(Repartitioned.<Integer, String>as(repartitionName).withNumberOfPartitions(numberOfPartitions))
            .groupByKey()
            .count()
            .toStream()
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        validateReceivedMessages(new IntegerDeserializer(), new LongDeserializer(), Arrays.asList(
            new KeyValue<>(1, 1L),
            new KeyValue<>(2, 1L)));

        final String repartitionTopicName = toRepartitionTopicName(repartitionName);
        Assertions.assertTrue(topicExists(repartitionTopicName));
        Assertions.assertEquals(numberOfPartitions, getNumberOfPartitionsForTopic(repartitionTopicName));
    }

    /**
     * Repartitions without an explicit partition count and checks that the repartition topic ends
     * up with 4 partitions, the partition count the input topic is created with in {@code before}.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNumberOfPartitionsIsNotSpecified(String topologyOptimization) throws Exception {
        final String repartitionName = "new-topic";
        final long timestamp = System.currentTimeMillis();
        sendEvents(timestamp, Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B")));

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .repartition(Repartitioned.as(repartitionName))
            .groupByKey()
            .count()
            .toStream()
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        validateReceivedMessages(new IntegerDeserializer(), new LongDeserializer(), Arrays.asList(
            new KeyValue<>(1, 1L),
            new KeyValue<>(2, 1L)));

        final String repartitionTopicName = toRepartitionTopicName(repartitionName);
        Assertions.assertTrue(topicExists(repartitionTopicName));
        Assertions.assertEquals(4, getNumberOfPartitionsForTopic(repartitionTopicName));
    }

    /**
     * Follows {@code selectKey().repartition()} with {@code groupByKey().count()} and checks that
     * the topology description contains only a single repartition sink.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupByKey(String topologyOptimization) throws Exception {
        final String repartitionName = "new-partitions";
        final long timestamp = System.currentTimeMillis();
        sendEvents(timestamp, Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B")));

        final StreamsBuilder builder = new StreamsBuilder();
        final Repartitioned<String, String> repartitioned = Repartitioned
            .<String, String>as(repartitionName)
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String())
            .withNumberOfPartitions(1);

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .selectKey((key, value) -> key.toString())
            .repartition(repartitioned)
            .groupByKey()
            .count()
            .toStream()
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        final String topology = builder.build().describe().toString();
        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(
            new KeyValue<>("1", 1L),
            new KeyValue<>("2", 1L)));
        Assertions.assertTrue(topicExists(toRepartitionTopicName(repartitionName)));
        Assertions.assertEquals(1, countOccurrencesInTopology(topology, "Sink: .*-repartition"));
    }

    /**
     * Repartitions with serdes only (no name) after re-keying to strings and checks the output
     * records and that the topology description contains exactly one repartition sink.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified(String topologyOptimization) throws Exception {
        final long now = System.currentTimeMillis();
        final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"));
        sendEvents(now, inputRecords);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> rekeyed = builder
            .stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .selectKey((key, value) -> key.toString());
        rekeyed
            .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
            .to(outputTopic);

        startStreams(builder, createStreamsConfig(topologyOptimization));

        final List<KeyValue<String, String>> expectedOutput = Arrays.asList(
            new KeyValue<>("1", "A"),
            new KeyValue<>("2", "B"));
        validateReceivedMessages(new StringDeserializer(), new StringDeserializer(), expectedOutput);

        final String description = builder.build().describe().toString();
        final int repartitionSinks = countOccurrencesInTopology(description, "Sink: .*-repartition");
        Assertions.assertEquals(1, repartitionSinks);
    }

    /**
     * Runs two Streams instances of the same application, closes one of them, sends more records
     * and checks that the counts keep advancing and the repartition topic still has 2 partitions.
     *
     * @param topologyOptimization value for {@code topology.optimization}
     * @throws Exception if cluster interaction or record validation fails
     */
    @ParameterizedTest
    @ValueSource(strings={"all", "none"})
    public void shouldGoThroughRebalancingCorrectly(String topologyOptimization) throws Exception {
        final String repartitionName = "rebalancing-test";
        final int numberOfPartitions = 2;
        final long timestamp = System.currentTimeMillis();
        sendEvents(timestamp, Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B")));

        final StreamsBuilder builder = new StreamsBuilder();
        final Repartitioned<String, String> repartitioned = Repartitioned
            .<String, String>as(repartitionName)
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String())
            .withNumberOfPartitions(numberOfPartitions);

        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
            .selectKey((key, value) -> key.toString())
            .repartition(repartitioned)
            .groupByKey()
            .count()
            .toStream()
            .to(outputTopic);

        final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
        startStreams(builder, streamsConfiguration);

        // The second instance needs its own state directory. Use the freshly created temp directory
        // itself: appending a suffix to its path would point Streams at a different, sibling
        // directory that nothing cleans up.
        // NOTE(review): assumes TestUtils.tempDirectory() arranges for its own cleanup - confirm.
        final Properties streamsToCloseConfigs = new Properties();
        streamsToCloseConfigs.putAll(streamsConfiguration);
        streamsToCloseConfigs.put("state.dir", TestUtils.tempDirectory().getPath());
        final KafkaStreams kafkaStreamsToClose = startStreams(builder, streamsToCloseConfigs);

        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(
            new KeyValue<>("1", 1L),
            new KeyValue<>("2", 1L)));

        kafkaStreamsToClose.close();

        sendEvents(timestamp, Arrays.asList(
            new KeyValue<>(1, "C"),
            new KeyValue<>(2, "D")));

        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(
            new KeyValue<>("1", 2L),
            new KeyValue<>("2", 2L)));

        final String repartitionTopicName = toRepartitionTopicName(repartitionName);
        Assertions.assertTrue(topicExists(repartitionTopicName));
        Assertions.assertEquals(numberOfPartitions, getNumberOfPartitionsForTopic(repartitionTopicName));
    }

    /**
     * Looks up how many partitions the given topic has, using a short-lived admin client.
     *
     * @param topic name of the topic to describe
     * @return the number of partitions reported for {@code topic}
     * @throws Exception if describing the topic fails
     */
    private int getNumberOfPartitionsForTopic(String topic) throws Exception {
        try (Admin admin = createAdminClient()) {
            final Map<String, KafkaFuture<TopicDescription>> descriptionsByName =
                admin.describeTopics(Collections.singleton(topic)).topicNameValues();
            final TopicDescription description = descriptionsByName.get(topic).get();
            return description.partitions().size();
        }
    }

    /**
     * Checks whether the cluster's topic listing contains the given topic name.
     *
     * @param topic name of the topic to look for
     * @return {@code true} if the listed topic names contain {@code topic}
     * @throws Exception if listing the topics fails
     */
    private boolean topicExists(String topic) throws Exception {
        try (Admin admin = createAdminClient()) {
            final Set<String> knownTopics = admin.listTopics().names().get();
            return knownTopics.contains(topic);
        }
    }

    /**
     * Builds the full repartition topic name for a repartition operation name:
     * {@code <applicationId>-<input>-repartition}.
     *
     * @param input the name given to the repartition operation
     * @return the corresponding repartition topic name
     */
    private String toRepartitionTopicName(String input) {
        return String.format("%s-%s-repartition", applicationId, input);
    }

    /**
     * Creates an admin client connected to the embedded cluster. The caller must close it.
     *
     * @return a new {@link Admin} instance
     */
    private static Admin createAdminClient() {
        final Properties adminConfig = new Properties();
        final String bootstrapServers = CLUSTER.bootstrapServers();
        adminConfig.put("bootstrap.servers", bootstrapServers);
        return Admin.create(adminConfig);
    }

    /**
     * Counts the non-overlapping matches of a regular expression in a topology description.
     *
     * @param topologyString the textual topology description to search
     * @param searchPattern  regular expression to look for
     * @return how many times {@code searchPattern} matches within {@code topologyString}
     */
    private static int countOccurrencesInTopology(String topologyString, String searchPattern) {
        final Pattern pattern = Pattern.compile(searchPattern);
        final Matcher matches = pattern.matcher(topologyString);
        int occurrences = 0;
        while (matches.find()) {
            occurrences++;
        }
        return occurrences;
    }

    /**
     * Produces the given records to the test's input topic.
     *
     * @param timestamp record timestamp applied to every event
     * @param events    key/value pairs to produce
     */
    private void sendEvents(long timestamp, List<KeyValue<Integer, String>> events) {
        final String targetTopic = inputTopic;
        sendEvents(targetTopic, timestamp, events);
    }

    /**
     * Synchronously produces the given records to a topic, using Integer keys and String values.
     *
     * @param topic     topic to produce to
     * @param timestamp record timestamp applied to every event
     * @param events    key/value pairs to produce
     */
    private void sendEvents(String topic, long timestamp, List<KeyValue<Integer, String>> events) {
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, events, producerConfig, timestamp);
    }

    /**
     * Starts a Streams instance and waits for its REBALANCING to RUNNING transition, without
     * installing an uncaught exception handler.
     *
     * @param builder               builder holding the topology to run
     * @param streamsConfiguration  configuration for the new instance
     * @return the started instance
     * @throws InterruptedException if the wait for the state transition is interrupted
     */
    private KafkaStreams startStreams(StreamsBuilder builder, Properties streamsConfiguration) throws InterruptedException {
        final Thread.UncaughtExceptionHandler noHandler = null;
        return startStreams(
            builder,
            KafkaStreams.State.REBALANCING,
            KafkaStreams.State.RUNNING,
            streamsConfiguration,
            noHandler);
    }

    /**
     * Builds and starts a Streams instance, then waits up to 60 seconds for the expected state
     * transition and, when a handler is supplied, for an uncaught exception to be reported.
     *
     * <p>The instance is registered for cleanup before it is started, so it is closed after the
     * test even if {@code start()} throws or the wait is interrupted.
     *
     * @param builder                  builder holding the topology to run
     * @param expectedOldState         state the instance must leave for the transition to count
     * @param expectedNewState         state the instance must enter for the transition to count
     * @param streamsConfiguration     configuration for the new instance
     * @param uncaughtExceptionHandler receives the exception passed to the Streams exception
     *                                 handler; {@code null} to install no handler
     * @return the started instance
     * @throws InterruptedException if the wait is interrupted
     */
    private KafkaStreams startStreams(StreamsBuilder builder, KafkaStreams.State expectedOldState, KafkaStreams.State expectedNewState, Properties streamsConfiguration, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException {
        final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
        // Register first so the per-test cleanup closes the instance whatever happens below.
        kafkaStreamsInstances.add(kafkaStreams);

        final CountDownLatch latch;
        if (uncaughtExceptionHandler == null) {
            // Only the state transition is awaited.
            latch = new CountDownLatch(1);
        } else {
            // Both the state transition and the exception callback are awaited.
            latch = new CountDownLatch(2);
            kafkaStreams.setUncaughtExceptionHandler(e -> {
                uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
                latch.countDown();
                // Rethrow after recording so the exception keeps propagating unchanged.
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                if (e instanceof Error) {
                    throw (Error) e;
                }
                throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", e);
            });
        }

        kafkaStreams.setStateListener((newState, oldState) -> {
            if (expectedOldState == oldState && expectedNewState == newState) {
                latch.countDown();
            }
        });

        kafkaStreams.start();

        // The boolean result is not inspected: on timeout this method simply returns and the
        // caller's own assertions decide the outcome.
        latch.await(60000L, TimeUnit.MILLISECONDS);
        return kafkaStreams;
    }

    /**
     * Waits until the expected final records have been received on the test's output topic.
     *
     * @param keySerializer   deserializer whose class is used for record keys
     * @param valueSerializer deserializer whose class is used for record values
     * @param expectedRecords records expected on the output topic
     * @throws Exception if the records are not received
     */
    private <K, V> void validateReceivedMessages(Deserializer<K> keySerializer, Deserializer<V> valueSerializer, List<KeyValue<K, V>> expectedRecords) throws Exception {
        final String topicToRead = this.outputTopic;
        validateReceivedMessages(keySerializer, valueSerializer, expectedRecords, topicToRead);
    }

    /**
     * Waits until the expected final records have been received on the given topic, reading from
     * the earliest offset with a consumer group named after the current test.
     *
     * @param keySerializer   deserializer whose class is used for record keys
     * @param valueSerializer deserializer whose class is used for record values
     * @param expectedRecords records expected on the topic
     * @param outputTopic     topic to read from
     * @throws Exception if the records are not received
     */
    private <K, V> void validateReceivedMessages(Deserializer<K> keySerializer, Deserializer<V> valueSerializer, List<KeyValue<K, V>> expectedRecords, String outputTopic) throws Exception {
        final String keyDeserializerClass = keySerializer.getClass().getName();
        final String valueDeserializerClass = valueSerializer.getClass().getName();

        final Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("group.id", "group-" + safeTestName);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("key.deserializer", keyDeserializerClass);
        consumerConfig.setProperty("value.deserializer", valueDeserializerClass);

        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, outputTopic, expectedRecords);
    }
}

