/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests for grouping a {@link KTable} and then aggregating, counting or reducing it.
 *
 * <p>Each test builds a topology with a {@link StreamsBuilder}, runs it through the
 * {@link KStreamTestDriver} rule and compares the records captured downstream with an
 * expected list. Captured records are rendered as {@code "key:value"} strings.
 */
public class KTableAggregateTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
    private final Serialized<String, String> stringSerialized = Serialized.with(stringSerde, stringSerde);
    private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
    private File stateDir = null;

    // NOTE(review): this rule field is never assigned anywhere in this class, so it is always
    // null. It is kept because it is public; presumably JUnit skips a null rule value - confirm,
    // and consider deleting the field.
    @Rule
    public EmbeddedKafkaCluster cluster = null;
    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /** Creates a fresh temporary state directory before every test. */
    @Before
    public void setUp() {
        stateDir = TestUtils.tempDirectory("kafka-test");
    }

    /**
     * Flushes after every input record and expects every intermediate aggregate to be
     * forwarded, including the "remove old value, add new value" steps for updated keys.
     */
    @Test
    public void testAggBasic() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, String> table1 = builder.table(topic1, consumed);
        final KTable<String, String> table2 = table1
            .groupBy(MockMapper.<String, String>noOpKeyValueMapper(), stringSerialized)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                MockAggregator.TOSTRING_REMOVER,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic1-Canonized")
                    .withValueSerde(stringSerde));
        table2.toStream().process(supplier);

        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());

        driver.process(topic1, "A", "1");
        driver.flushState();
        driver.process(topic1, "B", "2");
        driver.flushState();
        driver.process(topic1, "A", "3");
        driver.flushState();
        driver.process(topic1, "B", "4");
        driver.flushState();
        driver.process(topic1, "C", "5");
        driver.flushState();
        driver.process(topic1, "D", "6");
        driver.flushState();
        driver.process(topic1, "B", "7");
        driver.flushState();
        driver.process(topic1, "C", "8");
        driver.flushState();

        Assert.assertEquals(
            Utils.mkList(
                "A:0+1",
                "B:0+2",
                "A:0+1-1+3",
                "B:0+2-2+4",
                "C:0+5",
                "D:0+6",
                "B:0+2-2+4-4+7",
                "C:0+5-5+8"),
            supplier.theCapturedProcessor().processed);
    }

    /**
     * Sends three updates for the same key before a single flush and expects only one
     * forwarded result, reflecting the last value.
     */
    @Test
    public void testAggCoalesced() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, String> table1 = builder.table(topic1, consumed);
        final KTable<String, String> table2 = table1
            .groupBy(MockMapper.<String, String>noOpKeyValueMapper(), stringSerialized)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                MockAggregator.TOSTRING_REMOVER,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic1-Canonized")
                    .withValueSerde(stringSerde));
        table2.toStream().process(supplier);

        driver.setUp(builder, stateDir);

        driver.process(topic1, "A", "1");
        driver.process(topic1, "A", "3");
        driver.process(topic1, "A", "4");
        driver.flushState();

        Assert.assertEquals(Utils.mkList("A:0+4"), supplier.theCapturedProcessor().processed);
    }

    /**
     * Re-keys each record by its value before aggregating. The mapper returns a pair with a
     * {@code null} key for input key {@code "null"} and no pair at all for input key
     * {@code "NULL"}; the expected output contains no entry for either of those inputs.
     */
    @Test
    public void testAggRepartition() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, String> table1 = builder.table(topic1, consumed);
        final KTable<String, String> table2 = table1
            .groupBy(
                new KeyValueMapper<String, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(final String key, final String value) {
                        switch (key) {
                            case "null":
                                return KeyValue.pair(null, value);
                            case "NULL":
                                return null;
                            default:
                                return KeyValue.pair(value, value);
                        }
                    }
                },
                stringSerialized)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                MockAggregator.TOSTRING_REMOVER,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic1-Canonized")
                    .withValueSerde(stringSerde));
        table2.toStream().process(supplier);

        driver.setUp(builder, stateDir);

        driver.process(topic1, "A", "1");
        driver.flushState();
        driver.process(topic1, "A", null);
        driver.flushState();
        driver.process(topic1, "A", "1");
        driver.flushState();
        driver.process(topic1, "B", "2");
        driver.flushState();
        driver.process(topic1, "null", "3");
        driver.flushState();
        driver.process(topic1, "B", "4");
        driver.flushState();
        driver.process(topic1, "NULL", "5");
        driver.flushState();
        driver.process(topic1, "B", "7");
        driver.flushState();

        Assert.assertEquals(
            Utils.mkList(
                "1:0+1",
                "1:0+1-1",
                "1:0+1-1+1",
                "2:0+2",
                "2:0+2-2",
                "4:0+4",
                "4:0+4-4",
                "7:0+7"),
            supplier.theCapturedProcessor().processed);
    }

    /**
     * Drives a count topology with a fixed sequence of colour values, flushing after each
     * record, and checks the captured per-colour counts.
     *
     * @param builder  builder holding the already-defined count topology
     * @param input    name of the input topic to feed
     * @param supplier processor supplier attached to the end of the topology
     */
    private void testCountHelper(final StreamsBuilder builder,
                                 final String input,
                                 final MockProcessorSupplier<String, Object> supplier) {
        driver.setUp(builder, stateDir);

        driver.process(input, "A", "green");
        driver.flushState();
        driver.process(input, "B", "green");
        driver.flushState();
        driver.process(input, "A", "blue");
        driver.flushState();
        driver.process(input, "C", "yellow");
        driver.flushState();
        driver.process(input, "D", "green");
        driver.flushState();
        // NOTE(review): the second consecutive flush is preserved as-is; it looks redundant - confirm.
        driver.flushState();

        Assert.assertEquals(
            Utils.mkList("green:1", "green:2", "green:1", "blue:1", "yellow:1", "green:2"),
            supplier.theCapturedProcessor().processed);
    }

    /** Counts records grouped by value, using a store explicitly named {@code "count"}. */
    @Test
    public void testCount() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "count-test-input";

        builder.table(input, consumed)
            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialized)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count"))
            .toStream()
            .process(supplier);

        testCountHelper(builder, input, supplier);
    }

    /** Same as {@link #testCount()} but calls {@code count()} without naming a store. */
    @Test
    public void testCountWithInternalStore() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "count-test-input";

        builder.table(input, consumed)
            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialized)
            .count()
            .toStream()
            .process(supplier);

        testCountHelper(builder, input, supplier);
    }

    /**
     * Sends all records before a single flush and expects one final count per colour rather
     * than every intermediate count.
     */
    @Test
    public void testCountCoalesced() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "count-test-input";
        final MockProcessorSupplier<String, Long> countSupplier = new MockProcessorSupplier<>();

        builder.table(input, consumed)
            .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialized)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count"))
            .toStream()
            .process(countSupplier);

        driver.setUp(builder, stateDir);
        final MockProcessor<String, Long> proc = countSupplier.theCapturedProcessor();

        driver.process(input, "A", "green");
        driver.process(input, "B", "green");
        driver.process(input, "A", "blue");
        driver.process(input, "C", "yellow");
        driver.process(input, "D", "green");
        driver.flushState();

        Assert.assertEquals(Utils.mkList("blue:1", "yellow:1", "green:2"), proc.processed);
    }

    /**
     * Checks that, on an update, the old value is subtracted before the new one is added.
     *
     * <p>The grouping mapper ignores the record value: the first character of the key becomes
     * the group key and the second character becomes the grouped value. The adder appends the
     * value to the aggregate and the subtractor strips every occurrence of it. For the last
     * input (key {@code "12"} updated) the old and new grouped values are both {@code "2"};
     * adding first would give {@code "22"} and the subtractor would then leave an empty
     * string, so the expected {@code "1:2"} only holds when removal happens first.
     */
    @Test
    public void testRemoveOldBeforeAddNew() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "count-test-input";
        final MockProcessorSupplier<String, String> aggSupplier = new MockProcessorSupplier<>();

        builder.table(input, consumed)
            .groupBy(
                new KeyValueMapper<String, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(final String key, final String value) {
                        return KeyValue.pair(
                            String.valueOf(key.charAt(0)),
                            String.valueOf(key.charAt(1)));
                    }
                },
                stringSerialized)
            .aggregate(
                new Initializer<String>() {
                    @Override
                    public String apply() {
                        return "";
                    }
                },
                new Aggregator<String, String, String>() {
                    @Override
                    public String apply(final String aggKey, final String value, final String aggregate) {
                        return aggregate + value;
                    }
                },
                new Aggregator<String, String, String>() {
                    @Override
                    public String apply(final String key, final String value, final String aggregate) {
                        return aggregate.replaceAll(value, "");
                    }
                },
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("someStore")
                    .withValueSerde(Serdes.String()))
            .toStream()
            .process(aggSupplier);

        driver.setUp(builder, stateDir);
        final MockProcessor<String, String> proc = aggSupplier.theCapturedProcessor();

        driver.process(input, "11", "A");
        driver.flushState();
        driver.process(input, "12", "B");
        driver.flushState();
        driver.process(input, "11", null);
        driver.flushState();
        driver.process(input, "12", "C");
        driver.flushState();

        Assert.assertEquals(Utils.mkList("1:1", "1:12", "1:2", "1:2"), proc.processed);
    }

    /**
     * Builds a reduce over one table plus a left join of a second table against that reduce
     * result, then feeds {@link Change} records straight into the reducer's repartition topic
     * and checks which reduced value has reached the {@code foreach} sink at two points.
     *
     * <p>No explicit flush is issued in this test; results are only observed through the
     * {@code reduceResults} map filled by the {@code foreach} action.
     */
    @Test
    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() {
        final String tableOne = "tableOne";
        final String tableTwo = "tableTwo";
        final StreamsBuilder builder = new StreamsBuilder();
        final String reduceTopic = "TestDriver-reducer-store-repartition";
        final Map<String, Long> reduceResults = new HashMap<>();

        final KTable<String, String> one = builder.table(tableOne, consumed);
        final KTable<Long, String> two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String()));

        final KTable<String, Long> reduce = two
            .groupBy(
                new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
                    @Override
                    public KeyValue<String, Long> apply(final Long key, final String value) {
                        // Swap key and value so the reduce is keyed by the original value.
                        return new KeyValue<>(value, key);
                    }
                },
                Serialized.with(Serdes.String(), Serdes.Long()))
            .reduce(
                new Reducer<Long>() {
                    @Override
                    public Long apply(final Long value1, final Long value2) {
                        return value1 + value2;
                    }
                },
                new Reducer<Long>() {
                    @Override
                    public Long apply(final Long value1, final Long value2) {
                        return value1 - value2;
                    }
                },
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("reducer-store"));

        reduce.toStream().foreach(new ForeachAction<String, Long>() {
            @Override
            public void apply(final String key, final Long value) {
                reduceResults.put(key, value);
            }
        });

        one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
            @Override
            public String apply(final String value1, final Long value2) {
                return value1 + ":" + value2;
            }
        }).mapValues(new ValueMapper<String, String>() {
            @Override
            public String apply(final String value) {
                return value;
            }
        });

        // NOTE(review): the third argument is presumably the cache size, chosen small enough to
        // force evictions - confirm against KStreamTestDriver.
        driver.setUp(builder, stateDir, 111L);

        driver.process(reduceTopic, "1", new Change<>(1L, null));
        driver.process(tableOne, "2", "2");
        driver.process(reduceTopic, "2", new Change<>(2L, null));
        driver.process(reduceTopic, "2", new Change<>(2L, null));
        // Only the first of the two additions for key "2" is visible downstream at this point.
        Assert.assertEquals(Long.valueOf(2L), reduceResults.get("2"));

        // After one more record on tableOne, the combined value for key "2" has been forwarded.
        driver.process(tableOne, "1", "5");
        Assert.assertEquals(Long.valueOf(4L), reduceResults.get("2"));
    }
}

