/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KTableImplTest {
    private final Consumed<String, String> consumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private final Produced<String, String> produced = Produced.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer());
    private StreamsBuilder builder;
    private KTable<String, String> table;

    @Before
    public void setUp() {
        this.builder = new StreamsBuilder();
        this.table = this.builder.table("test");
    }

    @Test
    public void testKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        String topic2 = "topic2";
        KTable table1 = builder.table(topic1, this.consumed);
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        table1.toStream().process(supplier, new String[0]);
        KTable table2 = table1.mapValues((ValueMapper)new ValueMapper<String, Integer>(){

            public Integer apply(String value) {
                return new Integer(value);
            }
        });
        table2.toStream().process(supplier, new String[0]);
        KTable table3 = table2.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        table3.toStream().process(supplier, new String[0]);
        table1.toStream().to(topic2, this.produced);
        KTable table4 = builder.table(topic2, this.consumed);
        table4.toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            driver.pipeInput(this.recordFactory.create(topic1, (Object)"A", (Object)"01"));
            driver.pipeInput(this.recordFactory.create(topic1, (Object)"B", (Object)"02"));
            driver.pipeInput(this.recordFactory.create(topic1, (Object)"C", (Object)"03"));
            driver.pipeInput(this.recordFactory.create(topic1, (Object)"D", (Object)"04"));
        }
        List processors = supplier.capturedProcessors(4);
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:01", "B:02", "C:03", "D:04"}), processors.get((int)0).processed);
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:1", "B:2", "C:3", "D:4"}), processors.get((int)1).processed);
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:null", "B:2", "C:null", "D:4"}), processors.get((int)2).processed);
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:01", "B:02", "C:03", "D:04"}), processors.get((int)3).processed);
    }

    @Test
    public void testValueGetter() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        String topic2 = "topic2";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)table1.mapValues((ValueMapper)new ValueMapper<String, Integer>(){

            public Integer apply(String value) {
                return new Integer(value);
            }
        });
        KTableImpl table3 = (KTableImpl)table2.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        table1.toStream().to("topic2", this.produced);
        KTableImpl table4 = (KTableImpl)builder.table("topic2", this.consumed);
        Topology topology = builder.build();
        KTableValueGetterSupplier getterSupplier1 = table1.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier2 = table2.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier3 = table3.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier4 = table4.valueGetterSupplier();
        InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
        topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames());
        try (TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, this.props);){
            Assert.assertEquals((long)2L, (long)driver.getAllStateStores().size());
            KTableValueGetter getter1 = getterSupplier1.get();
            KTableValueGetter getter2 = getterSupplier2.get();
            KTableValueGetter getter3 = getterSupplier3.get();
            KTableValueGetter getter4 = getterSupplier4.get();
            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
            getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"01"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
            Assert.assertEquals((Object)new Integer(1), (Object)getter2.get((Object)"A"));
            Assert.assertEquals((Object)new Integer(1), (Object)getter2.get((Object)"B"));
            Assert.assertEquals((Object)new Integer(1), (Object)getter2.get((Object)"C"));
            Assert.assertNull((Object)getter3.get((Object)"A"));
            Assert.assertNull((Object)getter3.get((Object)"B"));
            Assert.assertNull((Object)getter3.get((Object)"C"));
            Assert.assertEquals((Object)"01", (Object)getter4.get((Object)"A"));
            Assert.assertEquals((Object)"01", (Object)getter4.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter4.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"02"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"02"));
            Assert.assertEquals((Object)"02", (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter2.get((Object)"A"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter2.get((Object)"B"));
            Assert.assertEquals((Object)new Integer(1), (Object)getter2.get((Object)"C"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter3.get((Object)"A"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter3.get((Object)"B"));
            Assert.assertNull((Object)getter3.get((Object)"C"));
            Assert.assertEquals((Object)"02", (Object)getter4.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter4.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter4.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"03"));
            Assert.assertEquals((Object)"03", (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
            Assert.assertEquals((Object)new Integer(3), (Object)getter2.get((Object)"A"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter2.get((Object)"B"));
            Assert.assertEquals((Object)new Integer(1), (Object)getter2.get((Object)"C"));
            Assert.assertNull((Object)getter3.get((Object)"A"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter3.get((Object)"B"));
            Assert.assertNull((Object)getter3.get((Object)"C"));
            Assert.assertEquals((Object)"03", (Object)getter4.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter4.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter4.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)null));
            Assert.assertNull((Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
            Assert.assertNull((Object)getter2.get((Object)"A"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter2.get((Object)"B"));
            Assert.assertEquals((Object)new Integer(1), (Object)getter2.get((Object)"C"));
            Assert.assertNull((Object)getter3.get((Object)"A"));
            Assert.assertEquals((Object)new Integer(2), (Object)getter3.get((Object)"B"));
            Assert.assertNull((Object)getter3.get((Object)"C"));
            Assert.assertNull((Object)getter4.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter4.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter4.get((Object)"C"));
        }
    }

    @Test
    public void testStateStoreLazyEval() {
        String topic1 = "topic1";
        String topic2 = "topic2";
        StreamsBuilder builder = new StreamsBuilder();
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        builder.table("topic2", this.consumed);
        KTableImpl table1Mapped = (KTableImpl)table1.mapValues((ValueMapper)new ValueMapper<String, Integer>(){

            public Integer apply(String value) {
                return new Integer(value);
            }
        });
        table1Mapped.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            Assert.assertEquals((long)2L, (long)driver.getAllStateStores().size());
        }
    }

    @Test
    public void testStateStore() {
        String topic1 = "topic1";
        String topic2 = "topic2";
        StreamsBuilder builder = new StreamsBuilder();
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)builder.table("topic2", this.consumed);
        KTableImpl table1Mapped = (KTableImpl)table1.mapValues((ValueMapper)new ValueMapper<String, Integer>(){

            public Integer apply(String value) {
                return new Integer(value);
            }
        });
        KTableImpl table1MappedFiltered = (KTableImpl)table1Mapped.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        table2.join((KTable)table1MappedFiltered, (ValueJoiner)new ValueJoiner<String, Integer, String>(){

            public String apply(String v1, Integer v2) {
                return v1 + v2;
            }
        });
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            Assert.assertEquals((long)2L, (long)driver.getAllStateStores().size());
        }
    }

    private void assertTopologyContainsProcessor(Topology topology, String processorName) {
        for (TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : subtopology.nodes()) {
                if (!node.name().equals(processorName)) continue;
                return;
            }
        }
        throw new AssertionError((Object)("No processor named '" + processorName + "'" + "found in the provided Topology:\n" + topology.describe()));
    }

    @Test
    public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws NoSuchFieldException, IllegalAccessException {
        String topic1 = "topic1";
        String storeName1 = "storeName1";
        StreamsBuilder builder = new StreamsBuilder();
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed, Materialized.as((String)"storeName1").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        table1.groupBy(MockMapper.noOpKeyValueMapper()).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, Materialized.as((String)"mock-result1"));
        table1.groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as((String)"mock-result2"));
        Topology topology = builder.build();
        try (TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, this.props);){
            Assert.assertEquals((long)3L, (long)driver.getAllStateStores().size());
            this.assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000003");
            this.assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000004");
            this.assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007");
            this.assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008");
            Field valSerializerField = ((SinkNode)driver.getProcessor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
            Field valDeserializerField = ((SourceNode)driver.getProcessor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
            valSerializerField.setAccessible(true);
            valDeserializerField.setAccessible(true);
            Assert.assertNotNull((Object)((ChangedSerializer)valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
            Assert.assertNotNull((Object)((ChangedDeserializer)valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
            Assert.assertNotNull((Object)((ChangedSerializer)valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
            Assert.assertNotNull((Object)((ChangedDeserializer)valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
        }
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullSelectorOnToStream() {
        this.table.toStream(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilter() {
        this.table.filter(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullPredicateOnFilterNot() {
        this.table.filterNot(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValues() {
        this.table.mapValues((ValueMapper)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullMapperOnMapValueWithKey() {
        this.table.mapValues((ValueMapperWithKey)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullSelectorOnGroupBy() {
        this.table.groupBy(null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnJoin() {
        this.table.join(null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test
    public void shouldAllowNullStoreInJoin() {
        this.table.join(this.table, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerJoin() {
        this.table.join(this.table, null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnOuterJoin() {
        this.table.outerJoin(null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnOuterJoin() {
        this.table.outerJoin(this.table, null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullJoinerOnLeftJoin() {
        this.table.leftJoin(this.table, null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldNotAllowNullOtherTableOnLeftJoin() {
        this.table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
        this.table.filter((Predicate)new Predicate<String, String>(){

            public boolean test(String key, String value) {
                return false;
            }
        }, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
        this.table.filterNot((Predicate)new Predicate<String, String>(){

            public boolean test(String key, String value) {
                return false;
            }
        }, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
        this.table.join(this.table, MockValueJoiner.TOSTRING_JOINER, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
        this.table.leftJoin(this.table, MockValueJoiner.TOSTRING_JOINER, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
        this.table.outerJoin(this.table, MockValueJoiner.TOSTRING_JOINER, (Materialized)null);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplierIsNull() {
        this.table.transformValues((ValueTransformerWithKeySupplier)null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
        ValueTransformerWithKeySupplier valueTransformerSupplier = (ValueTransformerWithKeySupplier)EasyMock.mock(ValueTransformerWithKeySupplier.class);
        this.table.transformValues(valueTransformerSupplier, (Materialized)null, new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
        ValueTransformerWithKeySupplier valueTransformerSupplier = (ValueTransformerWithKeySupplier)EasyMock.mock(ValueTransformerWithKeySupplier.class);
        this.table.transformValues(valueTransformerSupplier, (String[])null);
    }
}

