/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class FanoutIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final long COMMIT_INTERVAL_MS = 300L;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private static final String INPUT_TOPIC_A = "A";
    private static final String OUTPUT_TOPIC_B = "B";
    private static final String OUTPUT_TOPIC_C = "C";

    public FanoutIntegrationTest() {
        this.mockTime = FanoutIntegrationTest.CLUSTER.time;
    }

    @BeforeClass
    public static void startKafkaCluster() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_A, OUTPUT_TOPIC_B, OUTPUT_TOPIC_C);
    }

    @Test
    public void shouldFanoutTheInput() throws Exception {
        List<String> inputValues = Arrays.asList("Hello", "World");
        ArrayList<String> expectedValuesForB = new ArrayList<String>();
        ArrayList<String> expectedValuesForC = new ArrayList<String>();
        for (String input : inputValues) {
            expectedValuesForB.add(input.toUpperCase(Locale.getDefault()));
            expectedValuesForC.add(input.toLowerCase(Locale.getDefault()));
        }
        StreamsBuilder builder = new StreamsBuilder();
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "fanout-integration-test");
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("default.value.serde", Serdes.String().getClass().getName());
        streamsConfiguration.put("commit.interval.ms", (Object)300L);
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        KStream stream1 = builder.stream(INPUT_TOPIC_A);
        KStream stream2 = stream1.mapValues((ValueMapper)new ValueMapper<String, String>(){

            public String apply(String value) {
                return value.toUpperCase(Locale.getDefault());
            }
        });
        KStream stream3 = stream1.mapValues((ValueMapper)new ValueMapper<String, String>(){

            public String apply(String value) {
                return value.toLowerCase(Locale.getDefault());
            }
        });
        stream2.to(OUTPUT_TOPIC_B);
        stream3.to(OUTPUT_TOPIC_C);
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerConfig.put("acks", "all");
        producerConfig.put("retries", (Object)0);
        producerConfig.put("key.serializer", ByteArraySerializer.class);
        producerConfig.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig, (Time)this.mockTime);
        Properties consumerConfigB = new Properties();
        consumerConfigB.put("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfigB.put("group.id", "fanout-integration-test-standard-consumer-topicB");
        consumerConfigB.put("auto.offset.reset", "earliest");
        consumerConfigB.put("key.deserializer", ByteArrayDeserializer.class);
        consumerConfigB.put("value.deserializer", StringDeserializer.class);
        List actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB, OUTPUT_TOPIC_B, inputValues.size());
        Assert.assertThat(actualValuesForB, (Matcher)CoreMatchers.equalTo(expectedValuesForB));
        Properties consumerConfigC = new Properties();
        consumerConfigC.put("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfigC.put("group.id", "fanout-integration-test-standard-consumer-topicC");
        consumerConfigC.put("auto.offset.reset", "earliest");
        consumerConfigC.put("key.deserializer", ByteArrayDeserializer.class);
        consumerConfigC.put("value.deserializer", StringDeserializer.class);
        List actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC, OUTPUT_TOPIC_C, inputValues.size());
        streams.close();
        Assert.assertThat(actualValuesForC, (Matcher)CoreMatchers.equalTo(expectedValuesForC));
    }
}

