/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KStreamAggregationDedupIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final long COMMIT_INTERVAL_MS = 300L;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private static volatile int testNo = 0;
    private KStreamBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String outputTopic;
    private KGroupedStream<String, String> groupedStream;
    private Reducer<String> reducer;
    private KStream<Integer, String> stream;

    public KStreamAggregationDedupIntegrationTest() {
        this.mockTime = KStreamAggregationDedupIntegrationTest.CLUSTER.time;
    }

    @Before
    public void before() throws InterruptedException {
        this.builder = new KStreamBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String applicationId = "kgrouped-stream-test-" + ++testNo;
        this.streamsConfiguration.put("application.id", applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("commit.interval.ms", (Object)300L);
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0xA00000L);
        this.streamsConfiguration.put("internal.leave.group.on.close", (Object)true);
        KeyValueMapper mapper = MockKeyValueMapper.SelectValueMapper();
        this.stream = this.builder.stream(Serdes.Integer(), Serdes.String(), new String[]{this.streamOneInput});
        this.groupedStream = this.stream.groupBy(mapper, Serdes.String(), Serdes.String());
        this.reducer = new Reducer<String>(){

            public String apply(String value1, String value2) {
                return value1 + ":" + value2;
            }
        };
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldReduce() throws Exception {
        this.produceMessages(System.currentTimeMillis());
        this.groupedStream.reduce(this.reducer, "reduce-by-key").to(Serdes.String(), Serdes.String(), this.outputTopic);
        this.startStreams();
        this.produceMessages(System.currentTimeMillis());
        List results = this.receiveMessages((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), 5);
        Collections.sort(results, new Comparator<KeyValue<String, String>>(){

            @Override
            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
                return KStreamAggregationDedupIntegrationTest.compare(o1, o2);
            }
        });
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair((Object)"A", (Object)"A:A"), KeyValue.pair((Object)"B", (Object)"B:B"), KeyValue.pair((Object)"C", (Object)"C:C"), KeyValue.pair((Object)"D", (Object)"D:D"), KeyValue.pair((Object)"E", (Object)"E:E"))));
    }

    private static <K extends Comparable, V extends Comparable> int compare(KeyValue<K, V> o1, KeyValue<K, V> o2) {
        int keyComparison = ((Comparable)o1.key).compareTo(o2.key);
        if (keyComparison == 0) {
            return ((Comparable)o1.value).compareTo(o2.value);
        }
        return keyComparison;
    }

    @Test
    public void shouldReduceWindowed() throws Exception {
        long firstBatchTimestamp = System.currentTimeMillis() - 1000L;
        this.produceMessages(firstBatchTimestamp);
        long secondBatchTimestamp = System.currentTimeMillis();
        this.produceMessages(secondBatchTimestamp);
        this.produceMessages(secondBatchTimestamp);
        this.groupedStream.reduce(this.reducer, (Windows)TimeWindows.of((long)500L), "reduce-time-windows").toStream((KeyValueMapper)new KeyValueMapper<Windowed<String>, String, String>(){

            public String apply(Windowed<String> windowedKey, String value) {
                return (String)windowedKey.key() + "@" + windowedKey.window().start();
            }
        }).to(Serdes.String(), Serdes.String(), this.outputTopic);
        this.startStreams();
        List windowedOutput = this.receiveMessages((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), 10);
        Comparator<KeyValue<String, String>> comparator = new Comparator<KeyValue<String, String>>(){

            @Override
            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
                return KStreamAggregationDedupIntegrationTest.compare(o1, o2);
            }
        };
        Collections.sort(windowedOutput, comparator);
        long firstBatchWindow = firstBatchTimestamp / 500L * 500L;
        long secondBatchWindow = secondBatchTimestamp / 500L * 500L;
        MatcherAssert.assertThat(windowedOutput, (Matcher)Is.is(Arrays.asList(new KeyValue((Object)("A@" + firstBatchWindow), (Object)"A"), new KeyValue((Object)("A@" + secondBatchWindow), (Object)"A:A"), new KeyValue((Object)("B@" + firstBatchWindow), (Object)"B"), new KeyValue((Object)("B@" + secondBatchWindow), (Object)"B:B"), new KeyValue((Object)("C@" + firstBatchWindow), (Object)"C"), new KeyValue((Object)("C@" + secondBatchWindow), (Object)"C:C"), new KeyValue((Object)("D@" + firstBatchWindow), (Object)"D"), new KeyValue((Object)("D@" + secondBatchWindow), (Object)"D:D"), new KeyValue((Object)("E@" + firstBatchWindow), (Object)"E"), new KeyValue((Object)("E@" + secondBatchWindow), (Object)"E:E"))));
    }

    @Test
    public void shouldGroupByKey() throws Exception {
        long timestamp = this.mockTime.milliseconds();
        this.produceMessages(timestamp);
        this.produceMessages(timestamp);
        this.stream.groupByKey(Serdes.Integer(), Serdes.String()).count((Windows)TimeWindows.of((long)500L), "count-windows").toStream((KeyValueMapper)new KeyValueMapper<Windowed<Integer>, Long, String>(){

            public String apply(Windowed<Integer> windowedKey, Long value) {
                return windowedKey.key() + "@" + windowedKey.window().start();
            }
        }).to(Serdes.String(), Serdes.Long(), this.outputTopic);
        this.startStreams();
        List results = this.receiveMessages((Deserializer)new StringDeserializer(), (Deserializer)new LongDeserializer(), 5);
        Collections.sort(results, new Comparator<KeyValue<String, Long>>(){

            @Override
            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
                return KStreamAggregationDedupIntegrationTest.compare(o1, o2);
            }
        });
        long window = timestamp / 500L * 500L;
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair((Object)("1@" + window), (Object)2L), KeyValue.pair((Object)("2@" + window), (Object)2L), KeyValue.pair((Object)("3@" + window), (Object)2L), KeyValue.pair((Object)("4@" + window), (Object)2L), KeyValue.pair((Object)("5@" + window), (Object)2L))));
    }

    private void produceMessages(long timestamp) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.streamOneInput, Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)2, (Object)"B"), new KeyValue((Object)3, (Object)"C"), new KeyValue((Object)4, (Object)"D"), new KeyValue((Object)5, (Object)"E")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), timestamp);
    }

    private void createTopics() throws InterruptedException {
        this.streamOneInput = "stream-one-" + testNo;
        this.outputTopic = "output-" + testNo;
        CLUSTER.createTopic(this.streamOneInput, 3, 1);
        CLUSTER.createTopic(this.outputTopic);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams((TopologyBuilder)this.builder, this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private <K, V> List<KeyValue<K, V>> receiveMessages(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int numMessages) throws InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "kgroupedstream-test-" + testNo);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, this.outputTopic, numMessages, 60000L);
    }
}

