/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class KTableSourceTest {
    private final Consumed<String, String> stringConsumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer());
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());

    @Test
    public void testKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        table1.toStream().process(supplier, new String[0]);
        ConsumerRecordFactory integerFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new IntegerSerializer());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            driver.pipeInput(integerFactory.create("topic1", (Object)"A", (Object)1));
            driver.pipeInput(integerFactory.create("topic1", (Object)"B", (Object)2));
            driver.pipeInput(integerFactory.create("topic1", (Object)"C", (Object)3));
            driver.pipeInput(integerFactory.create("topic1", (Object)"D", (Object)4));
            driver.pipeInput(integerFactory.create("topic1", (Object)"A", null));
            driver.pipeInput(integerFactory.create("topic1", (Object)"B", null));
        }
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:1", "B:2", "C:3", "D:4", "A:null", "B:null"}), supplier.theCapturedProcessor().processed);
    }

    @Test
    public void kTableShouldLogAndMeterOnSkippedRecords() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.table("topic", this.stringConsumed);
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            driver.pipeInput(this.recordFactory.create("topic", null, (Object)"value"));
            LogCaptureAppender.unregister(appender);
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            Assert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
        }
    }

    @Test
    public void testValueGetter() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed);
        Topology topology = builder.build();
        KTableValueGetterSupplier getterSupplier1 = table1.valueGetterSupplier();
        InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
        try (TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), this.props);){
            KTableValueGetter getter1 = getterSupplier1.get();
            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"01"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"02"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"02"));
            Assert.assertEquals((Object)"02", (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"03"));
            Assert.assertEquals((Object)"03", (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)"02", (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)null));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)null));
            Assert.assertNull((Object)getter1.get((Object)"A"));
            Assert.assertNull((Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)"01", (Object)getter1.get((Object)"C"));
        }
    }

    @Test
    public void testNotSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed);
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc1", supplier, new String[]{table1.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            MockProcessor proc1 = supplier.theCapturedProcessor();
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"01"));
            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"02"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"02"));
            proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"03"));
            proc1.checkAndClearProcessResult("A:(03<-null)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)null));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)null));
            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
        }
    }

    @Test
    public void testSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed);
        table1.enableSendingOldValues();
        Assert.assertTrue((boolean)table1.sendingOldValueEnabled());
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc1", supplier, new String[]{table1.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            MockProcessor proc1 = supplier.theCapturedProcessor();
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"01"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"01"));
            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"02"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"02"));
            proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"03"));
            proc1.checkAndClearProcessResult("A:(03<-02)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)null));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)null));
            proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
        }
    }
}

