/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.errors.UnknownTopologyException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NamedTopologyTest {
    private static final String UNKNOWN_TOPOLOGY = "not-a-real-topology";
    private static final String UNKNOWN_STORE = "not-a-real-store";
    private final Properties props = NamedTopologyTest.configProps();
    private final KafkaStreamsNamedTopologyWrapper streams = new KafkaStreamsNamedTopologyWrapper(this.props);
    private final NamedTopologyBuilder builder1 = this.streams.newNamedTopologyBuilder("topology-1");
    private final NamedTopologyBuilder builder2 = this.streams.newNamedTopologyBuilder("topology-2");
    private final NamedTopologyBuilder builder3 = this.streams.newNamedTopologyBuilder("topology-3");

    @Before
    public void setup() {
        this.builder1.stream("input-1");
        this.builder2.stream("input-2");
        this.builder3.stream("input-3");
    }

    @After
    public void cleanup() {
        this.streams.close();
    }

    private static Properties configProps() {
        Properties props = new Properties();
        props.put("application.id", "Named-Topology-App");
        props.put("bootstrap.servers", "localhost:2018");
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        return props;
    }

    @Test
    public void shouldThrowIllegalArgumentOnIllegalName() {
        Assert.assertThrows(IllegalArgumentException.class, () -> this.streams.newNamedTopologyBuilder("__not-allowed__"));
    }

    @Test
    public void shouldBuildSingleNamedTopology() {
        this.builder1.stream("stream-1").filter((k, v) -> !k.equals(v)).to("output-1");
        this.streams.start(this.builder1.build());
    }

    @Test
    public void shouldBuildMultipleIdenticalNamedTopologyWithRepartition() {
        this.builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-1");
        this.builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-2");
        this.builder3.stream("stream-3").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-3");
        this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build(), this.builder3.build()));
    }

    @Test
    public void shouldReturnTopologyByName() {
        NamedTopology topology1 = this.builder1.build();
        NamedTopology topology2 = this.builder2.build();
        NamedTopology topology3 = this.builder3.build();
        this.streams.start(Arrays.asList(topology1, topology2, topology3));
        MatcherAssert.assertThat(this.streams.getTopologyByName("topology-1").get(), (Matcher)CoreMatchers.equalTo((Object)topology1));
        MatcherAssert.assertThat(this.streams.getTopologyByName("topology-2").get(), (Matcher)CoreMatchers.equalTo((Object)topology2));
        MatcherAssert.assertThat(this.streams.getTopologyByName("topology-3").get(), (Matcher)CoreMatchers.equalTo((Object)topology3));
    }

    @Test
    public void shouldReturnEmptyWhenLookingUpNonExistentTopologyByName() {
        this.streams.start(this.builder1.build());
        MatcherAssert.assertThat((Object)this.streams.getTopologyByName("non-existent-topology").isPresent(), (Matcher)CoreMatchers.equalTo((Object)false));
    }

    @Test
    public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() {
        this.builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store")));
        this.builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store")));
        this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build()));
    }

    @Test
    public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception {
        this.builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store")));
        this.builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store")));
        this.streams.addNamedTopology(this.builder1.build()).all().get();
        this.streams.addNamedTopology(this.builder2.build()).all().get();
        this.streams.removeNamedTopology("topology-2").all().get();
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSameInputTopic() {
        this.builder1.stream("stream");
        this.builder2.stream("stream");
        Assert.assertThrows(TopologyException.class, () -> this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build())));
    }

    @Test
    public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicAfterStart() {
        this.builder1.stream("stream");
        this.builder2.stream("stream");
        this.streams.start();
        this.streams.addNamedTopology(this.builder1.build());
        ExecutionException exception = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)this.streams.addNamedTopology(this.builder2.build()).all().get();
        });
        MatcherAssert.assertThat(exception.getCause().getClass(), (Matcher)CoreMatchers.equalTo(TopologyException.class));
    }

    @Test
    public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicBeforeStart() {
        this.builder1.stream("stream");
        this.builder2.stream("stream");
        this.streams.addNamedTopology(this.builder1.build());
        ExecutionException exception = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> {
            Void cfr_ignored_0 = (Void)this.streams.addNamedTopology(this.builder2.build()).all().get();
        });
        MatcherAssert.assertThat(exception.getCause().getClass(), (Matcher)CoreMatchers.equalTo(TopologyException.class));
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateTableFromSameInputTopic() {
        this.builder1.table("table");
        this.builder2.table("table");
        Assert.assertThrows(TopologyException.class, () -> this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build())));
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamAndTableFromSameInputTopic() {
        this.builder1.stream("input");
        this.builder2.table("input");
        Assert.assertThrows(TopologyException.class, () -> this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build())));
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromOverlappingInputTopicCollection() {
        this.builder1.stream("stream");
        this.builder2.stream(Arrays.asList("unique-input", "stream"));
        Assert.assertThrows(TopologyException.class, () -> this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build())));
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSamePattern() {
        this.builder1.stream(Pattern.compile("some-regex"));
        this.builder2.stream(Pattern.compile("some-regex"));
        Assert.assertThrows(TopologyException.class, () -> this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build())));
    }

    @Test
    public void shouldThrowUnknownTopologyExceptionForAllLocalStorePartitionLags() {
        this.streams.addNamedTopology(this.builder1.build());
        this.streams.start();
        Assert.assertThrows(UnknownTopologyException.class, () -> this.streams.allLocalStorePartitionLagsForTopology(UNKNOWN_TOPOLOGY));
    }

    @Test
    public void shouldThrowUnknownTopologyExceptionForQueryMetadataForKey() {
        this.streams.addNamedTopology(this.builder1.build());
        this.streams.start();
        Assert.assertThrows(UnknownTopologyException.class, () -> this.streams.queryMetadataForKey("store", (Object)"A", (Serializer)new StringSerializer(), UNKNOWN_TOPOLOGY));
    }

    @Test
    public void shouldThrowUnknownStateStoreExceptionForQueryMetadataForKey() {
        this.streams.addNamedTopology(this.builder1.build());
        this.streams.start();
        Assert.assertThrows(UnknownStateStoreException.class, () -> this.streams.queryMetadataForKey(UNKNOWN_STORE, (Object)"A", (Serializer)new StringSerializer(), "topology-1"));
    }

    @Test
    public void shouldThrowUnknownTopologyExceptionForStreamsMetadataForStore() {
        this.streams.addNamedTopology(this.builder1.build());
        this.streams.start();
        Assert.assertThrows(UnknownTopologyException.class, () -> this.streams.streamsMetadataForStore("store", UNKNOWN_TOPOLOGY));
    }

    @Test
    public void shouldThrowUnknownStateStoreExceptionForStreamsMetadataForStore() {
        this.streams.addNamedTopology(this.builder1.build());
        this.streams.start();
        Assert.assertThrows(UnknownStateStoreException.class, () -> this.streams.streamsMetadataForStore(UNKNOWN_STORE, "topology-1"));
    }

    @Test
    public void shouldThrowUnknownTopologyExceptionForStore() {
        this.streams.addNamedTopology(this.builder1.build());
        this.streams.start();
        Assert.assertThrows(UnknownTopologyException.class, () -> {
            ReadOnlyKeyValueStore cfr_ignored_0 = (ReadOnlyKeyValueStore)this.streams.store(NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType((String)UNKNOWN_TOPOLOGY, (String)"store", (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
        });
    }

    @Test
    public void shouldThrowUnknownStateStoreExceptionForStore() {
        this.streams.addNamedTopology(this.builder1.build());
        this.streams.start();
        Assert.assertThrows(UnknownStateStoreException.class, () -> {
            ReadOnlyKeyValueStore cfr_ignored_0 = (ReadOnlyKeyValueStore)this.streams.store(NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType((String)"topology-1", (String)UNKNOWN_STORE, (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
        });
    }

    @Test
    public void shouldDescribeWithSingleNamedTopology() {
        this.builder1.stream("input").filter((k, v) -> !k.equals(v)).to("output");
        this.streams.start(this.builder1.build());
        MatcherAssert.assertThat((Object)this.streams.getFullTopologyDescription(), (Matcher)CoreMatchers.equalTo((Object)"Topology: topology-1:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-1])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output)\n      <-- KSTREAM-FILTER-0000000002\n\n"));
    }

    @Test
    public void shouldDescribeWithMultipleNamedTopologies() {
        this.builder1.stream("stream-1").filter((k, v) -> !k.equals(v)).to("output-1");
        this.builder2.stream("stream-2").filter((k, v) -> !k.equals(v)).to("output-2");
        this.builder3.stream("stream-3").filter((k, v) -> !k.equals(v)).to("output-3");
        this.streams.start(Arrays.asList(this.builder1.build(), this.builder2.build(), this.builder3.build()));
        MatcherAssert.assertThat((Object)this.streams.getFullTopologyDescription(), (Matcher)CoreMatchers.equalTo((Object)"Topology: topology-1:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-1])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-1])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output-1)\n      <-- KSTREAM-FILTER-0000000002\n\nTopology: topology-2:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-2])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-2])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output-2)\n      <-- KSTREAM-FILTER-0000000002\n\nTopology: topology-3:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-3])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-3])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output-3)\n      <-- KSTREAM-FILTER-0000000002\n\n"));
    }

    @Test
    public void shouldDescribeWithEmptyNamedTopology() {
        MatcherAssert.assertThat((Object)this.streams.getFullTopologyDescription(), (Matcher)CoreMatchers.equalTo((Object)""));
    }
}

