/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KTableSourceTest {
    private final Consumed<String, String> stringConsumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer(), 0L);
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Test
    public void testKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        table1.toStream().process(supplier, new String[0]);
        ConsumerRecordFactory integerFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new IntegerSerializer(), 0L);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            driver.pipeInput(integerFactory.create("topic1", (Object)"A", (Object)1, 10L));
            driver.pipeInput(integerFactory.create("topic1", (Object)"B", (Object)2, 11L));
            driver.pipeInput(integerFactory.create("topic1", (Object)"C", (Object)3, 12L));
            driver.pipeInput(integerFactory.create("topic1", (Object)"D", (Object)4, 13L));
            driver.pipeInput(integerFactory.create("topic1", (Object)"A", null, 14L));
            driver.pipeInput(integerFactory.create("topic1", (Object)"B", null, 15L));
        }
        Assert.assertEquals(Arrays.asList("A:1 (ts: 10)", "B:2 (ts: 11)", "C:3 (ts: 12)", "D:4 (ts: 13)", "A:null (ts: 14)", "B:null (ts: 15)"), supplier.theCapturedProcessor().processed);
    }

    @Test
    public void kTableShouldLogAndMeterOnSkippedRecords() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.table("topic", this.stringConsumed);
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            driver.pipeInput(this.recordFactory.create("topic", null, (Object)"value"));
            LogCaptureAppender.unregister(appender);
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
        }
    }

    @Test
    public void testValueGetter() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed, Materialized.as((String)"store"));
        Topology topology = builder.build();
        KTableValueGetterSupplier getterSupplier1 = table1.valueGetterSupplier();
        InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
        try (TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), this.props);){
            KTableValueGetter getter1 = getterSupplier1.get();
            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"01", 10L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"01", 20L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"01", 15L));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)10L), (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)20L), (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"02", 30L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"02", 5L));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"02", (long)30L), (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"02", (long)5L), (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"03", 29L));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"03", (long)29L), (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"02", (long)5L), (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)null, 50L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)null, 3L));
            Assert.assertNull((Object)getter1.get((Object)"A"));
            Assert.assertNull((Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
        }
    }

    @Test
    public void testNotSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed);
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc1", supplier, new String[]{table1.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            MockProcessor proc1 = supplier.theCapturedProcessor();
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"01", 10L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"01", 20L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"01", 15L));
            proc1.checkAndClearProcessResult("A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"02", 8L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"02", 22L));
            proc1.checkAndClearProcessResult("A:(02<-null) (ts: 8)", "B:(02<-null) (ts: 22)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"03", 12L));
            proc1.checkAndClearProcessResult("A:(03<-null) (ts: 12)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)null, 15L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)null, 20L));
            proc1.checkAndClearProcessResult("A:(null<-null) (ts: 15)", "B:(null<-null) (ts: 20)");
        }
    }

    @Test
    public void testSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed);
        table1.enableSendingOldValues();
        Assert.assertTrue((boolean)table1.sendingOldValueEnabled());
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc1", supplier, new String[]{table1.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            MockProcessor proc1 = supplier.theCapturedProcessor();
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"01", 10L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"01", 20L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"01", 15L));
            proc1.checkAndClearProcessResult("A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"02", 8L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"02", 22L));
            proc1.checkAndClearProcessResult("A:(02<-01) (ts: 8)", "B:(02<-01) (ts: 22)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"03", 12L));
            proc1.checkAndClearProcessResult("A:(03<-02) (ts: 12)");
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)null, 15L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)null, 20L));
            proc1.checkAndClearProcessResult("A:(null<-03) (ts: 15)", "B:(null<-02) (ts: 20)");
        }
    }
}

