/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class KTableAggregateTest {
    private final Serde<String> stringSerde = Serdes.String();
    private KStreamTestDriver driver = null;
    private File stateDir = null;
    @Rule
    public EmbeddedKafkaCluster cluster = null;

    @After
    public void tearDown() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Before
    public void setUp() throws IOException {
        this.stateDir = TestUtils.tempDirectory((String)"kafka-test");
    }

    @Test
    public void testAggBasic() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        KTable table1 = builder.table(this.stringSerde, this.stringSerde, "topic1", "anyStoreName");
        KTable table2 = table1.groupBy(MockKeyValueMapper.NoOpKeyValueMapper(), this.stringSerde, this.stringSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, this.stringSerde, "topic1-Canonized");
        table2.toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "B", "2");
        this.driver.flushState();
        this.driver.process("topic1", "A", "3");
        this.driver.flushState();
        this.driver.process("topic1", "B", "4");
        this.driver.flushState();
        this.driver.process("topic1", "C", "5");
        this.driver.flushState();
        this.driver.process("topic1", "D", "6");
        this.driver.flushState();
        this.driver.process("topic1", "B", "7");
        this.driver.flushState();
        this.driver.process("topic1", "C", "8");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:0+1", "B:0+2", "A:0+1-1+3", "B:0+2-2+4", "C:0+5", "D:0+6", "B:0+2-2+4-4+7", "C:0+5-5+8"}), proc.processed);
    }

    @Test
    public void testAggCoalesced() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        KTable table1 = builder.table(this.stringSerde, this.stringSerde, "topic1", "anyStoreName");
        KTable table2 = table1.groupBy(MockKeyValueMapper.NoOpKeyValueMapper(), this.stringSerde, this.stringSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, this.stringSerde, "topic1-Canonized");
        table2.toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.process("topic1", "A", "3");
        this.driver.process("topic1", "A", "4");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:0+4"}), proc.processed);
    }

    @Test
    public void testAggRepartition() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        KTable table1 = builder.table(this.stringSerde, this.stringSerde, "topic1", "anyStoreName");
        KTable table2 = table1.groupBy((KeyValueMapper)new KeyValueMapper<String, String, KeyValue<String, String>>(){

            public KeyValue<String, String> apply(String key, String value) {
                switch (key) {
                    case "null": {
                        return KeyValue.pair(null, (Object)value);
                    }
                    case "NULL": {
                        return null;
                    }
                }
                return KeyValue.pair((Object)value, (Object)value);
            }
        }, this.stringSerde, this.stringSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, this.stringSerde, "topic1-Canonized");
        table2.toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "A", null);
        this.driver.flushState();
        this.driver.process("topic1", "A", "1");
        this.driver.flushState();
        this.driver.process("topic1", "B", "2");
        this.driver.flushState();
        this.driver.process("topic1", "null", "3");
        this.driver.flushState();
        this.driver.process("topic1", "B", "4");
        this.driver.flushState();
        this.driver.process("topic1", "NULL", "5");
        this.driver.flushState();
        this.driver.process("topic1", "B", "7");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"1:0+1", "1:0+1-1", "1:0+1-1+1", "2:0+2", "2:0+2-2", "4:0+4", "4:0+4-4", "7:0+7"}), proc.processed);
    }

    private void testCountHelper(KStreamBuilder builder, String input, MockProcessorSupplier<String, Long> proc) throws IOException {
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process(input, "A", "green");
        this.driver.flushState();
        this.driver.process(input, "B", "green");
        this.driver.flushState();
        this.driver.process(input, "A", "blue");
        this.driver.flushState();
        this.driver.process(input, "C", "yellow");
        this.driver.flushState();
        this.driver.process(input, "D", "green");
        this.driver.flushState();
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"green:1", "green:2", "green:1", "blue:1", "yellow:1", "green:2"}), proc.processed);
    }

    @Test
    public void testCount() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String input = "count-test-input";
        MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<String, Long>();
        builder.table(Serdes.String(), Serdes.String(), "count-test-input", "anyStoreName").groupBy(MockKeyValueMapper.SelectValueKeyValueMapper(), this.stringSerde, this.stringSerde).count("count").toStream().process(proc, new String[0]);
        this.testCountHelper(builder, "count-test-input", proc);
    }

    @Test
    public void testCountWithInternalStore() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String input = "count-test-input";
        MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<String, Long>();
        builder.table(Serdes.String(), Serdes.String(), "count-test-input", "anyStoreName").groupBy(MockKeyValueMapper.SelectValueKeyValueMapper(), this.stringSerde, this.stringSerde).count().toStream().process(proc, new String[0]);
        this.testCountHelper(builder, "count-test-input", proc);
    }

    @Test
    public void testCountCoalesced() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String input = "count-test-input";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        builder.table(Serdes.String(), Serdes.String(), "count-test-input", "anyStoreName").groupBy(MockKeyValueMapper.SelectValueKeyValueMapper(), this.stringSerde, this.stringSerde).count("count").toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("count-test-input", "A", "green");
        this.driver.process("count-test-input", "B", "green");
        this.driver.process("count-test-input", "A", "blue");
        this.driver.process("count-test-input", "C", "yellow");
        this.driver.process("count-test-input", "D", "green");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"blue:1", "yellow:1", "green:2"}), proc.processed);
    }

    @Test
    public void testRemoveOldBeforeAddNew() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String input = "count-test-input";
        MockProcessorSupplier proc = new MockProcessorSupplier();
        builder.table(Serdes.String(), Serdes.String(), "count-test-input", "anyStoreName").groupBy((KeyValueMapper)new KeyValueMapper<String, String, KeyValue<String, String>>(){

            public KeyValue<String, String> apply(String key, String value) {
                return KeyValue.pair((Object)String.valueOf(key.charAt(0)), (Object)String.valueOf(key.charAt(1)));
            }
        }, this.stringSerde, this.stringSerde).aggregate((Initializer)new Initializer<String>(){

            public String apply() {
                return "";
            }
        }, (Aggregator)new Aggregator<String, String, String>(){

            public String apply(String aggKey, String value, String aggregate) {
                return aggregate + value;
            }
        }, (Aggregator)new Aggregator<String, String, String>(){

            public String apply(String key, String value, String aggregate) {
                return aggregate.replaceAll(value, "");
            }
        }, Serdes.String(), "someStore").toStream().process(proc, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir);
        this.driver.process("count-test-input", "11", "A");
        this.driver.flushState();
        this.driver.process("count-test-input", "12", "B");
        this.driver.flushState();
        this.driver.process("count-test-input", "11", null);
        this.driver.flushState();
        this.driver.process("count-test-input", "12", "C");
        this.driver.flushState();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"1:1", "1:12", "1:2", "1:2"}), proc.processed);
    }

    @Test
    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() throws Exception {
        String tableOne = "tableOne";
        String tableTwo = "tableTwo";
        KStreamBuilder builder = new KStreamBuilder();
        String reduceTopic = "TestDriver-reducer-store-repartition";
        final HashMap reduceResults = new HashMap();
        KTable one = builder.table(Serdes.String(), Serdes.String(), "tableOne", "tableOne");
        KTable two = builder.table(Serdes.Long(), Serdes.String(), "tableTwo", "tableTwo");
        KTable reduce = two.groupBy((KeyValueMapper)new KeyValueMapper<Long, String, KeyValue<String, Long>>(){

            public KeyValue<String, Long> apply(Long key, String value) {
                return new KeyValue((Object)value, (Object)key);
            }
        }, Serdes.String(), Serdes.Long()).reduce((Reducer)new Reducer<Long>(){

            public Long apply(Long value1, Long value2) {
                return value1 + value2;
            }
        }, (Reducer)new Reducer<Long>(){

            public Long apply(Long value1, Long value2) {
                return value1 - value2;
            }
        }, "reducer-store");
        reduce.foreach((ForeachAction)new ForeachAction<String, Long>(){

            public void apply(String key, Long value) {
                reduceResults.put(key, value);
            }
        });
        one.leftJoin(reduce, (ValueJoiner)new ValueJoiner<String, Long, String>(){

            public String apply(String value1, Long value2) {
                return value1 + ":" + value2;
            }
        }).mapValues((ValueMapper)new ValueMapper<String, String>(){

            public String apply(String value) {
                return value;
            }
        });
        this.driver = new KStreamTestDriver(builder, this.stateDir, 111L);
        this.driver.process("TestDriver-reducer-store-repartition", "1", new Change((Object)1L, null));
        this.driver.process("tableOne", "2", "2");
        this.driver.process("TestDriver-reducer-store-repartition", "2", new Change((Object)2L, null));
        this.driver.process("TestDriver-reducer-store-repartition", "2", new Change((Object)2L, null));
        Assert.assertEquals((Object)2L, reduceResults.get("2"));
        this.driver.process("tableOne", "1", "5");
        Assert.assertEquals((Object)4L, reduceResults.get("2"));
    }
}

