/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

public class KTableFilterTest {
    /** String-key / Integer-value serdes used when reading the source topic as a table. */
    private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());

    /** Creates the String-key / Integer-value input records that the tests pipe into the driver. */
    private final ConsumerRecordFactory<String, Integer> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());

    /** Configuration passed to every test driver in this class, created with String and Integer serdes. */
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());

    /**
     * Attaches one mock processor to the changelog stream of each derived table, pipes a fixed
     * sequence of records (including two deletes) into {@code topic}, and checks what each
     * processor received.
     *
     * @param builder builder that both tables were created from; it is built into the tested topology
     * @param table2  table whose processor is expected to see only the even values
     * @param table3  table whose processor is expected to see only the odd values
     * @param topic   source topic the records are written to
     */
    private void doTestKTable(final StreamsBuilder builder,
                              final KTable<String, Integer> table2,
                              final KTable<String, Integer> table3,
                              final String topic) {
        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
        table2.toStream().process(supplier);
        table3.toStream().process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(recordFactory.create(topic, "A", 1));
            driver.pipeInput(recordFactory.create(topic, "B", 2));
            driver.pipeInput(recordFactory.create(topic, "C", 3));
            driver.pipeInput(recordFactory.create(topic, "D", 4));
            // null values delete the keys again
            driver.pipeInput(recordFactory.create(topic, "A", null));
            driver.pipeInput(recordFactory.create(topic, "B", null));
        }

        // processors are captured in the order they were attached: index 0 -> table2, index 1 -> table3
        final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
        processors.get(0).checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
        processors.get(1).checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
    }

    /**
     * Derives an even-valued table via {@code filter} and an odd-valued table via
     * {@code filterNot} from the same source table and verifies both with
     * {@link #doTestKTable}.
     */
    @Test
    public void testKTable() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, Integer> table1 = builder.table(topic1, consumed);
        final KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(final String key, final Integer value) {
                return value % 2 == 0;
            }
        });
        // same predicate, inverted by filterNot
        final KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
            @Override
            public boolean test(final String key, final Integer value) {
                return value % 2 == 0;
            }
        });

        doTestKTable(builder, table2, table3, topic1);
    }

    /**
     * Same scenario as {@link #testKTable()}, except that the {@code filter} result is
     * materialized under the store name {@code "anyStoreNameFilter"}. Also checks that only
     * the materialized table reports a queryable store name.
     */
    @Test
    public void testQueryableKTable() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, Integer> table1 = builder.table(topic1, consumed);
        final KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(final String key, final Integer value) {
                return value % 2 == 0;
            }
        }, Materialized.as("anyStoreNameFilter"));
        // same predicate, inverted by filterNot; deliberately not materialized
        final KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
            @Override
            public boolean test(final String key, final Integer value) {
                return value % 2 == 0;
            }
        });

        Assert.assertEquals("anyStoreNameFilter", table2.queryableStoreName());
        Assert.assertNull(table3.queryableStoreName());

        doTestKTable(builder, table2, table3, topic1);
    }

    /**
     * Reads the filtered tables through their value getters while records are piped into
     * {@code topic1}, checking the value visible for keys "A", "B" and "C" after each step.
     *
     * @param builder builder that both tables were created from
     * @param table2  table expected to expose only even values
     * @param table3  table expected to expose only odd values
     * @param topic1  source topic the records are written to
     */
    private void doTestValueGetter(final StreamsBuilder builder,
                                   final KTableImpl<String, Integer, Integer> table2,
                                   final KTableImpl<String, Integer, Integer> table3,
                                   final String topic1) {
        final Topology topology = builder.build();

        final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
        final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();

        // connect each table's processor node to the stores its getter reads from
        final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());

        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
            final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
            final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();

            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));

            // all values odd
            driver.pipeInput(recordFactory.create(topic1, "A", 1));
            driver.pipeInput(recordFactory.create(topic1, "B", 1));
            driver.pipeInput(recordFactory.create(topic1, "C", 1));
            assertGetterValues(getter2, null, null, null);
            assertGetterValues(getter3, 1, 1, 1);

            // A and B become even
            driver.pipeInput(recordFactory.create(topic1, "A", 2));
            driver.pipeInput(recordFactory.create(topic1, "B", 2));
            assertGetterValues(getter2, 2, 2, null);
            assertGetterValues(getter3, null, null, 1);

            // A becomes odd again
            driver.pipeInput(recordFactory.create(topic1, "A", 3));
            assertGetterValues(getter2, null, 2, null);
            assertGetterValues(getter3, 3, null, 1);

            // A and B are deleted
            driver.pipeInput(recordFactory.create(topic1, "A", null));
            driver.pipeInput(recordFactory.create(topic1, "B", null));
            assertGetterValues(getter2, null, null, null);
            assertGetterValues(getter3, null, null, 1);
        }
    }

    /**
     * Asserts the values {@code getter} currently returns for keys "A", "B" and "C".
     * A {@code null} expectation means the getter must return {@code null} for that key.
     */
    private static void assertGetterValues(final KTableValueGetter<String, Integer> getter,
                                           final Integer expectedA,
                                           final Integer expectedB,
                                           final Integer expectedC) {
        Assert.assertEquals("value for key A", expectedA, getter.get("A"));
        Assert.assertEquals("value for key B", expectedB, getter.get("B"));
        Assert.assertEquals("value for key C", expectedC, getter.get("C"));
    }

    /**
     * Runs {@link #doTestValueGetter} against non-materialized {@code filter} (even values)
     * and {@code filterNot} (odd values) tables.
     */
    @Test
    public void testValueGetter() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // cast to the implementation type because the helper reads internals such as the node name
        final KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
        final KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            });
        final KTableImpl<String, Integer, Integer> table3 =
            (KTableImpl<String, Integer, Integer>) table1.filterNot(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            });

        doTestValueGetter(builder, table2, table3, topic1);
    }

    /**
     * Runs {@link #doTestValueGetter} with the {@code filter} result materialized as
     * {@code "anyStoreNameFilter"} and the {@code filterNot} result left non-materialized,
     * after checking the queryable store names of both tables.
     */
    @Test
    public void testQueryableValueGetter() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // cast to the implementation type because the helper reads internals such as the node name
        final KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
        final KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            }, Materialized.as("anyStoreNameFilter"));
        final KTableImpl<String, Integer, Integer> table3 =
            (KTableImpl<String, Integer, Integer>) table1.filterNot(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            });

        Assert.assertEquals("anyStoreNameFilter", table2.queryableStoreName());
        Assert.assertNull(table3.queryableStoreName());

        doTestValueGetter(builder, table2, table3, topic1);
    }

    /**
     * Attaches mock processors directly below the source table and the filtered table and
     * checks the change records they receive when sending of old values has not been enabled.
     * Every expected record has {@code null} in its old-value position.
     *
     * @param builder builder that both tables were created from
     * @param table1  unfiltered source table
     * @param table2  table filtered to even values
     * @param topic1  source topic the records are written to
     */
    private void doTestNotSendingOldValue(final StreamsBuilder builder,
                                          final KTableImpl<String, Integer, Integer> table1,
                                          final KTableImpl<String, Integer, Integer> table2,
                                          final String topic1) {
        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();

        // build once and reuse the topology, as doTestSendingOldValue does
        final Topology topology = builder.build();
        topology.addProcessor("proc1", supplier, table1.name);
        topology.addProcessor("proc2", supplier, table2.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            driver.pipeInput(recordFactory.create(topic1, "A", 1));
            driver.pipeInput(recordFactory.create(topic1, "B", 1));
            driver.pipeInput(recordFactory.create(topic1, "C", 1));

            // index 0 is checked against table1's records, index 1 against table2's
            final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
            processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
            processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");

            driver.pipeInput(recordFactory.create(topic1, "A", 2));
            driver.pipeInput(recordFactory.create(topic1, "B", 2));
            processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
            processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");

            driver.pipeInput(recordFactory.create(topic1, "A", 3));
            processors.get(0).checkAndClearProcessResult("A:(3<-null)");
            processors.get(1).checkAndClearProcessResult("A:(null<-null)");

            driver.pipeInput(recordFactory.create(topic1, "A", null));
            driver.pipeInput(recordFactory.create(topic1, "B", null));
            processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
            processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
        }
    }

    /**
     * Runs {@link #doTestNotSendingOldValue} against a source table and a non-materialized
     * table filtered to even values.
     */
    @Test
    public void testNotSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // cast to the implementation type because the helper reads internals such as the node name
        final KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
        final KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            });

        doTestNotSendingOldValue(builder, table1, table2, topic1);
    }

    /**
     * Runs {@link #doTestNotSendingOldValue} with the even-value filter result materialized
     * under the store name {@code "anyStoreNameFilter"}.
     */
    @Test
    public void testQueryableNotSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // cast to the implementation type because the helper reads internals such as the node name
        final KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
        final KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            }, Materialized.as("anyStoreNameFilter"));

        doTestNotSendingOldValue(builder, table1, table2, topic1);
    }

    /**
     * Enables sending of old values on the filtered table, attaches mock processors directly
     * below the source table and the filtered table, and checks the change records they
     * receive. Expected records are written as {@code key:(new<-old)}.
     *
     * @param builder builder that both tables were created from
     * @param table1  unfiltered source table
     * @param table2  table filtered to even values; old-value sending is enabled on it here
     * @param topic1  source topic the records are written to
     */
    private void doTestSendingOldValue(final StreamsBuilder builder,
                                       final KTableImpl<String, Integer, Integer> table1,
                                       final KTableImpl<String, Integer, Integer> table2,
                                       final String topic1) {
        table2.enableSendingOldValues();

        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
        final Topology topology = builder.build();
        topology.addProcessor("proc1", supplier, table1.name);
        topology.addProcessor("proc2", supplier, table2.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            driver.pipeInput(recordFactory.create(topic1, "A", 1));
            driver.pipeInput(recordFactory.create(topic1, "B", 1));
            driver.pipeInput(recordFactory.create(topic1, "C", 1));

            // index 0 is checked against table1's records, index 1 against table2's
            final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
            processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
            // odd values with no previous value: nothing is expected downstream of the filter
            processors.get(1).checkEmptyAndClearProcessResult();

            driver.pipeInput(recordFactory.create(topic1, "A", 2));
            driver.pipeInput(recordFactory.create(topic1, "B", 2));
            processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
            processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");

            driver.pipeInput(recordFactory.create(topic1, "A", 3));
            processors.get(0).checkAndClearProcessResult("A:(3<-2)");
            processors.get(1).checkAndClearProcessResult("A:(null<-2)");

            driver.pipeInput(recordFactory.create(topic1, "A", null));
            driver.pipeInput(recordFactory.create(topic1, "B", null));
            processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
            // only B is expected here; A's last value (3) was already filtered out
            processors.get(1).checkAndClearProcessResult("B:(null<-2)");
        }
    }

    /**
     * Runs {@link #doTestSendingOldValue} against a source table and a non-materialized
     * table filtered to even values.
     */
    @Test
    public void testSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // cast to the implementation type because the helper reads internals such as the node name
        final KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
        final KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            });

        doTestSendingOldValue(builder, table1, table2, topic1);
    }

    /**
     * Runs {@link #doTestSendingOldValue} with the even-value filter result materialized
     * under the store name {@code "anyStoreNameFilter"}.
     */
    @Test
    public void testQueryableSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // cast to the implementation type because the helper reads internals such as the node name
        final KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
        final KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(new Predicate<String, Integer>() {
                @Override
                public boolean test(final String key, final Integer value) {
                    return value % 2 == 0;
                }
            }, Materialized.as("anyStoreNameFilter"));

        doTestSendingOldValue(builder, table1, table2, topic1);
    }

    /**
     * Pipes three records whose value is {@code "reject"} and checks that the processor
     * attached below {@code table1} sees all of them while the processor attached below
     * {@code table2} receives nothing.
     *
     * @param builder builder that both tables were created from
     * @param table1  unfiltered String-valued source table
     * @param table2  table derived from the filtered table
     * @param topic1  source topic the records are written to
     */
    private void doTestSkipNullOnMaterialization(final StreamsBuilder builder,
                                                 final KTableImpl<String, String, String> table1,
                                                 final KTableImpl<String, String, String> table2,
                                                 final String topic1) {
        final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
        final Topology topology = builder.build();
        topology.addProcessor("proc1", supplier, table1.name);
        topology.addProcessor("proc2", supplier, table2.name);

        // this scenario uses String values, so the class-level Integer record factory does not fit
        final ConsumerRecordFactory<String, String> stringRecordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            driver.pipeInput(stringRecordFactory.create(topic1, "A", "reject"));
            driver.pipeInput(stringRecordFactory.create(topic1, "B", "reject"));
            driver.pipeInput(stringRecordFactory.create(topic1, "C", "reject"));
        }

        // index 0 is checked against table1's records, index 1 against table2's
        final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
        processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
        processors.get(1).checkEmptyAndClearProcessResult();
    }

    /**
     * Builds source table -> filter (value equals "accept", ignoring case) -> groupBy -> reduce
     * and hands the source and reduced tables to {@link #doTestSkipNullOnMaterialization}.
     */
    @Test
    public void testSkipNullOnMaterialization() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // String-valued serdes for this scenario; named so it does not shadow the class field
        final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
        final KTableImpl<String, String, String> table1 =
            (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
        final KTableImpl<String, String, String> table2 =
            (KTableImpl<String, String, String>) table1.filter(new Predicate<String, String>() {
                @Override
                public boolean test(final String key, final String value) {
                    return value.equalsIgnoreCase("accept");
                }
            }).groupBy(MockMapper.<String, String>noOpKeyValueMapper())
                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);

        doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
    }

    /**
     * Same pipeline as {@link #testSkipNullOnMaterialization()}, with the filter result
     * materialized as {@code "anyStoreNameFilter"} and the reduce result as
     * {@code "mock-result"}.
     */
    @Test
    public void testQueryableSkipNullOnMaterialization() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // String-valued serdes for this scenario; named so it does not shadow the class field
        final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
        final KTableImpl<String, String, String> table1 =
            (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
        final KTableImpl<String, String, String> table2 =
            (KTableImpl<String, String, String>) table1.filter(new Predicate<String, String>() {
                @Override
                public boolean test(final String key, final String value) {
                    return value.equalsIgnoreCase("accept");
                }
            }, Materialized.as("anyStoreNameFilter"))
                .groupBy(MockMapper.<String, String>noOpKeyValueMapper())
                .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as("mock-result"));

        doTestSkipNullOnMaterialization(builder, table1, table2, topic1);
    }

    /**
     * Compile-time check that {@code filter} and {@code filterNot} accept a predicate declared
     * on supertypes of the table's key and value types: a {@code Predicate<Number, Object>} is
     * applied to a table typed {@code <Integer, String>}. The topology is only constructed,
     * never run.
     */
    @Test
    public void testTypeVariance() {
        final Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
            @Override
            public boolean test(final Number key, final Object value) {
                return false;
            }
        };

        // no raw casts here: the explicit <Integer, String> witness is what makes the variance check real
        new StreamsBuilder()
            .<Integer, String>table("empty")
            .filter(numberKeyPredicate)
            .filterNot(numberKeyPredicate)
            .toStream()
            .to("nirvana");
    }
}

