/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

public class KTableSourceTest {
    private final Consumed<String, String> stringConsumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Test
    public void testKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        table1.toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer());
            inputTopic.pipeInput((Object)"A", (Object)1, 10L);
            inputTopic.pipeInput((Object)"B", (Object)2, 11L);
            inputTopic.pipeInput((Object)"C", (Object)3, 12L);
            inputTopic.pipeInput((Object)"D", (Object)4, 13L);
            inputTopic.pipeInput((Object)"A", null, 14L);
            inputTopic.pipeInput((Object)"B", null, 15L);
        }
        Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<String, Integer>("A", 1, 10L), new KeyValueTimestamp<String, Integer>("B", 2, 11L), new KeyValueTimestamp<String, Integer>("C", 3, 12L), new KeyValueTimestamp<String, Integer>("D", 4, 13L), new KeyValueTimestamp<String, Object>("A", null, 14L), new KeyValueTimestamp<String, Object>("B", null, 15L)), supplier.theCapturedProcessor().processed());
    }

    @Ignore
    @Test
    public void testKTableSourceEmitOnChange() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        builder.table("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()), Materialized.as((String)"store")).toStream().to("output");
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer());
            TestOutputTopic outputTopic = driver.createOutputTopic("output", (Deserializer)new StringDeserializer(), (Deserializer)new IntegerDeserializer());
            inputTopic.pipeInput((Object)"A", (Object)1, 10L);
            inputTopic.pipeInput((Object)"B", (Object)2, 11L);
            inputTopic.pipeInput((Object)"A", (Object)1, 12L);
            inputTopic.pipeInput((Object)"B", (Object)3, 13L);
            inputTopic.pipeInput((Object)"A", (Object)1, 9L);
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "idempotent-update-skip-total", "stream-processor-node-metrics").metricValue());
            Assert.assertEquals(Arrays.asList(new TestRecord((Object)"A", (Object)1, Instant.ofEpochMilli(10L)), new TestRecord((Object)"B", (Object)2, Instant.ofEpochMilli(11L)), new TestRecord((Object)"B", (Object)3, Instant.ofEpochMilli(13L)), new TestRecord((Object)"A", (Object)1, Instant.ofEpochMilli(9L))), (Object)outputTopic.readRecordsToList());
        }
    }

    @Test
    public void kTableShouldLogAndMeterOnSkippedRecords() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.table("topic", this.stringConsumed);
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableSource.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput(null, (Object)"value");
            MatcherAssert.assertThat(appender.getEvents().stream().filter(e -> e.getLevel().equals("WARN")).map(LogCaptureAppender.Event::getMessage).collect(Collectors.toList()), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
        }
    }

    @Test
    public void kTableShouldLogOnOutOfOrder() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.table("topic", this.stringConsumed, Materialized.as((String)"store"));
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableSource.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput((Object)"key", (Object)"value", 10L);
            inputTopic.pipeInput((Object)"key", (Object)"value", 5L);
            MatcherAssert.assertThat(appender.getEvents().stream().filter(e -> e.getLevel().equals("WARN")).map(LogCaptureAppender.Event::getMessage).collect(Collectors.toList()), (Matcher)CoreMatchers.hasItem((Object)"Detected out-of-order KTable update for store, old timestamp=[10] new timestamp=[5]. topic=[topic] partition=[0] offset=[1]."));
        }
    }

    @Test
    public void testValueGetter() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed, Materialized.as((String)"store"));
        Topology topology = builder.build();
        KTableValueGetterSupplier getterSupplier1 = table1.valueGetterSupplier();
        InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
        try (TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            KTableValueGetter getter1 = getterSupplier1.get();
            getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
            inputTopic1.pipeInput((Object)"A", (Object)"01", 10L);
            inputTopic1.pipeInput((Object)"B", (Object)"01", 20L);
            inputTopic1.pipeInput((Object)"C", (Object)"01", 15L);
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)10L), (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)20L), (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
            inputTopic1.pipeInput((Object)"A", (Object)"02", 30L);
            inputTopic1.pipeInput((Object)"B", (Object)"02", 5L);
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"02", (long)30L), (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"02", (long)5L), (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
            inputTopic1.pipeInput((Object)"A", (Object)"03", 29L);
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"03", (long)29L), (Object)getter1.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"02", (long)5L), (Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
            inputTopic1.pipeInput((Object)"A", null, 50L);
            inputTopic1.pipeInput((Object)"B", null, 3L);
            Assert.assertNull((Object)getter1.get((Object)"A"));
            Assert.assertNull((Object)getter1.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)"01", (long)15L), (Object)getter1.get((Object)"C"));
        }
    }

    @Test
    public void testNotSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed);
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc1", supplier, new String[]{table1.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor proc1 = supplier.theCapturedProcessor();
            inputTopic1.pipeInput((Object)"A", (Object)"01", 10L);
            inputTopic1.pipeInput((Object)"B", (Object)"01", 20L);
            inputTopic1.pipeInput((Object)"C", (Object)"01", 15L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)"01", null), 10L), new KeyValueTimestamp<String, Change>("B", new Change((Object)"01", null), 20L), new KeyValueTimestamp<String, Change>("C", new Change((Object)"01", null), 15L));
            inputTopic1.pipeInput((Object)"A", (Object)"02", 8L);
            inputTopic1.pipeInput((Object)"B", (Object)"02", 22L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)"02", null), 8L), new KeyValueTimestamp<String, Change>("B", new Change((Object)"02", null), 22L));
            inputTopic1.pipeInput((Object)"A", (Object)"03", 12L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)"03", null), 12L));
            inputTopic1.pipeInput((Object)"A", null, 15L);
            inputTopic1.pipeInput((Object)"B", null, 20L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, null), 15L), new KeyValueTimestamp<String, Change>("B", new Change(null, null), 20L));
        }
    }

    @Test
    public void testSendingOldValue() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.stringConsumed);
        table1.enableSendingOldValues(true);
        Assert.assertTrue((boolean)table1.sendingOldValueEnabled());
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        Topology topology = builder.build().addProcessor("proc1", supplier, new String[]{table1.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor proc1 = supplier.theCapturedProcessor();
            inputTopic1.pipeInput((Object)"A", (Object)"01", 10L);
            inputTopic1.pipeInput((Object)"B", (Object)"01", 20L);
            inputTopic1.pipeInput((Object)"C", (Object)"01", 15L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)"01", null), 10L), new KeyValueTimestamp<String, Change>("B", new Change((Object)"01", null), 20L), new KeyValueTimestamp<String, Change>("C", new Change((Object)"01", null), 15L));
            inputTopic1.pipeInput((Object)"A", (Object)"02", 8L);
            inputTopic1.pipeInput((Object)"B", (Object)"02", 22L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)"02", (Object)"01"), 8L), new KeyValueTimestamp<String, Change>("B", new Change((Object)"02", (Object)"01"), 22L));
            inputTopic1.pipeInput((Object)"A", (Object)"03", 12L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)"03", (Object)"02"), 12L));
            inputTopic1.pipeInput((Object)"A", null, 15L);
            inputTopic1.pipeInput((Object)"B", null, 20L);
            proc1.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, (Object)"03"), 15L), new KeyValueTimestamp<String, Change>("B", new Change(null, (Object)"02"), 20L));
        }
    }
}

