/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.namedtopology.AddNamedTopologyResult;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStoreQueryParameters;
import org.apache.kafka.streams.processor.internals.namedtopology.RemoveNamedTopologyResult;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category(value={IntegrationTest.class})
public class NamedTopologyIntegrationTest {
    // JUnit rule applying a 600 second timeout to the tests of this class.
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    // How long tests wait for the Streams client(s) to reach the RUNNING state.
    private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45L);
    // Embedded cluster shared by every test; started and stopped by the @BeforeClass/@AfterClass hooks.
    // NOTE(review): the constructor argument 1 is presumably the broker count -- confirm.
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // Names of the named topologies used across the tests.
    private static final String TOPOLOGY_1 = "topology-1";
    private static final String TOPOLOGY_2 = "topology-2";
    private static final String TOPOLOGY_3 = "topology-3";
    // Standard input topics; created and populated once per class in initializeClusterAndStandardTopics().
    private static final String INPUT_STREAM_1 = "input-stream-1";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private static final String INPUT_STREAM_3 = "input-stream-3";
    // Standard output topics; created in setup() and deleted in shutdown() for every test.
    private static final String OUTPUT_STREAM_1 = "output-stream-1";
    private static final String OUTPUT_STREAM_2 = "output-stream-2";
    private static final String OUTPUT_STREAM_3 = "output-stream-3";
    // Output topics created only by individual tests (shutdown() also attempts to delete them).
    private static final String SUM_OUTPUT = "sum";
    private static final String COUNT_OUTPUT = "count";
    // Input topics that individual tests create themselves and populate only after the client is running.
    private static final String DELAYED_INPUT_STREAM_1 = "delayed-input-stream-1";
    private static final String DELAYED_INPUT_STREAM_2 = "delayed-input-stream-2";
    // NOTE(review): not referenced in the visible part of the file; presumably used by later tests.
    private static final String NEW_STREAM = "new-stream";
    private static final String EXISTING_STREAM = "existing-stream";
    // Topics that tests create with a single partition.
    private static final String SINGLE_PARTITION_INPUT_STREAM = "single-partition-input-stream";
    private static final String SINGLE_PARTITION_OUTPUT_STREAM = "single-partition-output-stream";
    // Two materializations that share the store name "store": one in-memory, one persistent.
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store"));
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> ROCKSDB_STORE = Materialized.as((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"));
    // Client configs for producing test input and consuming test output; assigned in initializeClusterAndStandardTopics().
    private static Properties producerConfig;
    private static Properties consumerConfig;
    // Supplies the current test method name, from which setup() derives a unique application id.
    @Rule
    public final TestName testName = new TestName();
    private String appId;
    // Expected changelog topic names of the "store" state store in topology-1/2/3; assigned in setup().
    private String changelog1;
    private String changelog2;
    private String changelog3;
    // Shared input records and the results expected from counting/summing them.
    // NOTE(review): these are initialized outside the visible part of the file.
    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA;
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA;
    private static final List<KeyValue<String, Long>> SUM_OUTPUT_DATA;
    // Value of the "__internal.override.topic.prefix__" config; shutdown() asserts that every
    // changelog/repartition topic name contains it.
    private static final String TOPIC_PREFIX = "unique_topic_prefix";
    private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
    // Config and client for the first Streams instance ("host1"); created in setup().
    private Properties props;
    // Config and client for the optional second instance ("host2"); created by setupSecondKafkaStreams().
    private Properties props2;
    private KafkaStreamsNamedTopologyWrapper streams;
    private KafkaStreamsNamedTopologyWrapper streams2;
    // Topology builders obtained from the first client; topology1BuilderDup is a second builder for TOPOLOGY_1.
    private NamedTopologyBuilder topology1Builder;
    private NamedTopologyBuilder topology1BuilderDup;
    private NamedTopologyBuilder topology2Builder;
    private NamedTopologyBuilder topology3Builder;
    // Topology builders obtained from the second client.
    private NamedTopologyBuilder topology1Builder2;
    private NamedTopologyBuilder topology2Builder2;

    /**
     * One-time class setup: starts the shared cluster, creates the three standard input topics,
     * builds the producer/consumer configs and then writes {@code STANDARD_INPUT_DATA} to each
     * standard input topic.
     *
     * @throws Exception if the cluster cannot be started, a topic cannot be created or producing fails
     */
    @BeforeClass
    public static void initializeClusterAndStandardTopics() throws Exception {
        final List<String> standardInputTopics = Arrays.asList(INPUT_STREAM_1, INPUT_STREAM_2, INPUT_STREAM_3);

        CLUSTER.start();
        for (final String inputTopic : standardInputTopics) {
            CLUSTER.createTopic(inputTopic, 2, 1);
        }

        // The configs must exist before anything is produced, so build them between the two loops.
        final String bootstrapServers = CLUSTER.bootstrapServers();
        producerConfig = TestUtils.producerConfig(bootstrapServers, StringSerializer.class, LongSerializer.class);
        consumerConfig = TestUtils.consumerConfig(bootstrapServers, StringDeserializer.class, LongDeserializer.class);

        for (final String inputTopic : standardInputTopics) {
            produceToInputTopics(inputTopic, STANDARD_INPUT_DATA);
        }
    }

    /**
     * One-time class teardown: stops the embedded cluster that
     * {@link #initializeClusterAndStandardTopics()} started.
     */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Builds the Streams configuration for one client of the application under test.
     *
     * @param appId the value used for {@code application.id}; also the prefix of the temporary state directory
     * @param host  the host part of {@code application.server}; the port is always 2020
     * @return a fresh {@link Properties} instance holding the configuration
     */
    private Properties configProps(String appId, String host) {
        final Properties config = new Properties();

        // Identity and connectivity.
        config.put("application.id", appId);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory(appId).getPath());

        // Default serdes: String keys, Long values.
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.Long().getClass());

        // Runtime behaviour.
        config.put("num.stream.threads", 2);
        config.put("application.server", host + ":2020");
        config.put("commit.interval.ms", 1000L);
        config.put("auto.offset.reset", "earliest");
        config.put("session.timeout.ms", 10000);

        // Every internal topic name is expected to carry this prefix; shutdown() asserts on it.
        config.put("__internal.override.topic.prefix__", TOPIC_PREFIX);
        return config;
    }

    /**
     * Per-test setup: derives a unique application id from the test name, computes the expected
     * changelog topic names, creates the first Streams client with its topology builders and
     * creates the three standard output topics.
     *
     * @throws Exception if an output topic cannot be created
     */
    @Before
    public void setup() throws Exception {
        this.appId = IntegrationTestUtils.safeUniqueTestName(NamedTopologyIntegrationTest.class, this.testName);

        // Derive the expected names from the shared constants rather than repeating them as
        // literals, so they cannot drift from the configured prefix or the topology names.
        this.changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog";
        this.changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog";
        this.changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog";

        this.props = this.configProps(this.appId, "host1");
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.props, this.clientSupplier);
        this.topology1Builder = this.streams.newNamedTopologyBuilder(TOPOLOGY_1);
        this.topology1BuilderDup = this.streams.newNamedTopologyBuilder(TOPOLOGY_1);
        this.topology2Builder = this.streams.newNamedTopologyBuilder(TOPOLOGY_2);
        this.topology3Builder = this.streams.newNamedTopologyBuilder(TOPOLOGY_3);

        // The output topics are recreated for every test; shutdown() deletes them again.
        CLUSTER.createTopic(OUTPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_3, 2, 1);
    }

    /**
     * Creates the second Streams client ("host2") under the same application id as the first one,
     * together with its own builders for topology-1 and topology-2.
     */
    private void setupSecondKafkaStreams() {
        this.props2 = this.configProps(this.appId, "host2");

        final KafkaStreamsNamedTopologyWrapper secondClient =
            new KafkaStreamsNamedTopologyWrapper(this.props2, this.clientSupplier);
        this.streams2 = secondClient;

        this.topology1Builder2 = secondClient.newNamedTopologyBuilder(TOPOLOGY_1);
        this.topology2Builder2 = secondClient.newNamedTopologyBuilder(TOPOLOGY_2);
    }

    /**
     * Per-test teardown: closes whichever Streams clients were created, deletes every changelog
     * and repartition topic (asserting that each carries the configured topic prefix) and finally
     * deletes the output topics.
     *
     * @throws Exception if closing a client or deleting the output topics fails
     */
    @After
    public void shutdown() throws Exception {
        if (this.streams != null) {
            this.streams.close(Duration.ofSeconds(30L));
        }
        if (this.streams2 != null) {
            this.streams2.close(Duration.ofSeconds(30L));
        }

        CLUSTER.getAllTopicsInCluster().stream()
            .filter(t -> t.contains("-changelog") || t.contains("-repartition"))
            .forEach(t -> {
                try {
                    MatcherAssert.assertThat("topic was not decorated", t.contains(TOPIC_PREFIX));
                    CLUSTER.deleteTopicsAndWait(t);
                } catch (InterruptedException e) {
                    // The lambda cannot rethrow a checked exception, so restore the interrupt
                    // flag instead of silently dropping the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            });

        CLUSTER.deleteTopicsAndWait(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
        CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
    }

    /**
     * Runs a count topology and a foreign-key-join topology side by side and verifies that the
     * repartition, changelog and subscription topics found in the cluster are exactly the expected
     * ones, each prefixed with the configured topic prefix and the owning topology's name.
     */
    @Test
    public void shouldPrefixAllInternalTopicNamesWithNamedTopology() throws Exception {
        final String countTopologyName = "count-topology";
        final String fkjTopologyName = "FKJ-topology";

        final NamedTopologyBuilder countBuilder = this.streams.newNamedTopologyBuilder(countTopologyName);
        countBuilder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count();

        final NamedTopologyBuilder fkjBuilder = this.streams.newNamedTopologyBuilder(fkjTopologyName);
        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        // The tables must be typed: with raw KTables the joiner below would add two Objects.
        final KTable<String, Long> left = fkjBuilder.table(
            INPUT_STREAM_2,
            Consumed.with(
                serdeScope.decorateSerde(Serdes.String(), this.props, true),
                serdeScope.decorateSerde(Serdes.Long(), this.props, false)));
        final KTable<String, Long> right = fkjBuilder.table(
            INPUT_STREAM_3,
            Consumed.with(
                serdeScope.decorateSerde(Serdes.String(), this.props, true),
                serdeScope.decorateSerde(Serdes.Long(), this.props, false)));
        left.join(
            right,
            Object::toString,
            (value1, value2) -> String.valueOf(value1 + value2),
            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), this.props, false)));

        this.streams.start(Arrays.asList(fkjBuilder.build(), countBuilder.build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        final String countTopicPrefix = TOPIC_PREFIX + "-" + countTopologyName;
        final String fkjTopicPrefix = TOPIC_PREFIX + "-" + fkjTopologyName;
        final Set<String> internalTopics = CLUSTER.getAllTopicsInCluster().stream()
            .filter(t -> t.contains(TOPIC_PREFIX))
            .filter(t -> t.endsWith("-repartition") || t.endsWith("-changelog") || t.endsWith("-topic"))
            .collect(Collectors.toSet());

        MatcherAssert.assertThat(internalTopics, CoreMatchers.is(Utils.mkSet(
            countTopicPrefix + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition",
            countTopicPrefix + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog",
            fkjTopicPrefix + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic",
            fkjTopicPrefix + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic",
            fkjTopicPrefix + "-KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010-changelog",
            fkjTopicPrefix + "-" + INPUT_STREAM_2 + "-STATE-STORE-0000000000-changelog",
            fkjTopicPrefix + "-" + INPUT_STREAM_3 + "-STATE-STORE-0000000003-changelog")));
    }

    /**
     * Runs a single named topology that re-keys and counts its input, then verifies both the
     * emitted counts and that the store's changelog and repartition topics exist under the
     * prefixed, topology-qualified names.
     */
    @Test
    public void shouldProcessSingleNamedTopologyAndPrefixInternalTopics() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).selectKey((k, v) -> k).groupByKey().count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams.start(this.topology1Builder.build());
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        final List<KeyValue<String, Long>> results =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
        MatcherAssert.assertThat(results, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));

        // Build the expected names from the shared constants instead of repeating them as literals.
        final String storeTopicPrefix = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store";
        final Set<String> allTopics = CLUSTER.getAllTopicsInCluster();
        MatcherAssert.assertThat(allTopics.contains(storeTopicPrefix + "-changelog"), CoreMatchers.is(true));
        MatcherAssert.assertThat(allTopics.contains(storeTopicPrefix + "-repartition"), CoreMatchers.is(true));
    }

    /**
     * Starts three structurally identical counting topologies, two backed by the persistent store
     * and one by the in-memory store, and checks that each produces the expected counts and that
     * all three changelog topics exist.
     */
    @Test
    public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersistentStateStores() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);

        final List<NamedTopology> topologies = Arrays.asList(
            this.topology1Builder.build(),
            this.topology2Builder.build(),
            this.topology3Builder.build());
        this.streams.start(topologies);
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        // Each topology writes to its own output topic; check them in topology order.
        for (final String outputTopic : Arrays.asList(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3)) {
            final List<KeyValue<String, Long>> counts =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 3);
            MatcherAssert.assertThat(counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        }

        final List<String> expectedChangelogs = Arrays.asList(this.changelog1, this.changelog2, this.changelog3);
        final boolean allChangelogsExist = CLUSTER.getAllTopicsInCluster().containsAll(expectedChangelogs);
        MatcherAssert.assertThat(allChangelogsExist, CoreMatchers.is(true));
    }

    /**
     * Verifies that named topologies can be added and removed before the client is started, and
     * that store, metadata and lag queries are answered for the topology they name -- first with a
     * single client, then after a second client has joined the application.
     * The two single-partition topics created here are deleted again whether or not the test passes.
     */
    @Test
    public void shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() throws Exception {
        try {
            CLUSTER.createTopic(SINGLE_PARTITION_INPUT_STREAM, 1, 1);
            CLUSTER.createTopic(SINGLE_PARTITION_OUTPUT_STREAM, 1, 1);
            produceToInputTopics(SINGLE_PARTITION_INPUT_STREAM, STANDARD_INPUT_DATA);

            final String topology1Store = "store-" + TOPOLOGY_1;
            final String topology2Store = "store-" + TOPOLOGY_2;

            this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(Materialized.as(topology1Store)).toStream().to(OUTPUT_STREAM_1);
            this.topology2Builder.stream(SINGLE_PARTITION_INPUT_STREAM).groupByKey().count(Materialized.as(topology2Store)).toStream().to(SINGLE_PARTITION_OUTPUT_STREAM);

            // Add, remove and re-add topology-1 while the client is still not started.
            this.streams.addNamedTopology(this.topology1Builder.build());
            this.streams.removeNamedTopology(TOPOLOGY_1);
            MatcherAssert.assertThat(this.streams.getTopologyByName(TOPOLOGY_1), CoreMatchers.is(Optional.empty()));
            this.streams.addNamedTopology(this.topology1Builder.build());
            this.streams.addNamedTopology(this.topology2Builder.build());

            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));

            final List<KeyValue<String, Long>> topology1Output =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
            MatcherAssert.assertThat(topology1Output, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            final List<KeyValue<String, Long>> topology2Output =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SINGLE_PARTITION_OUTPUT_STREAM, 3);
            MatcherAssert.assertThat(topology2Output, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));

            // Interactive query against topology-1's store.
            final ReadOnlyKeyValueStore<String, Long> store = this.streams.store(
                NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType(
                    TOPOLOGY_1,
                    topology1Store,
                    QueryableStoreTypes.keyValueStore()));
            MatcherAssert.assertThat(store.get("A"), CoreMatchers.equalTo(2L));

            // Store-level metadata: a single client hosts each store at this point.
            final Collection<StreamsMetadata> streamsMetadata = this.streams.streamsMetadataForStore(topology1Store, TOPOLOGY_1);
            final Collection<StreamsMetadata> streamsMetadata2 = this.streams.streamsMetadataForStore(topology2Store, TOPOLOGY_2);
            MatcherAssert.assertThat(streamsMetadata.size(), CoreMatchers.equalTo(1));
            MatcherAssert.assertThat(streamsMetadata2.size(), CoreMatchers.equalTo(1));

            // Key-level metadata must be available and identical for both topologies.
            final KeyQueryMetadata keyMetadata = this.streams.queryMetadataForKey(topology1Store, "A", new StringSerializer(), TOPOLOGY_1);
            final KeyQueryMetadata keyMetadata2 = this.streams.queryMetadataForKey(topology2Store, "A", new StringSerializer(), TOPOLOGY_2);
            MatcherAssert.assertThat(keyMetadata, CoreMatchers.not(KeyQueryMetadata.NOT_AVAILABLE));
            MatcherAssert.assertThat(keyMetadata, CoreMatchers.equalTo(keyMetadata2));

            // Lag information is reported per topology; only the store names and partition ids
            // are inspected, so the lag value type is left as a wildcard.
            final Map<String, ? extends Map<Integer, ?>> partitionLags1 = this.streams.allLocalStorePartitionLagsForTopology(TOPOLOGY_1);
            final Map<String, ? extends Map<Integer, ?>> partitionLags2 = this.streams.allLocalStorePartitionLagsForTopology(TOPOLOGY_2);
            MatcherAssert.assertThat(partitionLags1.keySet(), CoreMatchers.equalTo(Collections.singleton(topology1Store)));
            MatcherAssert.assertThat(partitionLags1.get(topology1Store).keySet(), CoreMatchers.equalTo(Utils.mkSet(0, 1)));
            MatcherAssert.assertThat(partitionLags2.keySet(), CoreMatchers.equalTo(Collections.singleton(topology2Store)));
            // topology-2 reads the single-partition topic, so only partition 0 is expected.
            MatcherAssert.assertThat(partitionLags2.get(topology2Store).keySet(), CoreMatchers.equalTo(Collections.singleton(0)));

            // Bring up a second client running the same two topologies and compare metadata across clients.
            this.setupSecondKafkaStreams();
            this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(Materialized.as(topology1Store)).toStream().to(OUTPUT_STREAM_1);
            this.topology2Builder2.stream(SINGLE_PARTITION_INPUT_STREAM).groupByKey().count(Materialized.as(topology2Store)).toStream().to(SINGLE_PARTITION_OUTPUT_STREAM);
            this.streams2.start(Arrays.asList(this.topology1Builder2.build(), this.topology2Builder2.build()));
            IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.streams, this.streams2), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

            this.verifyMetadataForTopology(TOPOLOGY_1, this.streams.streamsMetadataForStore(topology1Store, TOPOLOGY_1), this.streams2.streamsMetadataForStore(topology1Store, TOPOLOGY_1));
            this.verifyMetadataForTopology(TOPOLOGY_2, this.streams.streamsMetadataForStore(topology2Store, TOPOLOGY_2), this.streams2.streamsMetadataForStore(topology2Store, TOPOLOGY_2));
            this.verifyMetadataForTopology(TOPOLOGY_1, this.streams.allStreamsClientsMetadataForTopology(TOPOLOGY_1), this.streams2.allStreamsClientsMetadataForTopology(TOPOLOGY_1));
            this.verifyMetadataForTopology(TOPOLOGY_2, this.streams.allStreamsClientsMetadataForTopology(TOPOLOGY_2), this.streams2.allStreamsClientsMetadataForTopology(TOPOLOGY_2));
        } finally {
            // Always remove the topics this test created, on success and on failure alike.
            CLUSTER.deleteTopics(SINGLE_PARTITION_INPUT_STREAM, SINGLE_PARTITION_OUTPUT_STREAM);
        }
    }

    /**
     * Starts the client without any topology, adds a counting topology afterwards and checks that
     * the expected counts are produced.
     */
    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);

        this.streams.start();

        final NamedTopology lateTopology = this.topology1Builder.build();
        this.streams.addNamedTopology(lateTopology).all().get();

        final List<KeyValue<String, Long>> counts =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
        MatcherAssert.assertThat(counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    /**
     * Starts the client with topology-1 only, adds topology-2 once the client is running and
     * checks that both topologies produce the expected counts.
     */
    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithSingleInitialNamedTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);

        final NamedTopology initialTopology = this.topology1Builder.build();
        this.streams.start(initialTopology);
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        final NamedTopology addedTopology = this.topology2Builder.build();
        this.streams.addNamedTopology(addedTopology).all().get();

        for (final String outputTopic : Arrays.asList(OUTPUT_STREAM_1, OUTPUT_STREAM_2)) {
            final List<KeyValue<String, Long>> counts =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 3);
            MatcherAssert.assertThat(counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        }
    }

    /**
     * Starts the client with topology-1 and topology-2, adds topology-3 once the client is running
     * and checks that all three topologies produce the expected counts.
     */
    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTopologies() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);

        final List<NamedTopology> initialTopologies = Arrays.asList(
            this.topology1Builder.build(),
            this.topology2Builder.build());
        this.streams.start(initialTopologies);
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        final NamedTopology addedTopology = this.topology3Builder.build();
        this.streams.addNamedTopology(addedTopology).all().get();

        for (final String outputTopic : Arrays.asList(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3)) {
            final List<KeyValue<String, Long>> counts =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 3);
            MatcherAssert.assertThat(counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        }
    }

    /**
     * Runs topology-1 on two clients, then adds topology-2 to both running clients and checks that
     * each topology produces the expected counts.
     */
    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() throws Exception {
        this.setupSecondKafkaStreams();

        // Both clients define identical versions of each topology.
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology2Builder2.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);

        this.streams.start(this.topology1Builder.build());
        this.streams2.start(this.topology1Builder2.build());
        IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.streams, this.streams2), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        final List<KeyValue<String, Long>> topology1Counts =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
        MatcherAssert.assertThat(topology1Counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));

        // Request the addition on both clients before blocking on either result.
        final AddNamedTopologyResult firstClientAdd = this.streams.addNamedTopology(this.topology2Builder.build());
        final AddNamedTopologyResult secondClientAdd = this.streams2.addNamedTopology(this.topology2Builder2.build());
        firstClientAdd.all().get();
        secondClientAdd.all().get();

        final List<KeyValue<String, Long>> topology2Counts =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3);
        MatcherAssert.assertThat(topology2Counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    /**
     * Runs topology-1 on two clients, removes it from both (passing {@code true} as the second
     * argument of {@code removeNamedTopology}), cleans up its local state and changelog topics,
     * then adds topology-2 reading the same input topic and checks that it again produces the
     * full expected counts.
     */
    @Test
    public void shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() throws Exception {
        this.setupSecondKafkaStreams();
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams.start(this.topology1Builder.build());
        this.streams2.start(this.topology1Builder2.build());
        IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.streams, this.streams2), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        final List<KeyValue<String, Long>> topology1Counts =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
        MatcherAssert.assertThat(topology1Counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));

        // Request the removal on both clients before blocking on the first client's result.
        final RemoveNamedTopologyResult result = this.streams.removeNamedTopology(TOPOLOGY_1, true);
        this.streams2.removeNamedTopology(TOPOLOGY_1, true).all().get();
        result.all().get();

        MatcherAssert.assertThat(this.streams.getTopologyByName(TOPOLOGY_1), CoreMatchers.equalTo(Optional.empty()));
        MatcherAssert.assertThat(this.streams2.getTopologyByName(TOPOLOGY_1), CoreMatchers.equalTo(Optional.empty()));
        MatcherAssert.assertThat(this.streams.getAllTopologies().isEmpty(), CoreMatchers.is(true));
        MatcherAssert.assertThat(this.streams2.getAllTopologies().isEmpty(), CoreMatchers.is(true));

        this.streams.cleanUpNamedTopology(TOPOLOGY_1);
        this.streams2.cleanUpNamedTopology(TOPOLOGY_1);

        CLUSTER.getAllTopicsInCluster().stream()
            .filter(t -> t.contains("-changelog"))
            .forEach(t -> {
                try {
                    CLUSTER.deleteTopicAndWait(t);
                } catch (InterruptedException e) {
                    // The lambda cannot rethrow a checked exception, so restore the interrupt
                    // flag instead of silently dropping the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            });

        // topology-2 deliberately reads the input topic that topology-1 already consumed.
        this.topology2Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology2Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        final NamedTopology topology2Client1 = this.topology2Builder.build();
        final NamedTopology topology2Client2 = this.topology2Builder2.build();

        final AddNamedTopologyResult result1 = this.streams.addNamedTopology(topology2Client1);
        this.streams2.addNamedTopology(topology2Client2).all().get();
        result1.all().get();

        MatcherAssert.assertThat(this.streams.getAllTopologies(), CoreMatchers.equalTo(Collections.singleton(topology2Client1)));
        MatcherAssert.assertThat(this.streams2.getAllTopologies(), CoreMatchers.equalTo(Collections.singleton(topology2Client2)));

        final List<KeyValue<String, Long>> topology2Counts =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3);
        MatcherAssert.assertThat(topology2Counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    /**
     * Starts two topologies on initially empty input topics, removes topology-2, and only then
     * produces input to both topics: topology-1 must emit the expected counts, while topology-2's
     * processor throws if it ever receives a record.
     * The delayed input topics are deleted again whether or not the test passes.
     */
    @Test
    public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws Exception {
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
        try {
            this.topology1Builder.stream(DELAYED_INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
            this.topology2Builder.stream(DELAYED_INPUT_STREAM_2).map((k, v) -> {
                throw new IllegalStateException("Should not process any records for removed topology-2");
            });
            this.streams.start(Arrays.asList(this.topology1Builder.build(), this.topology2Builder.build()));
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

            this.streams.removeNamedTopology(TOPOLOGY_2).all().get();

            // Input is produced only after the removal has completed.
            produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
            produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);

            final List<KeyValue<String, Long>> counts =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
            MatcherAssert.assertThat(counts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        } finally {
            // Always remove the topics this test created, on success and on failure alike.
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
        }
    }

    /**
     * Runs a version of topology-1 that counts and sums one input topic, removes it and cleans up
     * its state, then adds a replacement under the same name that reads a different input topic
     * and declares the sum before the count. Both versions must produce the expected outputs.
     * The topics created here are deleted again whether or not the test passes.
     */
    @Test
    public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        try {
            // First version: count, then sum. The stream must be typed for Long::sum to apply.
            final KStream<String, Long> inputStream1 = this.topology1Builder.stream(INPUT_STREAM_1);
            inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT);
            inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
            this.streams.start(Collections.singletonList(this.topology1Builder.build()));
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

            final List<KeyValue<String, Long>> firstCounts =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3);
            MatcherAssert.assertThat(firstCounts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            final List<KeyValue<String, Long>> firstSums =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3);
            MatcherAssert.assertThat(firstSums, CoreMatchers.equalTo(SUM_OUTPUT_DATA));

            this.streams.removeNamedTopology(TOPOLOGY_1).all().get();
            this.streams.cleanUpNamedTopology(TOPOLOGY_1);

            // Replacement version under the same topology name: different input, sum before count.
            // Named so that it does not shadow the topology1Builder2 field.
            final NamedTopologyBuilder replacementBuilder = this.streams.newNamedTopologyBuilder(TOPOLOGY_1);
            final KStream<String, Long> inputStream2 = replacementBuilder.stream(DELAYED_INPUT_STREAM_1);
            inputStream2.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
            inputStream2.groupByKey().count().toStream().to(COUNT_OUTPUT);

            produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
            this.streams.addNamedTopology(replacementBuilder.build()).all().get();

            final List<KeyValue<String, Long>> secondCounts =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3);
            MatcherAssert.assertThat(secondCounts, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            final List<KeyValue<String, Long>> secondSums =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3);
            MatcherAssert.assertThat(secondSums, CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        } finally {
            // Always remove the topics this test created, on success and on failure alike.
            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
        }
    }

    /**
     * Starts three named topologies that each subscribe to their input by regex pattern and
     * checks that every output topic receives the expected per-key counts.
     *
     * @throws Exception if starting the application or reading the output fails
     */
    @Test
    public void shouldAllowPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        this.topology1Builder.stream(Pattern.compile(INPUT_STREAM_1))
            .groupBy((key, value) -> key)
            .count()
            .toStream()
            .to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2))
            .groupBy((key, value) -> key)
            .count()
            .toStream()
            .to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3))
            .groupBy((key, value) -> key)
            .count()
            .toStream()
            .to(OUTPUT_STREAM_3);

        this.streams.start(Arrays.asList(
            this.topology1Builder.build(),
            this.topology2Builder.build(),
            this.topology3Builder.build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        // Every topology performs the same count, so each output topic is checked the same way.
        for (String outputTopic : Arrays.asList(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3)) {
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        }
    }

    /**
     * Starts three named topologies where the first subscribes by topic name and the other two
     * subscribe by regex pattern, then checks that every output topic receives the expected
     * per-key counts.
     *
     * @throws Exception if starting the application or reading the output fails
     */
    @Test
    public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        // Topology 1 subscribes by name; topologies 2 and 3 subscribe by pattern.
        this.topology1Builder.stream(INPUT_STREAM_1)
            .groupBy((key, value) -> key)
            .count()
            .toStream()
            .to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2))
            .groupBy((key, value) -> key)
            .count()
            .toStream()
            .to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3))
            .groupBy((key, value) -> key)
            .count()
            .toStream()
            .to(OUTPUT_STREAM_3);

        this.streams.start(Arrays.asList(
            this.topology1Builder.build(),
            this.topology2Builder.build(),
            this.topology3Builder.build()));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        for (String outputTopic : Arrays.asList(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3)) {
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        }
    }

    /**
     * Starts the application with no topology, adds {@code TOPOLOGY_1}, removes it again, deletes
     * its changelog topics, and then adds an equivalent topology built from
     * {@code topology1BuilderDup}.
     *
     * <p>After each add, the test waits for at least three records on {@code COUNT_OUTPUT} and
     * {@code SUM_OUTPUT} and compares them with the expected data. The {@code true} argument to
     * {@code removeNamedTopology} presumably requests an offset reset, as the test name suggests;
     * confirm against the wrapper's API.
     *
     * @throws Exception if a cluster or Streams operation fails or is interrupted
     */
    @Test
    public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopology() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        try {
            KStream<String, Long> inputStream1 = this.topology1Builder.stream(INPUT_STREAM_1);
            inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT);
            inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);

            // Start with an empty application and add the topology afterwards.
            this.streams.start();
            NamedTopology namedTopology = this.topology1Builder.build();
            this.streams.addNamedTopology(namedTopology).all().get();
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(SUM_OUTPUT_DATA));

            this.streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
            this.streams.cleanUpNamedTopology(TOPOLOGY_1);

            // Delete the changelog topics of the removed topology. A plain loop lets an
            // InterruptedException fail the test instead of being printed and ignored.
            for (String topic : CLUSTER.getAllTopicsInCluster()) {
                if (topic.contains("changelog")) {
                    CLUSTER.deleteTopicAndWait(topic);
                }
            }

            // Re-add an equivalent topology under the same name.
            KStream<String, Long> inputStream = this.topology1BuilderDup.stream(INPUT_STREAM_1);
            inputStream.groupByKey().count().toStream().to(COUNT_OUTPUT);
            inputStream.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
            NamedTopology namedTopologyDup = this.topology1BuilderDup.build();
            this.streams.addNamedTopology(namedTopologyDup).all().get();
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        } finally {
            // One cleanup path shared by success and failure.
            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
        }
    }

    /**
     * Variant of the add / remove / re-add test in which each aggregation is preceded by a
     * {@code map}, so both changelog and repartition topics are deleted between the two runs.
     *
     * <p>The output topics are deleted in a {@code finally} block so that a failed assertion does
     * not leave {@code SUM_OUTPUT} and {@code COUNT_OUTPUT} behind.
     *
     * @throws Exception if a cluster or Streams operation fails or is interrupted
     */
    @Test
    public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        try {
            KStream<String, Long> inputStream1 = this.topology1Builder.stream(INPUT_STREAM_1);
            inputStream1.map(KeyValue::new).groupByKey().count().toStream().to(COUNT_OUTPUT);
            inputStream1.map(KeyValue::new).groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);

            // Start with an empty application and add the topology afterwards.
            this.streams.start();
            NamedTopology namedTopology = this.topology1Builder.build();
            this.streams.addNamedTopology(namedTopology).all().get();
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(SUM_OUTPUT_DATA));

            this.streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
            this.streams.cleanUpNamedTopology(TOPOLOGY_1);

            // Delete the internal topics of the removed topology. A plain loop lets an
            // InterruptedException fail the test instead of being printed and ignored.
            for (String topic : CLUSTER.getAllTopicsInCluster()) {
                if (topic.contains("-changelog") || topic.contains("-repartition")) {
                    CLUSTER.deleteTopicsAndWait(topic);
                }
            }

            // Re-add an equivalent topology under the same name.
            KStream<String, Long> inputStream = this.topology1BuilderDup.stream(INPUT_STREAM_1);
            inputStream.map(KeyValue::new).groupByKey().count().toStream().to(COUNT_OUTPUT);
            inputStream.map(KeyValue::new).groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
            NamedTopology namedTopologyDup = this.topology1BuilderDup.build();
            this.streams.addNamedTopology(namedTopologyDup).all().get();
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        } finally {
            // Runs on failure too, so the output topics never leak into later tests.
            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
        }
    }

    /**
     * Checks that a running topology keeps processing while a newly added topology is blocked on
     * an input topic that does not exist yet.
     *
     * <p>Two Streams instances run topology 1 against {@code EXISTING_STREAM}. Topology 2, which
     * reads the not-yet-created {@code NEW_STREAM}, is then added to both. The test expects the
     * handler to record an error for {@code TOPOLOGY_2} caused by
     * {@link MissingSourceTopicException}, verifies that topology 1 still produces output, then
     * creates {@code NEW_STREAM} and expects topology 2 to produce output as well. Finally it
     * checks that each instance reports two local threads named {@code StreamThread-1} and
     * {@code StreamThread-2}.
     *
     * @throws Exception if a cluster or Streams operation fails or is interrupted
     */
    @Test
    public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
        try {
            CLUSTER.createTopic(EXISTING_STREAM, 2, 1);
            NamedTopologyIntegrationTest.produceToInputTopics(EXISTING_STREAM, STANDARD_INPUT_DATA);
            this.setupSecondKafkaStreams();
            this.topology1Builder.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
            this.topology1Builder2.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);

            // Both instances share one handler so errors can be polled per topology.
            TrackingExceptionHandler handler = new TrackingExceptionHandler();
            this.streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler)handler);
            this.streams2.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler)handler);
            this.streams.start(this.topology1Builder.build());
            this.streams2.start(this.topology1Builder2.build());
            IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.streams, this.streams2), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));

            // Add a second topology whose input topic has not been created yet.
            this.topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
            this.topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
            MatcherAssert.assertThat((Object)handler.nextError(TOPOLOGY_2), (Matcher)CoreMatchers.nullValue());
            this.streams.addNamedTopology(this.topology2Builder.build());
            this.streams2.addNamedTopology(this.topology2Builder2.build());

            // Retry until the handler has recorded the missing-source-topic error for topology 2.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Throwable error = handler.nextError(TOPOLOGY_2);
                MatcherAssert.assertThat((Object)error, (Matcher)CoreMatchers.notNullValue());
                MatcherAssert.assertThat(error.getCause().getClass(), (Matcher)CoreMatchers.is(MissingSourceTopicException.class));
            });

            // Topology 1 must keep processing: a third "A" record raises its count to 3.
            NamedTopologyIntegrationTest.produceToInputTopics(EXISTING_STREAM, Collections.singletonList(KeyValue.pair("A", 30L)));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 1), (Matcher)CoreMatchers.equalTo(Collections.singletonList(KeyValue.pair("A", 3L))));

            // Once the missing topic exists, topology 2 should produce output as well.
            CLUSTER.createTopic(NEW_STREAM, 2, 1);
            NamedTopologyIntegrationTest.produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));

            MatcherAssert.assertThat((Object)this.streams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)2));
            MatcherAssert.assertThat((Object)this.streams2.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)2));
            Set<String> localThreadsNames = this.streams.metadataForLocalThreads().stream().map(t -> ClientUtils.extractThreadId(t.threadName())).collect(Collectors.toSet());
            Set<String> localThreadsNames2 = this.streams2.metadataForLocalThreads().stream().map(t -> ClientUtils.extractThreadId(t.threadName())).collect(Collectors.toSet());
            MatcherAssert.assertThat((Object)localThreadsNames.contains("StreamThread-1"), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)localThreadsNames.contains("StreamThread-2"), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)localThreadsNames2.contains("StreamThread-1"), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)localThreadsNames2.contains("StreamThread-2"), (Matcher)CoreMatchers.is((Object)true));
        } finally {
            // One cleanup path shared by success and failure.
            CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
        }
    }

    /**
     * Checks that an application started against a missing input topic begins processing once
     * that topic is created.
     *
     * <p>Two Streams instances start topology 1 reading the not-yet-created {@code NEW_STREAM}.
     * The test expects the handler to record an error for {@code TOPOLOGY_1} caused by
     * {@link MissingSourceTopicException}. It then creates the topic, produces input and compares
     * the output with {@code COUNT_OUTPUT_DATA}. Finally it checks that each instance reports two
     * local threads named {@code StreamThread-1} and {@code StreamThread-2}.
     *
     * @throws Exception if a cluster or Streams operation fails or is interrupted
     */
    @Test
    public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
        this.setupSecondKafkaStreams();
        this.topology1Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);

        // Both instances share one handler so errors can be polled per topology.
        TrackingExceptionHandler handler = new TrackingExceptionHandler();
        this.streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler)handler);
        this.streams2.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler)handler);
        this.streams.start(this.topology1Builder.build());
        this.streams2.start(this.topology1Builder2.build());
        IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.streams, this.streams2), KafkaStreams.State.RUNNING, STARTUP_TIMEOUT);

        // Retry until the handler has recorded the missing-source-topic error for topology 1.
        TestUtils.retryOnExceptionWithTimeout(() -> {
            Throwable error = handler.nextError(TOPOLOGY_1);
            MatcherAssert.assertThat((Object)error, (Matcher)CoreMatchers.notNullValue());
            MatcherAssert.assertThat(error.getCause().getClass(), (Matcher)CoreMatchers.is(MissingSourceTopicException.class));
        });
        try {
            CLUSTER.createTopic(NEW_STREAM, 2, 1);
            NamedTopologyIntegrationTest.produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
            List<KeyValue<String, Long>> output = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
            // Keep only the expected records before comparing; any others are dropped.
            output.retainAll(COUNT_OUTPUT_DATA);
            MatcherAssert.assertThat(output, (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));

            MatcherAssert.assertThat((Object)this.streams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)2));
            MatcherAssert.assertThat((Object)this.streams2.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)2));
            Set<String> localThreadsNames = this.streams.metadataForLocalThreads().stream().map(t -> ClientUtils.extractThreadId(t.threadName())).collect(Collectors.toSet());
            Set<String> localThreadsNames2 = this.streams2.metadataForLocalThreads().stream().map(t -> ClientUtils.extractThreadId(t.threadName())).collect(Collectors.toSet());
            MatcherAssert.assertThat((Object)localThreadsNames.contains("StreamThread-1"), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)localThreadsNames.contains("StreamThread-2"), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)localThreadsNames2.contains("StreamThread-1"), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)localThreadsNames2.contains("StreamThread-2"), (Matcher)CoreMatchers.is((Object)true));
        } finally {
            // One cleanup path shared by success and failure.
            CLUSTER.deleteTopicsAndWait(NEW_STREAM);
        }
    }

    /**
     * Checks that, within one named topology, a branch whose processing always throws does not
     * stop the other branch from emitting output.
     *
     * <p>Topology {@code topology_A} has two sources. Records from {@code DELAYED_INPUT_STREAM_1}
     * are counted and forwarded to {@code OUTPUT_STREAM_1}. Records from
     * {@code DELAYED_INPUT_STREAM_2} hit a {@code peek} that throws, and the uncaught-exception
     * handler answers with {@code REPLACE_THREAD}. The test expects both healthy records on the
     * output topic, two invocations of the healthy branch, and none of the step after the throwing
     * {@code peek}.
     *
     * @throws Exception if a cluster or Streams operation fails or is interrupted
     */
    @Test
    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
        try {
            AtomicInteger noOutputExpected = new AtomicInteger(0);
            AtomicInteger outputExpected = new AtomicInteger(0);
            this.props.put("cache.max.bytes.buffering", 0);
            this.props.put("commit.interval.ms", 15000L);
            this.props.put("state.dir", TestUtils.tempDirectory(this.appId).getPath());
            this.props.put("default.key.serde", Serdes.IntegerSerde.class);
            this.props.put("default.value.serde", Serdes.StringSerde.class);
            this.streams = new KafkaStreamsNamedTopologyWrapper(this.props);
            this.streams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

            NamedTopologyBuilder builder = this.streams.newNamedTopologyBuilder("topology_A");
            // Healthy branch: counts each record and forwards it.
            builder.stream(DELAYED_INPUT_STREAM_1).peek((k, v) -> outputExpected.incrementAndGet()).to(OUTPUT_STREAM_1);
            // Failing branch: the first peek always throws, so the second peek must never run.
            builder.stream(DELAYED_INPUT_STREAM_2).peek((k, v) -> {
                throw new RuntimeException("Kaboom");
            }).peek((k, v) -> noOutputExpected.incrementAndGet()).to(OUTPUT_STREAM_2);
            this.streams.addNamedTopology(builder.build());
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState((KafkaStreams)this.streams);

            // Feed the failing branch first, then the healthy one.
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(DELAYED_INPUT_STREAM_2, Arrays.asList(new KeyValue<>(1, "A")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()), 0L);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(DELAYED_INPUT_STREAM_1, Arrays.asList(new KeyValue<>(1, "A"), new KeyValue<>(1, "B")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()), 0L);
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, StringDeserializer.class), OUTPUT_STREAM_1, Arrays.asList(new KeyValue<>(1, "A"), new KeyValue<>(1, "B")));

            MatcherAssert.assertThat((Object)noOutputExpected.get(), (Matcher)CoreMatchers.equalTo((Object)0));
            MatcherAssert.assertThat((Object)outputExpected.get(), (Matcher)CoreMatchers.equalTo((Object)2));
        } finally {
            // One cleanup path shared by success and failure.
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
        }
    }

    /**
     * Asserts that two metadata collections have the same size and that their entries, walked in
     * iteration order, are valid for the given topology and pairwise equivalent.
     *
     * @param topologyName name of the topology that every entry must belong to
     * @param left first metadata collection; entries are cast to {@code StreamsMetadataImpl}
     * @param right second metadata collection; entries are cast to {@code StreamsMetadataImpl}
     */
    private void verifyMetadataForTopology(String topologyName, Collection<StreamsMetadata> left, Collection<StreamsMetadata> right) {
        MatcherAssert.assertThat((Object)left.size(), (Matcher)CoreMatchers.equalTo((Object)right.size()));

        // Sizes match at this point, so the right-hand cursor cannot run dry.
        Iterator<StreamsMetadata> rightCursor = right.iterator();
        for (StreamsMetadata leftEntry : left) {
            StreamsMetadataImpl leftImpl = (StreamsMetadataImpl)leftEntry;
            StreamsMetadataImpl rightImpl = (StreamsMetadataImpl)rightCursor.next();

            this.verifyPartitionsAndStoresForTopology(topologyName, leftImpl);
            this.verifyPartitionsAndStoresForTopology(topologyName, rightImpl);

            boolean equivalent = NamedTopologyIntegrationTest.verifyEquivalentMetadataForHost(leftImpl, rightImpl);
            MatcherAssert.assertThat((Object)equivalent, (Matcher)CoreMatchers.is((Object)true));
        }
    }

    /**
     * Asserts that a metadata entry belongs to the given topology, that both Streams instances
     * know that topology, that the entry's topic partitions all come from the topology's source
     * topics, that its only state store is {@code "store-" + topologyName}, and that it reports
     * no standby partitions or standby stores.
     *
     * @param topologyName name of the topology the metadata is expected to describe
     * @param metadata metadata entry to check
     */
    private void verifyPartitionsAndStoresForTopology(String topologyName, StreamsMetadataImpl metadata) {
        MatcherAssert.assertThat((Object)metadata.topologyName(), (Matcher)CoreMatchers.equalTo((Object)topologyName));

        // Both instances must have the topology registered.
        boolean knownToFirst = this.streams.getTopologyByName(topologyName).isPresent();
        MatcherAssert.assertThat((Object)knownToFirst, (Matcher)CoreMatchers.is((Object)true));
        boolean knownToSecond = this.streams2.getTopologyByName(topologyName).isPresent();
        MatcherAssert.assertThat((Object)knownToSecond, (Matcher)CoreMatchers.is((Object)true));

        List<String> firstSourceTopics = ((NamedTopology)this.streams.getTopologyByName(topologyName).get()).sourceTopics();
        List<String> secondSourceTopics = ((NamedTopology)this.streams2.getTopologyByName(topologyName).get()).sourceTopics();

        // Every topic named in the metadata must be a source topic on each instance.
        boolean coveredByFirst = firstSourceTopics.containsAll(
            metadata.topicPartitions().stream().map(TopicPartition::topic).collect(Collectors.toList()));
        MatcherAssert.assertThat((Object)coveredByFirst, (Matcher)CoreMatchers.is((Object)true));
        boolean coveredBySecond = secondSourceTopics.containsAll(
            metadata.topicPartitions().stream().map(TopicPartition::topic).collect(Collectors.toList()));
        MatcherAssert.assertThat((Object)coveredBySecond, (Matcher)CoreMatchers.is((Object)true));

        MatcherAssert.assertThat((Object)metadata.stateStoreNames(), (Matcher)CoreMatchers.equalTo(Collections.singleton("store-" + topologyName)));

        boolean noStandbyPartitions = metadata.standbyTopicPartitions().isEmpty();
        MatcherAssert.assertThat((Object)noStandbyPartitions, (Matcher)CoreMatchers.is((Object)true));
        boolean noStandbyStores = metadata.standbyStateStoreNames().isEmpty();
        MatcherAssert.assertThat((Object)noStandbyStores, (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * Compares two metadata entries field by field.
     *
     * @param left first metadata entry
     * @param right second metadata entry
     * @return {@code true} when host info, state store names, topic partitions, standby state
     *         store names and standby topic partitions are all equal; {@code false} at the first
     *         mismatch
     */
    private static boolean verifyEquivalentMetadataForHost(StreamsMetadataImpl left, StreamsMetadataImpl right) {
        if (!left.hostInfo().equals((Object)right.hostInfo())) {
            return false;
        }
        if (!left.stateStoreNames().equals(right.stateStoreNames())) {
            return false;
        }
        if (!left.topicPartitions().equals(right.topicPartitions())) {
            return false;
        }
        if (!left.standbyStateStoreNames().equals(right.standbyStateStoreNames())) {
            return false;
        }
        return left.standbyTopicPartitions().equals(right.standbyTopicPartitions());
    }

    /**
     * Synchronously produces the given records to a topic, using the shared producer
     * configuration and the embedded cluster's clock.
     *
     * @param topic topic to write to
     * @param records key/value records to produce
     */
    private static void produceToInputTopics(String topic, Collection<KeyValue<String, Long>> records) {
        Time clusterTime = (Time)CLUSTER.time;
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, clusterTime);
    }

    static {
        // Input shared by the tests: two records for "A", one for "B", two for "C".
        // No (Object) casts here, so the elements stay KeyValue<String, Long> and can be passed
        // to produceToInputTopics.
        STANDARD_INPUT_DATA = Arrays.asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L), KeyValue.pair("C", -50L));
        // Number of input records per key.
        COUNT_OUTPUT_DATA = Arrays.asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 2L));
        // Sum of input values per key.
        SUM_OUTPUT_DATA = Arrays.asList(KeyValue.pair("B", 200L), KeyValue.pair("A", 400L), KeyValue.pair("C", 350L));
    }

    private static class TrackingExceptionHandler
    implements StreamsUncaughtExceptionHandler {
        private final Map<String, Queue<Throwable>> newErrorsByTopology = new HashMap<String, Queue<Throwable>>();

        private TrackingExceptionHandler() {
        }

        public synchronized StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handle(Throwable exception) {
            String topologyName = exception instanceof StreamsException && ((StreamsException)exception).taskId().isPresent() ? ((TaskId)((StreamsException)exception).taskId().get()).topologyName() : null;
            this.newErrorsByTopology.computeIfAbsent(topologyName, t -> new LinkedList()).add(exception);
            if (exception.getCause() instanceof MissingSourceTopicException) {
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            }
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        }

        public synchronized Throwable nextError(String topologyName) {
            return this.newErrorsByTopology.containsKey(topologyName) ? this.newErrorsByTopology.get(topologyName).poll() : null;
        }
    }
}

