/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KTableFilterTest {
    private final Serde<Integer> intSerde = Serdes.Integer();
    private final Serde<String> stringSerde = Serdes.String();
    private KStreamTestDriver driver = null;
    private File stateDir = null;

    @After
    public void tearDown() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Before
    public void setUp() throws IOException {
        this.stateDir = TestUtils.tempDirectory((String)"kafka-test");
    }

    private void doTestKTable(KStreamBuilder builder, KTable<String, Integer> table2, KTable<String, Integer> table3, String topic1) {
        MockProcessorSupplier proc2 = new MockProcessorSupplier();
        MockProcessorSupplier proc3 = new MockProcessorSupplier();
        table2.toStream().process(proc2, new String[0]);
        table3.toStream().process(proc3, new String[0]);
        this.driver = new KStreamTestDriver(builder, this.stateDir, Serdes.String(), Serdes.Integer());
        this.driver.process(topic1, "A", 1);
        this.driver.process(topic1, "B", 2);
        this.driver.process(topic1, "C", 3);
        this.driver.process(topic1, "D", 4);
        this.driver.flushState();
        this.driver.process(topic1, "A", null);
        this.driver.process(topic1, "B", null);
        this.driver.flushState();
        proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
        proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
    }

    @Test
    public void testKTable() {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTable table2 = table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        KTable table3 = table1.filterNot((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        this.doTestKTable(builder, (KTable<String, Integer>)table2, (KTable<String, Integer>)table3, topic1);
    }

    @Test
    public void testQueryableKTable() {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTable table2 = table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        }, "anyStoreNameFilter");
        KTable table3 = table1.filterNot((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        this.doTestKTable(builder, (KTable<String, Integer>)table2, (KTable<String, Integer>)table3, topic1);
    }

    private void doTestValueGetter(KStreamBuilder builder, KTableImpl<String, Integer, Integer> table2, KTableImpl<String, Integer, Integer> table3, String topic1) throws IOException {
        KTableValueGetterSupplier getterSupplier2 = table2.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier3 = table3.valueGetterSupplier();
        this.driver = new KStreamTestDriver(builder, this.stateDir, Serdes.String(), Serdes.Integer());
        KTableValueGetter getter2 = getterSupplier2.get();
        KTableValueGetter getter3 = getterSupplier3.get();
        getter2.init(this.driver.context());
        getter3.init(this.driver.context());
        this.driver.process(topic1, "A", 1);
        this.driver.process(topic1, "B", 1);
        this.driver.process(topic1, "C", 1);
        Assert.assertNull((Object)getter2.get((Object)"A"));
        Assert.assertNull((Object)getter2.get((Object)"B"));
        Assert.assertNull((Object)getter2.get((Object)"C"));
        Assert.assertEquals((long)1L, (long)((Integer)getter3.get((Object)"A")).intValue());
        Assert.assertEquals((long)1L, (long)((Integer)getter3.get((Object)"B")).intValue());
        Assert.assertEquals((long)1L, (long)((Integer)getter3.get((Object)"C")).intValue());
        this.driver.process(topic1, "A", 2);
        this.driver.process(topic1, "B", 2);
        Assert.assertEquals((long)2L, (long)((Integer)getter2.get((Object)"A")).intValue());
        Assert.assertEquals((long)2L, (long)((Integer)getter2.get((Object)"B")).intValue());
        Assert.assertNull((Object)getter2.get((Object)"C"));
        Assert.assertNull((Object)getter3.get((Object)"A"));
        Assert.assertNull((Object)getter3.get((Object)"B"));
        Assert.assertEquals((long)1L, (long)((Integer)getter3.get((Object)"C")).intValue());
        this.driver.process(topic1, "A", 3);
        Assert.assertNull((Object)getter2.get((Object)"A"));
        Assert.assertEquals((long)2L, (long)((Integer)getter2.get((Object)"B")).intValue());
        Assert.assertNull((Object)getter2.get((Object)"C"));
        Assert.assertEquals((long)3L, (long)((Integer)getter3.get((Object)"A")).intValue());
        Assert.assertNull((Object)getter3.get((Object)"B"));
        Assert.assertEquals((long)1L, (long)((Integer)getter3.get((Object)"C")).intValue());
        this.driver.process(topic1, "A", null);
        this.driver.process(topic1, "B", null);
        Assert.assertNull((Object)getter2.get((Object)"A"));
        Assert.assertNull((Object)getter2.get((Object)"B"));
        Assert.assertNull((Object)getter2.get((Object)"C"));
        Assert.assertNull((Object)getter3.get((Object)"A"));
        Assert.assertNull((Object)getter3.get((Object)"B"));
        Assert.assertEquals((long)1L, (long)((Integer)getter3.get((Object)"C")).intValue());
    }

    @Test
    public void testValueGetter() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        KTableImpl table3 = (KTableImpl)table1.filterNot((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        this.doTestValueGetter(builder, (KTableImpl<String, Integer, Integer>)table2, (KTableImpl<String, Integer, Integer>)table3, topic1);
    }

    @Test
    public void testQueryableValueGetter() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        }, "anyStoreNameFilter");
        KTableImpl table3 = (KTableImpl)table1.filterNot((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        this.doTestValueGetter(builder, (KTableImpl<String, Integer, Integer>)table2, (KTableImpl<String, Integer, Integer>)table3, topic1);
    }

    private void doTestNotSendingOldValue(KStreamBuilder builder, KTableImpl<String, Integer, Integer> table1, KTableImpl<String, Integer, Integer> table2, String topic1) throws IOException {
        MockProcessorSupplier proc1 = new MockProcessorSupplier();
        MockProcessorSupplier proc2 = new MockProcessorSupplier();
        builder.addProcessor("proc1", proc1, new String[]{table1.name});
        builder.addProcessor("proc2", proc2, new String[]{table2.name});
        this.driver = new KStreamTestDriver(builder, this.stateDir, Serdes.String(), Serdes.Integer());
        this.driver.process(topic1, "A", 1);
        this.driver.process(topic1, "B", 1);
        this.driver.process(topic1, "C", 1);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
        this.driver.process(topic1, "A", 2);
        this.driver.process(topic1, "B", 2);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
        this.driver.process(topic1, "A", 3);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(3<-null)");
        proc2.checkAndClearProcessResult("A:(null<-null)");
        this.driver.process(topic1, "A", null);
        this.driver.process(topic1, "B", null);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
    }

    @Test
    public void testNotSendingOldValue() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        this.doTestNotSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, topic1);
    }

    @Test
    public void testQueryableNotSendingOldValue() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        }, "anyStoreNameFilter");
        this.doTestNotSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, topic1);
    }

    private void doTestSendingOldValue(KStreamBuilder builder, KTableImpl<String, Integer, Integer> table1, KTableImpl<String, Integer, Integer> table2, String topic1) throws IOException {
        table2.enableSendingOldValues();
        MockProcessorSupplier proc1 = new MockProcessorSupplier();
        MockProcessorSupplier proc2 = new MockProcessorSupplier();
        builder.addProcessor("proc1", proc1, new String[]{table1.name});
        builder.addProcessor("proc2", proc2, new String[]{table2.name});
        this.driver = new KStreamTestDriver(builder, this.stateDir, Serdes.String(), Serdes.Integer());
        this.driver.process(topic1, "A", 1);
        this.driver.process(topic1, "B", 1);
        this.driver.process(topic1, "C", 1);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
        proc2.checkEmptyAndClearProcessResult();
        this.driver.process(topic1, "A", 2);
        this.driver.process(topic1, "B", 2);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
        proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
        this.driver.process(topic1, "A", 3);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(3<-2)");
        proc2.checkAndClearProcessResult("A:(null<-2)");
        this.driver.process(topic1, "A", null);
        this.driver.process(topic1, "B", null);
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
        proc2.checkAndClearProcessResult("B:(null<-2)");
    }

    @Test
    public void testSendingOldValue() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        });
        this.doTestSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, topic1);
    }

    @Test
    public void testQueryableSendingOldValue() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.intSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, Integer>(){

            public boolean test(String key, Integer value) {
                return value % 2 == 0;
            }
        }, "anyStoreNameFilter");
        this.doTestSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, topic1);
    }

    private void doTestSkipNullOnMaterialization(KStreamBuilder builder, KTableImpl<String, String, String> table1, KTableImpl<String, String, String> table2, String topic1) throws IOException {
        MockProcessorSupplier proc1 = new MockProcessorSupplier();
        MockProcessorSupplier proc2 = new MockProcessorSupplier();
        builder.addProcessor("proc1", proc1, new String[]{table1.name});
        builder.addProcessor("proc2", proc2, new String[]{table2.name});
        this.driver = new KStreamTestDriver(builder, this.stateDir, this.stringSerde, this.stringSerde);
        this.driver.process(topic1, "A", "reject");
        this.driver.process(topic1, "B", "reject");
        this.driver.process(topic1, "C", "reject");
        this.driver.flushState();
        proc1.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
        proc2.checkEmptyAndClearProcessResult();
    }

    @Test
    public void testSkipNullOnMaterialization() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.stringSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, String>(){

            public boolean test(String key, String value) {
                return value.equalsIgnoreCase("accept");
            }
        }).groupBy(MockKeyValueMapper.NoOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
        this.doTestSkipNullOnMaterialization(builder, (KTableImpl<String, String, String>)table1, (KTableImpl<String, String, String>)table2, topic1);
    }

    @Test
    public void testQueryableSkipNullOnMaterialization() throws IOException {
        KStreamBuilder builder = new KStreamBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table(this.stringSerde, this.stringSerde, topic1, "anyStoreName");
        KTableImpl table2 = (KTableImpl)table1.filter((Predicate)new Predicate<String, String>(){

            public boolean test(String key, String value) {
                return value.equalsIgnoreCase("accept");
            }
        }, "anyStoreNameFilter").groupBy(MockKeyValueMapper.NoOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
        this.doTestSkipNullOnMaterialization(builder, (KTableImpl<String, String, String>)table1, (KTableImpl<String, String, String>)table2, topic1);
    }

    @Test
    public void testTypeVariance() throws Exception {
        Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>(){

            public boolean test(Number key, Object value) {
                return false;
            }
        };
        new KStreamBuilder().table("empty", "emptyStore").filter((Predicate)numberKeyPredicate).filterNot((Predicate)numberKeyPredicate).to("nirvana");
    }
}

