/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KStreamWindowAggregateTest {
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer());
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());

    @Test
    public void testAggBasic() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table2 = builder.stream("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((long)10L).advanceBy(5L)).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"topic1-Canonized").withValueSerde(Serdes.String()));
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        table2.toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props, 0L);){
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"1", 0L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 1L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"3", 2L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"D", (Object)"4", 3L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"1", 4L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"1", 5L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 6L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"D", (Object)"4", 7L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 8L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"3", 9L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"1", 10L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 11L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"D", (Object)"4", 12L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 13L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"3", 14L));
        }
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"[A@0/10]:0+1", "[B@0/10]:0+2", "[C@0/10]:0+3", "[D@0/10]:0+4", "[A@0/10]:0+1+1", "[A@0/10]:0+1+1+1", "[A@5/15]:0+1", "[B@0/10]:0+2+2", "[B@5/15]:0+2", "[D@0/10]:0+4+4", "[D@5/15]:0+4", "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2", "[C@0/10]:0+3+3", "[C@5/15]:0+3", "[A@5/15]:0+1+1", "[A@10/20]:0+1", "[B@5/15]:0+2+2+2", "[B@10/20]:0+2", "[D@5/15]:0+4+4", "[D@10/20]:0+4", "[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2", "[C@5/15]:0+3+3", "[C@10/20]:0+3"}), supplier.theCapturedProcessor().processed);
    }

    @Test
    public void testJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        String topic2 = "topic2";
        KTable table1 = builder.stream("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((long)10L).advanceBy(5L)).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"topic1-Canonized").withValueSerde(Serdes.String()));
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        table1.toStream().process(supplier, new String[0]);
        KTable table2 = builder.stream("topic2", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((long)10L).advanceBy(5L)).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"topic2-Canonized").withValueSerde(Serdes.String()));
        table2.toStream().process(supplier, new String[0]);
        table1.join(table2, (ValueJoiner)new ValueJoiner<String, String, String>(){

            public String apply(String p1, String p2) {
                return p1 + "%" + p2;
            }
        }).toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props, 0L);){
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"1", 0L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 1L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"3", 2L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"D", (Object)"4", 3L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"1", 4L));
            List processors = supplier.capturedProcessors(3);
            processors.get(0).checkAndClearProcessResult("[A@0/10]:0+1", "[B@0/10]:0+2", "[C@0/10]:0+3", "[D@0/10]:0+4", "[A@0/10]:0+1+1");
            processors.get(1).checkAndClearProcessResult(new String[0]);
            processors.get(2).checkAndClearProcessResult(new String[0]);
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"A", (Object)"1", 5L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 6L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"D", (Object)"4", 7L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"B", (Object)"2", 8L));
            driver.pipeInput(this.recordFactory.create("topic1", (Object)"C", (Object)"3", 9L));
            processors.get(0).checkAndClearProcessResult("[A@0/10]:0+1+1+1", "[A@5/15]:0+1", "[B@0/10]:0+2+2", "[B@5/15]:0+2", "[D@0/10]:0+4+4", "[D@5/15]:0+4", "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2", "[C@0/10]:0+3+3", "[C@5/15]:0+3");
            processors.get(1).checkAndClearProcessResult(new String[0]);
            processors.get(2).checkAndClearProcessResult(new String[0]);
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"A", (Object)"a", 0L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"B", (Object)"b", 1L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"C", (Object)"c", 2L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"D", (Object)"d", 3L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"A", (Object)"a", 4L));
            processors.get(0).checkAndClearProcessResult(new String[0]);
            processors.get(1).checkAndClearProcessResult("[A@0/10]:0+a", "[B@0/10]:0+b", "[C@0/10]:0+c", "[D@0/10]:0+d", "[A@0/10]:0+a+a");
            processors.get(2).checkAndClearProcessResult("[A@0/10]:0+1+1+1%0+a", "[B@0/10]:0+2+2+2%0+b", "[C@0/10]:0+3+3%0+c", "[D@0/10]:0+4+4%0+d", "[A@0/10]:0+1+1+1%0+a+a");
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"A", (Object)"a", 5L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"B", (Object)"b", 6L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"D", (Object)"d", 7L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"B", (Object)"b", 8L));
            driver.pipeInput(this.recordFactory.create("topic2", (Object)"C", (Object)"c", 9L));
            processors.get(0).checkAndClearProcessResult(new String[0]);
            processors.get(1).checkAndClearProcessResult("[A@0/10]:0+a+a+a", "[A@5/15]:0+a", "[B@0/10]:0+b+b", "[B@5/15]:0+b", "[D@0/10]:0+d+d", "[D@5/15]:0+d", "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b", "[C@0/10]:0+c+c", "[C@5/15]:0+c");
            processors.get(2).checkAndClearProcessResult("[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a", "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b", "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d", "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b", "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c");
        }
    }

    @Test
    public void shouldLogAndMeterWhenSkippingNullKey() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        KStream stream1 = builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        stream1.groupByKey(Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((long)10L).advanceBy(5L)).aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.as((String)"topic1-Canonicalized").withValueSerde(Serdes.String()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props, 0L);){
            driver.pipeInput(this.recordFactory.create("topic", null, (Object)"1"));
            LogCaptureAppender.unregister(appender);
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
        }
    }
}

