/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class StreamsBuilderTest {
    // Topic and operation names shared by the test methods in this class.
    private static final String STREAM_TOPIC = "stream-topic";
    private static final String STREAM_OPERATION_NAME = "stream-operation";
    private static final String STREAM_TOPIC_TWO = "stream-topic-two";
    private static final String TABLE_TOPIC = "table-topic";
    // Builder that the tests populate with sources and operators before building a topology.
    private final StreamsBuilder builder = new StreamsBuilder();
    // Streams configuration obtained from StreamsTestUtils, passing String serdes for both arguments
    // (presumably the default key and value serdes -- confirm against StreamsTestUtils).
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /**
     * Building a topology from an empty {@link Properties} object (no optimization
     * setting present) must complete without throwing.
     */
    @Test
    public void shouldNotThrowNullPointerIfOptimizationsNotSpecified() {
        // A dedicated builder is created here; the shared instance field is not used.
        final StreamsBuilder freshBuilder = new StreamsBuilder();
        final Properties emptyConfig = new Properties();
        freshBuilder.build(emptyConfig);
    }

    /**
     * Joins a stream with a filtered table that has no explicit materialization and checks
     * that the single state store is connected to the join node but not to the filter node.
     */
    @Test
    public void shouldAllowJoinUnmaterializedFilteredKTable() {
        KTable filtered = builder.table(TABLE_TOPIC).filter(MockPredicate.allGoodPredicate());
        builder.stream(STREAM_TOPIC).join(filtered, MockValueJoiner.TOSTRING_JOINER);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 1));

        Object joinStores = topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005");
        String onlyStoreName = ((StateStore) topology.stateStores().get(0)).name();
        MatcherAssert.assertThat(joinStores, (Matcher) CoreMatchers.equalTo(Collections.singleton(onlyStoreName)));

        Assert.assertTrue(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty());
    }

    /**
     * Joins a stream with a filtered table materialized as "store" and checks that this
     * one store is connected to both the join node and the filter node.
     */
    @Test
    public void shouldAllowJoinMaterializedFilteredKTable() {
        KTable filtered = builder.table(TABLE_TOPIC)
            .filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
        builder.stream(STREAM_TOPIC).join(filtered, MockValueJoiner.TOSTRING_JOINER);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 1));

        // Both processors are expected to see exactly the named store.
        for (String node : new String[]{"KSTREAM-JOIN-0000000005", "KTABLE-FILTER-0000000003"}) {
            MatcherAssert.assertThat(
                (Object) topology.processorConnectedStateStores(node),
                (Matcher) CoreMatchers.equalTo(Collections.singleton("store")));
        }
    }

    /**
     * Joins a stream with a value-mapped table that has no explicit materialization and
     * checks that the single state store is connected to the join node but not to the
     * map-values node.
     */
    @Test
    public void shouldAllowJoinUnmaterializedMapValuedKTable() {
        KTable mapped = builder.table(TABLE_TOPIC).mapValues(MockMapper.noOpValueMapper());
        builder.stream(STREAM_TOPIC).join(mapped, MockValueJoiner.TOSTRING_JOINER);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 1));

        Object joinStores = topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005");
        String onlyStoreName = ((StateStore) topology.stateStores().get(0)).name();
        MatcherAssert.assertThat(joinStores, (Matcher) CoreMatchers.equalTo(Collections.singleton(onlyStoreName)));

        Assert.assertTrue(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty());
    }

    /**
     * Joins a stream with a value-mapped table materialized as "store" and checks that
     * this one store is connected to both the join node and the map-values node.
     */
    @Test
    public void shouldAllowJoinMaterializedMapValuedKTable() {
        KTable mapped = builder.table(TABLE_TOPIC)
            .mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
        builder.stream(STREAM_TOPIC).join(mapped, MockValueJoiner.TOSTRING_JOINER);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 1));

        // Both processors are expected to see exactly the named store.
        for (String node : new String[]{"KSTREAM-JOIN-0000000005", "KTABLE-MAPVALUES-0000000003"}) {
            MatcherAssert.assertThat(
                (Object) topology.processorConnectedStateStores(node),
                (Matcher) CoreMatchers.equalTo(Collections.singleton("store")));
        }
    }

    /**
     * Joins a stream with the result of an unmaterialized table-table join and checks that
     * the two state stores are connected to the stream join node while the table merge
     * node has no connected stores.
     */
    @Test
    public void shouldAllowJoinUnmaterializedJoinedKTable() {
        KTable left = builder.table("table-topic1");
        KTable right = builder.table("table-topic2");
        // The table-table join stays inline so that the stream source is registered before it,
        // which the generated node names asserted below depend on.
        builder.stream(STREAM_TOPIC).join(
            left.join(right, MockValueJoiner.TOSTRING_JOINER),
            MockValueJoiner.TOSTRING_JOINER);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 2));

        Object joinStores = topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010");
        String firstStore = ((StateStore) topology.stateStores().get(0)).name();
        String secondStore = ((StateStore) topology.stateStores().get(1)).name();
        MatcherAssert.assertThat(
            joinStores,
            (Matcher) CoreMatchers.equalTo((Object) Utils.mkSet((Object[]) new String[]{firstStore, secondStore})));

        Assert.assertTrue(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty());
    }

    /**
     * Joins a stream with the result of a table-table join materialized as "store" and
     * checks that the topology has three state stores, with "store" being the only one
     * connected to the stream join node and to the table merge node.
     */
    @Test
    public void shouldAllowJoinMaterializedJoinedKTable() {
        KTable left = builder.table("table-topic1");
        KTable right = builder.table("table-topic2");
        // The table-table join stays inline so that the stream source is registered before it,
        // which the generated node names asserted below depend on.
        builder.stream(STREAM_TOPIC).join(
            left.join(right, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")),
            MockValueJoiner.TOSTRING_JOINER);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 3));

        for (String node : new String[]{"KSTREAM-JOIN-0000000010", "KTABLE-MERGE-0000000007"}) {
            MatcherAssert.assertThat(
                (Object) topology.processorConnectedStateStores(node),
                (Matcher) CoreMatchers.equalTo(Collections.singleton("store")));
        }
    }

    /**
     * Joins a stream directly with a source table and checks that the single state store
     * is connected to both the table source node and the stream join node.
     */
    @Test
    public void shouldAllowJoinMaterializedSourceKTable() {
        KTable sourceTable = builder.table(TABLE_TOPIC);
        builder.stream(STREAM_TOPIC).join(sourceTable, MockValueJoiner.TOSTRING_JOINER);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 1));

        for (String node : new String[]{"KTABLE-SOURCE-0000000002", "KSTREAM-JOIN-0000000004"}) {
            Object connected = topology.processorConnectedStateStores(node);
            String onlyStoreName = ((StateStore) topology.stateStores().get(0)).name();
            MatcherAssert.assertThat(connected, (Matcher) CoreMatchers.equalTo(Collections.singleton(onlyStoreName)));
        }
    }

    /**
     * Writes the source stream to a sink topic and also attaches a capturing processor to it,
     * then checks that one piped record reaches the processor with timestamp 0.
     */
    @Test
    public void shouldProcessingFromSinkTopic() {
        KStream source = builder.stream("topic-source");
        source.to("topic-sink");

        MockProcessorSupplier capturing = new MockProcessorSupplier();
        source.process(capturing, new String[0]);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic input = driver.createInputTopic(
                "topic-source",
                (Serializer) new StringSerializer(),
                (Serializer) new StringSerializer(),
                Instant.ofEpochMilli(0L),
                Duration.ZERO);
            input.pipeInput((Object) "A", (Object) "aa");
        }

        List<KeyValueTimestamp<String, String>> expected =
            Collections.singletonList(new KeyValueTimestamp<String, String>("A", "aa", 0L));
        Assert.assertEquals(expected, capturing.theCapturedProcessor().processed);
    }

    /**
     * Routes the source stream through an intermediate topic and checks that a processor
     * on the original stream and a processor on the through-stream each capture the
     * single piped record.
     */
    @Test
    public void shouldProcessViaThroughTopic() {
        KStream source = builder.stream("topic-source");
        KStream viaTopic = source.through("topic-sink");

        MockProcessorSupplier onSource = new MockProcessorSupplier();
        source.process(onSource, new String[0]);
        MockProcessorSupplier onThrough = new MockProcessorSupplier();
        viaTopic.process(onThrough, new String[0]);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic input = driver.createInputTopic(
                "topic-source",
                (Serializer) new StringSerializer(),
                (Serializer) new StringSerializer(),
                Instant.ofEpochMilli(0L),
                Duration.ZERO);
            input.pipeInput((Object) "A", (Object) "aa");
        }

        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<String, String>("A", "aa", 0L)),
            onSource.theCapturedProcessor().processed);
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<String, String>("A", "aa", 0L)),
            onThrough.theCapturedProcessor().processed);
    }

    /**
     * Merges two source streams and checks that records piped alternately into both
     * topics are captured by the downstream processor in the order they were piped.
     */
    @Test
    public void shouldMergeStreams() {
        String topic1 = "topic-1";
        String topic2 = "topic-2";

        KStream first = builder.stream(topic1);
        KStream second = builder.stream(topic2);
        KStream merged = first.merge(second);

        MockProcessorSupplier capturing = new MockProcessorSupplier();
        merged.process(capturing, new String[0]);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic input1 = driver.createInputTopic(
                topic1,
                (Serializer) new StringSerializer(),
                (Serializer) new StringSerializer(),
                Instant.ofEpochMilli(0L),
                Duration.ZERO);
            TestInputTopic input2 = driver.createInputTopic(
                topic2,
                (Serializer) new StringSerializer(),
                (Serializer) new StringSerializer(),
                Instant.ofEpochMilli(0L),
                Duration.ZERO);

            input1.pipeInput((Object) "A", (Object) "aa");
            input2.pipeInput((Object) "B", (Object) "bb");
            input2.pipeInput((Object) "C", (Object) "cc");
            input1.pipeInput((Object) "D", (Object) "dd");
        }

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<String, String>("A", "aa", 0L),
                new KeyValueTimestamp<String, String>("B", "bb", 0L),
                new KeyValueTimestamp<String, String>("C", "cc", 0L),
                new KeyValueTimestamp<String, String>("D", "dd", 0L)),
            capturing.theCapturedProcessor().processed);
    }

    /**
     * Creates a table materialized as "store" with Long/String serdes, pipes two records
     * serialized with matching serializers, and checks that both the store and a
     * downstream {@code foreach} action observe the expected key/value pairs.
     */
    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
        String topic = "topic";
        HashMap results = new HashMap();
        // Every record forwarded by the table's stream is recorded in the map.
        ForeachAction action = results::put;

        builder.table(topic, Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()))
            .toStream()
            .foreach(action);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic input = driver.createInputTopic(
                topic,
                (Serializer) new LongSerializer(),
                (Serializer) new StringSerializer());
            input.pipeInput((Object) 1L, (Object) "value1");
            input.pipeInput((Object) 2L, (Object) "value2");

            KeyValueStore store = driver.getKeyValueStore("store");
            MatcherAssert.assertThat((Object) store.get((Object) 1L), (Matcher) CoreMatchers.equalTo((Object) "value1"));
            MatcherAssert.assertThat((Object) store.get((Object) 2L), (Matcher) CoreMatchers.equalTo((Object) "value2"));
            MatcherAssert.assertThat(results.get(1L), (Matcher) CoreMatchers.equalTo((Object) "value1"));
            MatcherAssert.assertThat(results.get(2L), (Matcher) CoreMatchers.equalTo((Object) "value2"));
        }
    }

    /**
     * Creates a global table materialized as "store" with Long/String serdes, pipes two
     * records serialized with matching serializers, and checks that both can be read
     * back from the store.
     */
    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
        String topic = "topic";
        builder.globalTable(
            topic,
            Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic input = driver.createInputTopic(
                topic,
                (Serializer) new LongSerializer(),
                (Serializer) new StringSerializer());
            input.pipeInput((Object) 1L, (Object) "value1");
            input.pipeInput((Object) 2L, (Object) "value2");

            KeyValueStore store = driver.getKeyValueStore("store");
            MatcherAssert.assertThat((Object) store.get((Object) 1L), (Matcher) CoreMatchers.equalTo((Object) "value1"));
            MatcherAssert.assertThat((Object) store.get((Object) 2L), (Matcher) CoreMatchers.equalTo((Object) "value2"));
        }
    }

    /**
     * A table declared only with serdes (no store name) and no downstream operation
     * must result in a topology without state stores.
     */
    @Test
    public void shouldNotMaterializeStoresIfNotRequired() {
        String topic = "topic";
        builder.table(topic, Materialized.with((Serde) Serdes.Long(), (Serde) Serdes.String()));

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        MatcherAssert.assertThat(
            (Object) topology.stateStores().size(),
            (Matcher) CoreMatchers.equalTo((Object) 0));
    }

    /**
     * With "topology.optimization" set to "all", the table's source topic is expected to
     * serve as the changelog for its store: the store maps to the source topic, logging
     * is disabled on the store, and the topic group lists no state changelog topics.
     */
    @Test
    public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
        String topic = "topic";
        builder.table(topic, Materialized.as("store"));

        // Local configuration with optimization enabled; the shared props field is not used here.
        Properties optimizedProps = StreamsTestUtils.getStreamsConfig();
        optimizedProps.put("topology.optimization", "all");

        Topology topology = builder.build(optimizedProps);
        InternalTopologyBuilder internals = TopologyWrapper.getInternalTopologyBuilder(topology);
        internals.rewriteTopology(new StreamsConfig((Map) optimizedProps));

        MatcherAssert.assertThat(
            (Object) internals.build().storeToChangelogTopic(),
            (Matcher) CoreMatchers.equalTo(Collections.singletonMap("store", topic)));
        MatcherAssert.assertThat(
            internals.stateStores().keySet(),
            (Matcher) CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            (Object) ((InternalTopologyBuilder.StateStoreFactory) internals.stateStores().get("store")).loggingEnabled(),
            (Matcher) CoreMatchers.equalTo((Object) false));
        MatcherAssert.assertThat(
            (Object) ((InternalTopologyBuilder.TopicsInfo) internals.topicGroups().get((Object) Integer.valueOf(0))).stateChangelogTopics.isEmpty(),
            (Matcher) CoreMatchers.equalTo((Object) true));
    }

    /**
     * Without any optimization setting, the table's store is expected to get its own
     * changelog topic named after the application id and store name, with logging enabled.
     */
    @Test
    public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
        String topic = "topic";
        builder.table(topic, Materialized.as("store"));

        InternalTopologyBuilder internals = TopologyWrapper.getInternalTopologyBuilder(builder.build());
        internals.setApplicationId("appId");

        MatcherAssert.assertThat(
            (Object) internals.build().storeToChangelogTopic(),
            (Matcher) CoreMatchers.equalTo(Collections.singletonMap("store", "appId-store-changelog")));
        MatcherAssert.assertThat(
            internals.stateStores().keySet(),
            (Matcher) CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            (Object) ((InternalTopologyBuilder.StateStoreFactory) internals.stateStores().get("store")).loggingEnabled(),
            (Matcher) CoreMatchers.equalTo((Object) true));
        MatcherAssert.assertThat(
            ((InternalTopologyBuilder.TopicsInfo) internals.topicGroups().get((Object) Integer.valueOf(0))).stateChangelogTopics.keySet(),
            (Matcher) CoreMatchers.equalTo(Collections.singleton("appId-store-changelog")));
    }

    /**
     * Expects a {@link TopologyException} when a stream is declared over an empty
     * collection of topics and the topology is built.
     */
    @Test(expected=TopologyException.class)
    public void shouldThrowExceptionWhenNoTopicPresent() {
        List<String> noTopics = Collections.emptyList();
        builder.stream(noTopics);
        builder.build();
    }

    /**
     * Expects a {@link NullPointerException} when a stream is declared over a list whose
     * topic names are all {@code null} and the topology is built.
     */
    @Test(expected=NullPointerException.class)
    public void shouldThrowExceptionWhenTopicNamesAreNull() {
        List<String> nullTopics = Arrays.asList(null, null);
        builder.stream(nullTopics);
        builder.build();
    }

    /**
     * A stream source declared with {@code Consumed.as(name)} must use that name, while a
     * second, unnamed source keeps its generated name.
     */
    @Test
    public void shouldUseSpecifiedNameForStreamSourceProcessor() {
        String expected = "source-node";
        builder.stream(STREAM_TOPIC, Consumed.as(expected));
        builder.stream(STREAM_TOPIC_TWO);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(topology, expected, "KSTREAM-SOURCE-0000000001");
    }

    /**
     * A table declared with {@code Consumed.as(name)} must produce a source node named
     * {@code name + "-source"} and a table node named {@code name}, while a second,
     * unnamed table keeps its generated names.
     */
    @Test
    public void shouldUseSpecifiedNameForTableSourceProcessor() {
        String expected = "source-node";
        builder.table(STREAM_TOPIC, Consumed.as(expected));
        builder.table(STREAM_TOPIC_TWO);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(
            topology,
            expected + "-source",
            expected,
            "KSTREAM-SOURCE-0000000004",
            "KTABLE-SOURCE-0000000005");
    }

    /**
     * Declares one global table with {@code Consumed.as(name)} and one without a name.
     * The assertion covers the names of the resulting global state stores only.
     */
    @Test
    public void shouldUseSpecifiedNameForGlobalTableSourceProcessor() {
        String expected = "source-processor";
        builder.globalTable(STREAM_TOPIC, Consumed.as(expected));
        builder.globalTable(STREAM_TOPIC_TWO);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForStateStore(
            topology.globalStateStores(),
            "stream-topic-STATE-STORE-0000000000",
            "stream-topic-two-STATE-STORE-0000000003");
    }

    /**
     * A sink declared with {@code Produced.as(name)} must use that name, while a second,
     * unnamed sink on the same stream keeps its generated name.
     */
    @Test
    public void shouldUseSpecifiedNameForSinkProcessor() {
        String expected = "sink-processor";
        KStream source = builder.stream(STREAM_TOPIC);
        source.to(STREAM_TOPIC_TWO, Produced.as(expected));
        source.to(STREAM_TOPIC_TWO);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            expected,
            "KSTREAM-SINK-0000000002");
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code map} processor.
     */
    @Test
    public void shouldUseSpecifiedNameForMapOperation() {
        builder.stream(STREAM_TOPIC)
            .map(KeyValue::pair, Named.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code mapValues}
     * processor when a value-only mapper is passed.
     */
    @Test
    public void shouldUseSpecifiedNameForMapValuesOperation() {
        builder.stream(STREAM_TOPIC)
            .mapValues(value -> value, Named.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code mapValues}
     * processor when a key-aware (two-argument) mapper is passed.
     */
    @Test
    public void shouldUseSpecifiedNameForMapValuesWithKeyOperation() {
        builder.stream(STREAM_TOPIC)
            .mapValues((key, value) -> value, Named.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code filter} processor.
     */
    @Test
    public void shouldUseSpecifiedNameForFilterOperation() {
        builder.stream(STREAM_TOPIC)
            .filter((key, value) -> true, Named.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code foreach} processor.
     */
    @Test
    public void shouldUseSpecifiedNameForForEachOperation() {
        builder.stream(STREAM_TOPIC)
            .foreach((key, value) -> {}, Named.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code transform}
     * processor. The supplier returns {@code null}; only the topology is inspected.
     */
    @Test
    public void shouldUseSpecifiedNameForTransform() {
        builder.stream(STREAM_TOPIC)
            .transform(() -> null, Named.as(STREAM_OPERATION_NAME), new String[0]);
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code transformValues}
     * processor when a {@code ValueTransformerSupplier} is passed. The supplier returns
     * {@code null}; only the topology is inspected.
     */
    @Test
    public void shouldUseSpecifiedNameForTransformValues() {
        // The cast on the lambda result pins the ValueTransformerSupplier overload. A bare
        // "() -> null" fits the ValueTransformerWithKeySupplier overload equally well, which
        // makes the call ambiguous.
        this.builder.stream(STREAM_TOPIC).transformValues(
            () -> (org.apache.kafka.streams.kstream.ValueTransformer) null,
            Named.as((String)STREAM_OPERATION_NAME),
            new String[0]);
        this.builder.build();
        ProcessorTopology topology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig((Map)this.props)).build();
        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code transformValues}
     * processor when a {@code ValueTransformerWithKeySupplier} is passed. The supplier
     * returns {@code null}; only the topology is inspected.
     */
    @Test
    public void shouldUseSpecifiedNameForTransformValuesWithKey() {
        // The cast on the lambda result selects the key-aware overload. Without it this test
        // would be identical to the value-only variant and the call would be ambiguous
        // between the two supplier types.
        this.builder.stream(STREAM_TOPIC).transformValues(
            () -> (org.apache.kafka.streams.kstream.ValueTransformerWithKey) null,
            Named.as((String)STREAM_OPERATION_NAME),
            new String[0]);
        this.builder.build();
        ProcessorTopology topology = this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig((Map)this.props)).build();
        StreamsBuilderTest.assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Named} must be used for the {@code branch}
     * processor, and each predicate child must be named
     * {@code <name>-predicate-<index>}.
     */
    @Test
    public void shouldUseSpecifiedNameForBranchOperation() {
        builder.stream(STREAM_TOPIC).branch(
            Named.as("branch-processor"),
            new Predicate[]{(key, value) -> true, (key, value) -> false});
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "branch-processor",
            "branch-processor-predicate-0",
            "branch-processor-predicate-1");
    }

    /**
     * The name supplied through {@link Joined} must be used for the processor of an
     * inner join between a stream and a table.
     */
    @Test
    public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKTable() {
        KStream stream = builder.stream(STREAM_TOPIC);
        KTable table = builder.table(TABLE_TOPIC);
        stream.join(
            table,
            (streamValue, tableValue) -> streamValue,
            Joined.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000002",
            "KTABLE-SOURCE-0000000003",
            STREAM_OPERATION_NAME);
    }

    /**
     * The name supplied through {@link Joined} must be used for the processor of a
     * left join between a stream and a table.
     */
    @Test
    public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKTable() {
        KStream stream = builder.stream(STREAM_TOPIC);
        KTable table = builder.table(STREAM_TOPIC_TWO);
        stream.leftJoin(
            table,
            (streamValue, tableValue) -> streamValue,
            Joined.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000002",
            "KTABLE-SOURCE-0000000003",
            STREAM_OPERATION_NAME);
    }

    /**
     * For a stream-stream left join configured through {@link StreamJoined}, both the
     * state store names and the processor names must be derived from the supplied name.
     */
    @Test
    public void shouldUseSpecifiedNameForLeftJoinOperationBetweenKStreamAndKStream() {
        KStream left = builder.stream(STREAM_TOPIC);
        KStream right = builder.stream(STREAM_TOPIC_TWO);
        left.leftJoin(
            right,
            (leftValue, rightValue) -> leftValue,
            JoinWindows.of(Duration.ofHours(1L)),
            StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForStateStore(
            topology.stateStores(),
            "stream-operation-this-join-store",
            "stream-operation-outer-other-join-store");
        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000001",
            "stream-operation-this-windowed",
            "stream-operation-other-windowed",
            "stream-operation-this-join",
            "stream-operation-outer-other-join",
            "stream-operation-merge");
    }

    /**
     * For a stream-stream left join named through {@link Joined}, the processors must use
     * the supplied name while the state stores keep generated names.
     */
    @Test
    @Deprecated
    public void shouldUseGeneratedStoreNamesForLeftJoinOperationBetweenKStreamAndKStream() {
        KStream left = builder.stream(STREAM_TOPIC);
        KStream right = builder.stream(STREAM_TOPIC_TWO);
        left.leftJoin(
            right,
            (leftValue, rightValue) -> leftValue,
            JoinWindows.of(Duration.ofHours(1L)),
            Joined.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForStateStore(
            topology.stateStores(),
            "KSTREAM-JOINTHIS-0000000004-store",
            "KSTREAM-OUTEROTHER-0000000005-store");
        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000001",
            "stream-operation-this-windowed",
            "stream-operation-other-windowed",
            "stream-operation-this-join",
            "stream-operation-outer-other-join",
            "stream-operation-merge");
    }

    /**
     * For a stream-stream inner join configured through {@link StreamJoined}, both the
     * state store names and the processor names must be derived from the supplied name.
     */
    @Test
    public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKStream() {
        KStream left = builder.stream(STREAM_TOPIC);
        KStream right = builder.stream(STREAM_TOPIC_TWO);
        left.join(
            right,
            (leftValue, rightValue) -> leftValue,
            JoinWindows.of(Duration.ofHours(1L)),
            StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForStateStore(
            topology.stateStores(),
            "stream-operation-this-join-store",
            "stream-operation-other-join-store");
        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000001",
            "stream-operation-this-windowed",
            "stream-operation-other-windowed",
            "stream-operation-this-join",
            "stream-operation-other-join",
            "stream-operation-merge");
    }

    /**
     * For a stream-stream inner join named through {@link Joined}, the processors must use
     * the supplied name while the state stores keep generated names.
     */
    @Test
    @Deprecated
    public void shouldUseGeneratedNameForJoinOperationBetweenKStreamAndKStream() {
        KStream left = builder.stream(STREAM_TOPIC);
        KStream right = builder.stream(STREAM_TOPIC_TWO);
        left.join(
            right,
            (leftValue, rightValue) -> leftValue,
            JoinWindows.of(Duration.ofHours(1L)),
            Joined.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForStateStore(
            topology.stateStores(),
            "KSTREAM-JOINTHIS-0000000004-store",
            "KSTREAM-JOINOTHER-0000000005-store");
        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000001",
            "stream-operation-this-windowed",
            "stream-operation-other-windowed",
            "stream-operation-this-join",
            "stream-operation-other-join",
            "stream-operation-merge");
    }

    /**
     * For a stream-stream outer join configured through {@link StreamJoined}, both the
     * state store names and the processor names must be derived from the supplied name.
     */
    @Test
    public void shouldUseSpecifiedNameForOuterJoinOperationBetweenKStreamAndKStream() {
        KStream left = builder.stream(STREAM_TOPIC);
        KStream right = builder.stream(STREAM_TOPIC_TWO);
        left.outerJoin(
            right,
            (leftValue, rightValue) -> leftValue,
            JoinWindows.of(Duration.ofHours(1L)),
            StreamJoined.as(STREAM_OPERATION_NAME).withName(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForStateStore(
            topology.stateStores(),
            "stream-operation-outer-this-join-store",
            "stream-operation-outer-other-join-store");
        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000001",
            "stream-operation-this-windowed",
            "stream-operation-other-windowed",
            "stream-operation-outer-this-join",
            "stream-operation-outer-other-join",
            "stream-operation-merge");
    }

    /**
     * For a stream-stream outer join named through {@link Joined}, the processors must use
     * the supplied name while the state stores keep generated names.
     */
    @Test
    @Deprecated
    public void shouldUseGeneratedStoreNamesForOuterJoinOperationBetweenKStreamAndKStream() {
        KStream left = builder.stream(STREAM_TOPIC);
        KStream right = builder.stream(STREAM_TOPIC_TWO);
        left.outerJoin(
            right,
            (leftValue, rightValue) -> leftValue,
            JoinWindows.of(Duration.ofHours(1L)),
            Joined.as(STREAM_OPERATION_NAME));
        builder.build();

        StreamsConfig config = new StreamsConfig((Map) props);
        ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(config).build();

        StreamsBuilderTest.assertNamesForStateStore(
            topology.stateStores(),
            "KSTREAM-OUTERTHIS-0000000004-store",
            "KSTREAM-OUTEROTHER-0000000005-store");
        StreamsBuilderTest.assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000001",
            "stream-operation-this-windowed",
            "stream-operation-other-windowed",
            "stream-operation-outer-this-join",
            "stream-operation-outer-other-join",
            "stream-operation-merge");
    }

    /**
     * Verifies that merging two streams with {@code Named.as("merge-processor")} gives the
     * merge processor that name, while the two source processors keep generated names.
     */
    @Test
    public void shouldUseSpecifiedNameForMergeOperation() {
        final String topic1 = "topic-1";
        final String topic2 = "topic-2";

        // The type arguments are compile-time only: this test inspects the built topology
        // and never pipes records through it.
        final KStream<String, String> source1 = builder.stream(topic1);
        final KStream<String, String> source2 = builder.stream(topic2);
        source1.merge(source2, Named.as("merge-processor"));
        builder.build();

        final ProcessorTopology topology =
            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();

        assertNamesForOperation(
            topology,
            "KSTREAM-SOURCE-0000000000",
            "KSTREAM-SOURCE-0000000001",
            "merge-processor");
    }

    /**
     * Checks that a {@code process(...)} step named via {@code Named.as("test-processor")}
     * shows up under that name, right after the generated source processor.
     */
    @Test
    public void shouldUseSpecifiedNameForProcessOperation() {
        builder.stream(STREAM_TOPIC)
            .process(() -> null, Named.as("test-processor"));
        builder.build();

        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final ProcessorTopology processorTopology =
            builder.internalTopologyBuilder.rewriteTopology(streamsConfig).build();

        final String[] expectedProcessors = {"KSTREAM-SOURCE-0000000000", "test-processor"};
        assertNamesForOperation(processorTopology, expectedProcessors);
    }

    /**
     * Checks that a {@code print(...)} step configured with
     * {@code Printed.toSysOut().withName("print-processor")} shows up under that name,
     * right after the generated source processor.
     */
    @Test
    public void shouldUseSpecifiedNameForPrintOperation() {
        builder.stream(STREAM_TOPIC)
            .print(Printed.toSysOut().withName("print-processor"));
        builder.build();

        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final ProcessorTopology processorTopology =
            builder.internalTopologyBuilder.rewriteTopology(streamsConfig).build();

        final String[] expectedProcessors = {"KSTREAM-SOURCE-0000000000", "print-processor"};
        assertNamesForOperation(processorTopology, expectedProcessors);
    }

    /**
     * Verifies that {@code flatTransformValues(...)} called with a
     * {@code ValueTransformerSupplier} and {@code Named.as(STREAM_OPERATION_NAME)} names the
     * resulting processor {@code STREAM_OPERATION_NAME}.
     */
    @Test
    public void shouldUseSpecifiedNameForFlatTransformValueOperation() {
        // The cast pins the ValueTransformerSupplier overload: a bare "() -> null" also
        // matches the ValueTransformerWithKeySupplier overload and is therefore ambiguous.
        builder.stream(STREAM_TOPIC).flatTransformValues(
            (org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) () -> null,
            Named.as(STREAM_OPERATION_NAME));
        builder.build();

        final ProcessorTopology topology =
            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();

        assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * Verifies that {@code flatTransformValues(...)} called with a
     * {@code ValueTransformerWithKeySupplier} and {@code Named.as(STREAM_OPERATION_NAME)}
     * names the resulting processor {@code STREAM_OPERATION_NAME}.
     */
    @Test
    public void shouldUseSpecifiedNameForFlatTransformValueWithKeyOperation() {
        // The cast pins the with-key overload. Without it, "() -> null" is ambiguous between
        // the ValueTransformerSupplier and ValueTransformerWithKeySupplier overloads, and
        // this test would not specifically target the variant its name refers to.
        builder.stream(STREAM_TOPIC).flatTransformValues(
            (org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier<Object, Object, Iterable<Object>>) () -> null,
            Named.as(STREAM_OPERATION_NAME));
        builder.build();

        final ProcessorTopology topology =
            builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();

        assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
    }

    /**
     * Checks that {@code toStream(Named.as("to-stream"))} on a table produces a processor
     * named {@code "to-stream"} after the generated stream-source and table-source processors.
     */
    @Test
    public void shouldUseSpecifiedNameForToStream() {
        builder.table(STREAM_TOPIC)
            .toStream(Named.as("to-stream"));
        builder.build();

        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final ProcessorTopology processorTopology =
            builder.internalTopologyBuilder.rewriteTopology(streamsConfig).build();

        final String[] expectedProcessors = {
            "KSTREAM-SOURCE-0000000001",
            "KTABLE-SOURCE-0000000002",
            "to-stream"
        };
        assertNamesForOperation(processorTopology, expectedProcessors);
    }

    /**
     * Checks that {@code toStream(mapper, Named.as("to-stream"))} on a table produces a
     * processor named {@code "to-stream"}, followed by a key-select processor that is
     * expected to keep a generated name.
     */
    @Test
    public void shouldUseSpecifiedNameForToStreamWithMapper() {
        builder.table(STREAM_TOPIC)
            .toStream(KeyValue::pair, Named.as("to-stream"));
        builder.build();

        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final ProcessorTopology processorTopology =
            builder.internalTopologyBuilder.rewriteTopology(streamsConfig).build();

        final String[] expectedProcessors = {
            "KSTREAM-SOURCE-0000000001",
            "KTABLE-SOURCE-0000000002",
            "to-stream",
            "KSTREAM-KEY-SELECT-0000000004"
        };
        assertNamesForOperation(processorTopology, expectedProcessors);
    }

    /**
     * Checks naming for a table {@code groupBy(...).count(...)} pipeline: the grouping
     * processor takes the {@code Grouped} name, the count step and its
     * {@code -sink}/{@code -source} processors take {@code STREAM_OPERATION_NAME}, and both
     * state stores are expected to keep generated names.
     */
    @Test
    public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
        builder.table(STREAM_TOPIC)
            .groupBy(KeyValue::pair, Grouped.as("group-operation"))
            .count(Named.as(STREAM_OPERATION_NAME));
        builder.build();

        final StreamsConfig streamsConfig = new StreamsConfig(props);
        final ProcessorTopology processorTopology =
            builder.internalTopologyBuilder.rewriteTopology(streamsConfig).build();

        final String[] expectedStores = {
            "stream-topic-STATE-STORE-0000000000",
            "KTABLE-AGGREGATE-STATE-STORE-0000000004"
        };
        assertNamesForStateStore(processorTopology.stateStores(), expectedStores);

        final String[] expectedProcessors = {
            "KSTREAM-SOURCE-0000000001",
            "KTABLE-SOURCE-0000000002",
            "group-operation",
            "stream-operation-sink",
            "stream-operation-source",
            STREAM_OPERATION_NAME
        };
        assertNamesForOperation(processorTopology, expectedProcessors);
    }

    /**
     * Asserts that the processors of {@code topology} have exactly the given names, in order.
     *
     * @param topology topology whose {@code processors()} list is inspected
     * @param expected expected processor names, compared position by position
     */
    private static void assertNamesForOperation(ProcessorTopology topology, String ... expected) {
        final List nodes = topology.processors();
        Assert.assertEquals("Invalid number of expected processors", (long) expected.length, (long) nodes.size());

        int position = 0;
        for (final String expectedName : expected) {
            final ProcessorNode node = (ProcessorNode) nodes.get(position);
            Assert.assertEquals(expectedName, node.name());
            position++;
        }
    }

    /**
     * Asserts that {@code stores} contains exactly the given store names, in order.
     *
     * @param stores   state stores to inspect
     * @param expected expected store names, compared position by position
     */
    private static void assertNamesForStateStore(List<StateStore> stores, String ... expected) {
        Assert.assertEquals("Invalid number of expected state stores", (long) expected.length, (long) stores.size());

        int position = 0;
        for (final String expectedName : expected) {
            final StateStore store = stores.get(position);
            Assert.assertEquals(expectedName, store.name());
            position++;
        }
    }
}

