/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests for a {@link KTable} created directly from a source topic through
 * {@link StreamsBuilder#table(String, Consumed)}.
 *
 * <p>Each test builds a small topology, hands it to the {@link KStreamTestDriver}
 * rule together with a temporary state directory, pipes records into
 * {@code "topic1"} and then checks either what a downstream
 * {@link MockProcessorSupplier} recorded or what the table's value getter returns.
 */
public class KTableSourceTest {
    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde);
    private final Serde<Integer> intSerde = Serdes.Integer();

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /** State directory passed to the driver; assigned a new temp directory in {@link #setUp()}. */
    private File stateDir;

    /** Obtains the temporary directory that the driver is given as its state directory. */
    @Before
    public void setUp() {
        stateDir = TestUtils.tempDirectory("kafka-test");
    }

    /**
     * Pipes inserts followed by null-valued records through the table and expects
     * every record, including the null-valued ones, to reach the processor attached
     * to {@code table.toStream()} in input order.
     */
    @Test
    public void testKTable() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
        final MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
        table1.toStream().process(proc1);

        driver.setUp(builder, stateDir);

        driver.process(topic1, "A", 1);
        driver.process(topic1, "B", 2);
        driver.process(topic1, "C", 3);
        driver.process(topic1, "D", 4);
        driver.flushState();

        driver.process(topic1, "A", null);
        driver.process(topic1, "B", null);
        driver.flushState();

        Assert.assertEquals(
            Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"),
            proc1.processed);
    }

    /**
     * Checks that the getter obtained from the table's value-getter supplier returns
     * the most recently processed value for each key, and {@code null} for a key
     * whose latest record carried a null value.
     */
    @Test
    public void testValueGetter() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        // Wildcard for the middle type argument keeps the downcast checked.
        final KTableImpl<String, ?, String> table1 =
            (KTableImpl<String, ?, String>) builder.table(topic1, stringConsumed);
        final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();

        driver.setUp(builder, stateDir);

        final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
        getter1.init(driver.context());

        driver.process(topic1, "A", "01");
        driver.process(topic1, "B", "01");
        driver.process(topic1, "C", "01");

        Assert.assertEquals("01", getter1.get("A"));
        Assert.assertEquals("01", getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));

        driver.process(topic1, "A", "02");
        driver.process(topic1, "B", "02");

        Assert.assertEquals("02", getter1.get("A"));
        Assert.assertEquals("02", getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));

        driver.process(topic1, "A", "03");

        Assert.assertEquals("03", getter1.get("A"));
        Assert.assertEquals("02", getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));

        driver.process(topic1, "A", null);
        driver.process(topic1, "B", null);

        Assert.assertNull(getter1.get("A"));
        Assert.assertNull(getter1.get("B"));
        Assert.assertEquals("01", getter1.get("C"));
    }

    /**
     * Without calling {@code enableSendingOldValues()}, expects every change seen by
     * a processor attached to the table's node to report {@code null} as the old
     * value, even when the key was written before.
     */
    @Test
    public void testNotSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTableImpl<String, ?, String> table1 =
            (KTableImpl<String, ?, String>) builder.table(topic1, stringConsumed);

        // Attach the mock directly beneath the table's processor node.
        final MockProcessorSupplier<String, Change<String>> proc1 = new MockProcessorSupplier<>();
        builder.build().addProcessor("proc1", proc1, table1.name);

        driver.setUp(builder, stateDir);

        driver.process(topic1, "A", "01");
        driver.process(topic1, "B", "01");
        driver.process(topic1, "C", "01");
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");

        driver.process(topic1, "A", "02");
        driver.process(topic1, "B", "02");
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");

        driver.process(topic1, "A", "03");
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(03<-null)");

        driver.process(topic1, "A", null);
        driver.process(topic1, "B", null);
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
    }

    /**
     * After calling {@code enableSendingOldValues()}, expects every change seen by a
     * processor attached to the table's node to report the previously processed
     * value for that key as the old value.
     */
    @Test
    public void testSendingOldValue() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTableImpl<String, ?, String> table1 =
            (KTableImpl<String, ?, String>) builder.table(topic1, stringConsumed);
        table1.enableSendingOldValues();
        Assert.assertTrue(table1.sendingOldValueEnabled());

        // Attach the mock directly beneath the table's processor node.
        final MockProcessorSupplier<String, Change<String>> proc1 = new MockProcessorSupplier<>();
        builder.build().addProcessor("proc1", proc1, table1.name);

        driver.setUp(builder, stateDir);

        driver.process(topic1, "A", "01");
        driver.process(topic1, "B", "01");
        driver.process(topic1, "C", "01");
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");

        driver.process(topic1, "A", "02");
        driver.process(topic1, "B", "02");
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");

        driver.process(topic1, "A", "03");
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(03<-02)");

        driver.process(topic1, "A", null);
        driver.process(topic1, "B", null);
        driver.flushState();
        proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
    }
}

