/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class RegexSourceIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private static final String TOPIC_1 = "topic-1";
    private static final String TOPIC_2 = "topic-2";
    private static final String TOPIC_A = "topic-A";
    private static final String TOPIC_C = "topic-C";
    private static final String TOPIC_Y = "topic-Y";
    private static final String TOPIC_Z = "topic-Z";
    private static final String FA_TOPIC = "fa";
    private static final String FOO_TOPIC = "foo";
    private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
    private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
    private Properties streamsConfiguration;
    private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
    private KafkaStreams streams;
    private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0);
    private String outputTopic;

    public RegexSourceIntegrationTest() {
        this.mockTime = RegexSourceIntegrationTest.CLUSTER.time;
    }

    @BeforeAll
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopics(TOPIC_1, TOPIC_2, TOPIC_A, TOPIC_C, TOPIC_Y, TOPIC_Z, FA_TOPIC, FOO_TOPIC);
        CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
        CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void setUp(TestInfo testInfo) throws InterruptedException {
        this.outputTopic = this.createTopic(topicSuffixGenerator.incrementAndGet());
        Properties properties = new Properties();
        properties.put("statestore.cache.max.bytes", (Object)0);
        properties.put("commit.interval.ms", (Object)100L);
        properties.put("metadata.max.age.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("max.task.idle.ms", (Object)0L);
        properties.put("session.timeout.ms", (Object)10000);
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig(IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, testInfo), CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, properties);
    }

    @AfterEach
    public void tearDown() throws IOException {
        if (this.streams != null) {
            this.streams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRegexMatchesTopicsAWhenCreated() throws Exception {
        try {
            Serde stringSerde = Serdes.String();
            List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
            List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
            CLUSTER.createTopic("TEST-TOPIC-1");
            StreamsBuilder builder = new StreamsBuilder();
            KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
            pattern1Stream.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
            final CopyOnWriteArrayList assignedTopics = new CopyOnWriteArrayList();
            this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(assignedTopics, listener));
                        }
                    };
                }
            });
            this.streams.start();
            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), (String)STREAM_TASKS_NOT_UPDATED);
            CLUSTER.createTopic("TEST-TOPIC-2");
            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), (String)STREAM_TASKS_NOT_UPDATED);
            this.streams.close();
        }
        catch (Throwable throwable) {
            CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
            throw throwable;
        }
        CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception {
        String topic1 = "TEST-TOPIC-1";
        String topic2 = "TEST-TOPIC-2";
        try {
            CLUSTER.createTopic("TEST-TOPIC-1");
            StreamsBuilder builder = new StreamsBuilder();
            KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
            KStream otherStream = builder.stream(Pattern.compile("not-a-match"));
            pattern1Stream.selectKey((k, v) -> k).groupByKey().aggregate(() -> "", (k, v, a) -> v).toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
            Topology topology = builder.build();
            MatcherAssert.assertThat((Object)topology.describe().subtopologies().size(), (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(1)));
            this.streams = new KafkaStreams(topology, this.streamsConfiguration);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(this.streams);
            CLUSTER.createTopic("TEST-TOPIC-2");
            KeyValue record1 = new KeyValue((Object)"1", (Object)"1");
            KeyValue record2 = new KeyValue((Object)"2", (Object)"2");
            IntegrationTestUtils.produceKeyValuesSynchronously("TEST-TOPIC-1", Collections.singletonList(record1), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), (Time)RegexSourceIntegrationTest.CLUSTER.time);
            IntegrationTestUtils.produceKeyValuesSynchronously("TEST-TOPIC-2", Collections.singletonList(record2), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), (Time)RegexSourceIntegrationTest.CLUSTER.time);
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class), this.outputTopic, Arrays.asList(record1, record2));
            this.streams.close();
        }
        catch (Throwable throwable) {
            CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
            throw throwable;
        }
        CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
    }

    private String createTopic(int suffix) throws InterruptedException {
        String outputTopic = "outputTopic_" + suffix;
        CLUSTER.createTopic(outputTopic);
        return outputTopic;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
        Serde stringSerde = Serdes.String();
        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
        List<String> expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B");
        final CopyOnWriteArrayList assignedTopics = new CopyOnWriteArrayList();
        try {
            CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
            StreamsBuilder builder = new StreamsBuilder();
            KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
            pattern1Stream.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
            this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(assignedTopics, listener));
                        }
                    };
                }
            });
            this.streams.start();
            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), (String)STREAM_TASKS_NOT_UPDATED);
        }
        finally {
            CLUSTER.deleteTopic("TEST-TOPIC-A");
        }
        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), (String)STREAM_TASKS_NOT_UPDATED);
    }

    @Test
    public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
        MockKeyValueStoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
        long thirtySecondTimeout = 30000L;
        TopologyWrapper topology = new TopologyWrapper();
        topology.addSource("ingest", Pattern.compile("topic-\\d+"));
        topology.addProcessor("my-processor", new MockApiProcessorSupplier(), new String[]{"ingest"});
        topology.addStateStore((StoreBuilder)storeBuilder, new String[]{"my-processor"});
        this.streams = new KafkaStreams((Topology)topology, this.streamsConfiguration);
        this.streams.start();
        TestCondition stateStoreNameBoundToSourceTopic = () -> {
            Map stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToFullSourceTopicNames();
            List topicNamesList = (List)stateStoreToSourceTopic.get("testStateStore");
            return topicNamesList != null && !topicNamesList.isEmpty() && ((String)topicNamesList.get(0)).equals(TOPIC_1);
        };
        TestUtils.waitForCondition((TestCondition)stateStoreNameBoundToSourceTopic, (long)30000L, (String)"Did not find topic: [topic-1] connected to state store: [testStateStore]");
    }

    @Test
    public void testShouldReadFromRegexAndNamedTopics() throws Exception {
        String topic1TestMessage = "topic-1 test";
        String topic2TestMessage = "topic-2 test";
        String topicATestMessage = "topic-A test";
        String topicCTestMessage = "topic-C test";
        String topicYTestMessage = "topic-Y test";
        String topicZTestMessage = "topic-Z test";
        Serde stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        KStream pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
        KStream pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
        KStream namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z));
        pattern1Stream.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        pattern2Stream.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        namedTopicsStream.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.streams.start();
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singleton("topic-1 test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singleton("topic-2 test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singleton("topic-A test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singleton("topic-C test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singleton("topic-Y test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singleton("topic-Z test"), producerConfig, (Time)this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        List<String> expectedReceivedValues = Arrays.asList("topic-A test", "topic-1 test", "topic-2 test", "topic-C test", "topic-Y test", "topic-Z test");
        List receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, this.outputTopic, 6);
        ArrayList<Object> actualValues = new ArrayList<Object>(6);
        for (KeyValue receivedKeyValue : receivedKeyValues) {
            actualValues.add(receivedKeyValue.value);
        }
        Collections.sort(actualValues);
        Collections.sort(expectedReceivedValues);
        MatcherAssert.assertThat(actualValues, (Matcher)CoreMatchers.equalTo(expectedReceivedValues));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
        KafkaStreams partitionedStreamsLeader = null;
        KafkaStreams partitionedStreamsFollower = null;
        try {
            Serde stringSerde = Serdes.String();
            StreamsBuilder builderLeader = new StreamsBuilder();
            StreamsBuilder builderFollower = new StreamsBuilder();
            List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
            KStream partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d"));
            KStream partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d"));
            partitionedStreamLeader.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
            partitionedStreamFollower.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
            final CopyOnWriteArrayList leaderAssignment = new CopyOnWriteArrayList();
            final CopyOnWriteArrayList followerAssignment = new CopyOnWriteArrayList();
            partitionedStreamsLeader = new KafkaStreams(builderLeader.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(leaderAssignment, listener));
                        }
                    };
                }
            });
            partitionedStreamsFollower = new KafkaStreams(builderFollower.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(followerAssignment, listener));
                        }
                    };
                }
            });
            partitionedStreamsLeader.start();
            partitionedStreamsFollower.start();
            TestUtils.waitForCondition(() -> followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment), (String)"topic assignment not completed");
        }
        finally {
            if (partitionedStreamsLeader != null) {
                partitionedStreamsLeader.close();
            }
            if (partitionedStreamsFollower != null) {
                partitionedStreamsFollower.close();
            }
        }
    }

    @Test
    public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
        String fMessage = "fMessage";
        String fooMessage = "fooMessage";
        Serde stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        KStream pattern1Stream = builder.stream(Pattern.compile("foo.*"));
        KStream pattern2Stream = builder.stream(Pattern.compile("f.*"));
        pattern1Stream.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        pattern2Stream.to(this.outputTopic, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        AtomicBoolean expectError = new AtomicBoolean(false);
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                expectError.set(true);
            }
        });
        this.streams.setUncaughtExceptionHandler(e -> {
            expectError.set(true);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        this.streams.start();
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Collections.singleton("fMessage"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Collections.singleton("fooMessage"), producerConfig, (Time)this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        try {
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, this.outputTopic, 2, 5000L);
            throw new IllegalStateException("This should not happen: an assertion error should have been thrown before this.");
        }
        catch (AssertionError assertionError) {
            MatcherAssert.assertThat((Object)expectError.get(), (Matcher)CoreMatchers.is((Object)true));
            return;
        }
    }

    private static class TheConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private final List<String> assignedTopics;
        private final ConsumerRebalanceListener listener;

        TheConsumerRebalanceListener(List<String> assignedTopics, ConsumerRebalanceListener listener) {
            this.assignedTopics = assignedTopics;
            this.listener = listener;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                this.assignedTopics.remove(partition.topic());
            }
            this.listener.onPartitionsRevoked(partitions);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                this.assignedTopics.add(partition.topic());
            }
            Collections.sort(this.assignedTopics);
            this.listener.onPartitionsAssigned(partitions);
        }
    }
}

