/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@link KGroupedTable} obtained from {@code KTable.groupBy(...)}.
 *
 * <p>Two kinds of tests live here:
 * <ul>
 *   <li>argument-validation tests that expect an exception for an invalid store name or a
 *       {@code null} argument passed to {@code aggregate}, {@code reduce} or {@code count};</li>
 *   <li>end-to-end tests that build a topology, feed it records through a
 *       {@link TopologyTestDriver} and check the reduced / counted / aggregated values.</li>
 * </ul>
 */
public class KGroupedTableImplTest {
    /** Store name that the validation tests expect to be rejected with {@link InvalidTopicException}. */
    private static final String INVALID_STORE_NAME = "~foo bar~";

    private final StreamsBuilder builder = new StreamsBuilder();
    private KGroupedTable<String, String> groupedTable;
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
    /** Source topic read by every topology-driven test in this class. */
    private final String topic = "input";

    /**
     * Creates the grouped table used by the argument-validation tests: a String/String table
     * read from topic {@code "blah"} and regrouped with {@code MockMapper.selectValueKeyValueMapper()}.
     */
    @Before
    public void before() {
        this.groupedTable = this.builder
            .table("blah", Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper());
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotAllowInvalidStoreNameOnAggregate() {
        this.groupedTable.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            MockAggregator.TOSTRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed(INVALID_STORE_NAME));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullInitializerOnAggregate() {
        this.groupedTable.aggregate(
            null,
            MockAggregator.TOSTRING_ADDER,
            MockAggregator.TOSTRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullAdderOnAggregate() {
        this.groupedTable.aggregate(
            MockInitializer.STRING_INIT,
            null,
            MockAggregator.TOSTRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSubtractorOnAggregate() {
        this.groupedTable.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            null,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullAdderOnReduce() {
        this.groupedTable.reduce(
            null,
            MockReducer.STRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldNotAllowNullSubtractorOnReduce() {
        this.groupedTable.reduce(
            MockReducer.STRING_ADDER,
            null,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = InvalidTopicException.class)
    public void shouldNotAllowInvalidStoreNameOnReduce() {
        this.groupedTable.reduce(
            MockReducer.STRING_ADDER,
            MockReducer.STRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed(INVALID_STORE_NAME));
    }

    /**
     * Builds a {@link Materialized} for the given store name with the key-value store type that
     * the table operations in this class accept.
     *
     * <p>Exists so call sites only have to spell out the key and value types; without explicit
     * type arguments a chained {@code withKeySerde(...)} call would not type-check.
     *
     * @param storeName name handed to {@code Materialized.as(String)}
     * @return the result of {@code Materialized.as(storeName)}, typed for the caller
     */
    private static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> storeNamed(final String storeName) {
        return Materialized.as(storeName);
    }

    /**
     * Returns the selector shared by the reduce tests: it keeps the key and replaces the value
     * with {@code value.intValue()}.
     */
    private static KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection() {
        return new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
            @Override
            public KeyValue<String, Integer> apply(final String key, final Number value) {
                return KeyValue.pair(key, value.intValue());
            }
        };
    }

    /**
     * Registers a {@code foreach} on the table's stream that records the most recent value seen
     * for every key.
     *
     * <p>Must be called before the topology is built, because it adds a node to the builder.
     *
     * @param inputKTable table whose updates should be captured
     * @return a live map that is filled as records flow through the topology
     */
    private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
        final Map<String, Integer> reducedResults = new HashMap<>();
        inputKTable.toStream().foreach(new ForeachAction<String, Integer>() {
            @Override
            public void apply(final String key, final Integer value) {
                reducedResults.put(key, value);
            }
        });
        return reducedResults;
    }

    /**
     * Pipes two rounds of Double-valued records for keys {@code "A"} and {@code "B"} into
     * {@code topic} and checks the captured results after each round.
     *
     * <p>After the first round (1.1, 2.2) the expected results are 1 and 2; after the later
     * updates ending in 5.7 and 6.2 they are 5 and 6, i.e. the integer part of each key's
     * latest value.
     *
     * @param reducedResults live map returned by {@link #getReducedResults(KTable)}
     * @param topic          topic the records are written to
     * @param driver         driver running the topology under test
     */
    private void assertReduced(final Map<String, Integer> reducedResults, final String topic, final TopologyTestDriver driver) {
        final ConsumerRecordFactory<String, Double> recordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());

        driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10L));
        driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10L));
        // Integer.valueOf keeps the call on assertEquals(Object, Object); a bare int literal
        // next to an Integer would be ambiguous with the (long, long) overload.
        Assert.assertEquals(Integer.valueOf(1), reducedResults.get("A"));
        Assert.assertEquals(Integer.valueOf(2), reducedResults.get("B"));

        driver.pipeInput(recordFactory.create(topic, "A", 2.6, 10L));
        driver.pipeInput(recordFactory.create(topic, "B", 1.3, 10L));
        driver.pipeInput(recordFactory.create(topic, "A", 5.7, 10L));
        driver.pipeInput(recordFactory.create(topic, "B", 6.2, 10L));
        Assert.assertEquals(Integer.valueOf(5), reducedResults.get("A"));
        Assert.assertEquals(Integer.valueOf(6), reducedResults.get("B"));
    }

    /** Reduce with an explicitly named result store; the name must be reported as queryable. */
    @Test
    public void shouldReduce() {
        final KTable<String, Integer> reduced = this.builder
            .table(
                this.topic,
                Consumed.with(Serdes.String(), Serdes.Double()),
                KGroupedTableImplTest.<String, Double>storeNamed("store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Double()))
            .groupBy(intProjection())
            .reduce(
                MockReducer.INTEGER_ADDER,
                MockReducer.INTEGER_SUBTRACTOR,
                KGroupedTableImplTest.<String, Integer>storeNamed("reduced"));

        final Map<String, Integer> results = this.getReducedResults(reduced);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            this.assertReduced(results, this.topic, driver);
            // JUnit convention: expected value first, actual value second.
            Assert.assertEquals("reduced", reduced.queryableStoreName());
        }
    }

    /** Reduce without a {@link Materialized}; no queryable store name is expected. */
    @Test
    public void shouldReduceWithInternalStoreName() {
        final KTable<String, Integer> reduced = this.builder
            .table(
                this.topic,
                Consumed.with(Serdes.String(), Serdes.Double()),
                KGroupedTableImplTest.<String, Double>storeNamed("store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Double()))
            .groupBy(intProjection())
            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);

        final Map<String, Integer> results = this.getReducedResults(reduced);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            this.assertReduced(results, this.topic, driver);
            Assert.assertNull(reduced.queryableStoreName());
        }
    }

    /** Reduce into the store {@code "reduce"} and read the final values back from that store. */
    @Test
    public void shouldReduceAndMaterializeResults() {
        final KTable<String, Integer> reduced = this.builder
            .table(this.topic, Consumed.with(Serdes.String(), Serdes.Double()))
            .groupBy(intProjection())
            .reduce(
                MockReducer.INTEGER_ADDER,
                MockReducer.INTEGER_SUBTRACTOR,
                KGroupedTableImplTest.<String, Integer>storeNamed("reduce")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Integer()));

        final Map<String, Integer> results = this.getReducedResults(reduced);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            this.assertReduced(results, this.topic, driver);
            final KeyValueStore<String, Integer> reduce = driver.getKeyValueStore("reduce");
            MatcherAssert.assertThat(reduce.get("A"), CoreMatchers.equalTo(5));
            MatcherAssert.assertThat(reduce.get("B"), CoreMatchers.equalTo(6));
        }
    }

    /** Count the records of {@link #processData} per regrouped key and read the store {@code "count"}. */
    @Test
    public void shouldCountAndMaterializeResults() {
        final KTable<String, String> table =
            this.builder.table(this.topic, Consumed.with(Serdes.String(), Serdes.String()));
        table
            .groupBy(
                MockMapper.<String, String>selectValueKeyValueMapper(),
                Serialized.with(Serdes.String(), Serdes.String()))
            .count(
                KGroupedTableImplTest.<String, Long>storeNamed("count")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));

        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            this.processData(this.topic, driver);
            final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
            MatcherAssert.assertThat(counts.get("1"), CoreMatchers.equalTo(3L));
            MatcherAssert.assertThat(counts.get("2"), CoreMatchers.equalTo(2L));
        }
    }

    /** Aggregate the records of {@link #processData} and read the store {@code "aggregate"}. */
    @Test
    public void shouldAggregateAndMaterializeResults() {
        final KTable<String, String> table =
            this.builder.table(this.topic, Consumed.with(Serdes.String(), Serdes.String()));
        table
            .groupBy(
                MockMapper.<String, String>selectValueKeyValueMapper(),
                Serialized.with(Serdes.String(), Serdes.String()))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                MockAggregator.TOSTRING_REMOVER,
                KGroupedTableImplTest.<String, String>storeNamed("aggregate")
                    .withValueSerde(Serdes.String())
                    .withKeySerde(Serdes.String()));

        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            this.processData(this.topic, driver);
            final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
            MatcherAssert.assertThat(aggregate.get("1"), CoreMatchers.equalTo("0+1+1+1"));
            MatcherAssert.assertThat(aggregate.get("2"), CoreMatchers.equalTo("0+2+2"));
        }
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointOnCountWhenMaterializedIsNull() {
        // A typed null keeps overload resolution on the Materialized variant.
        final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> noMaterialized = null;
        this.groupedTable.count(noMaterialized);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> noMaterialized = null;
        this.groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, noMaterialized);
    }

    // The six tests below make the same calls as the shouldNotAllowNull... tests above.
    // They are kept so the set of test names in this class stays unchanged.

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnReduceWhenAdderIsNull() {
        this.groupedTable.reduce(
            null,
            MockReducer.STRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnReduceWhenSubtractorIsNull() {
        this.groupedTable.reduce(
            MockReducer.STRING_ADDER,
            null,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenInitializerIsNull() {
        this.groupedTable.aggregate(
            null,
            MockAggregator.TOSTRING_ADDER,
            MockAggregator.TOSTRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenAdderIsNull() {
        this.groupedTable.aggregate(
            MockInitializer.STRING_INIT,
            null,
            MockAggregator.TOSTRING_REMOVER,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenSubtractorIsNull() {
        this.groupedTable.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            null,
            KGroupedTableImplTest.<String, String>storeNamed("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> noMaterialized = null;
        this.groupedTable.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            MockAggregator.TOSTRING_REMOVER,
            noMaterialized);
    }

    /**
     * Pipes five String/String records into {@code topic}: keys A, B and C carry the value
     * {@code "1"}, keys D and E carry the value {@code "2"}.
     *
     * @param topic  topic the records are written to
     * @param driver driver running the topology under test
     */
    private void processData(final String topic, final TopologyTestDriver driver) {
        final ConsumerRecordFactory<String, String> recordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
        driver.pipeInput(recordFactory.create(topic, "A", "1"));
        driver.pipeInput(recordFactory.create(topic, "B", "1"));
        driver.pipeInput(recordFactory.create(topic, "C", "1"));
        driver.pipeInput(recordFactory.create(topic, "D", "2"));
        driver.pipeInput(recordFactory.create(topic, "E", "2"));
    }
}

