/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KStreamWindowAggregateTest {
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final String threadId = Thread.currentThread().getName();

    @Test
    public void testAggBasic() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table2 = builder.stream("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"topic1-Canonized").withValueSerde(Serdes.String()));
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        table2.toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic1.pipeInput((Object)"A", (Object)"1", 0L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 1L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 2L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 3L);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 4L);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 5L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 6L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 7L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 8L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 9L);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 10L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 11L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 12L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 13L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 14L);
            inputTopic1.pipeInput((Object)"B", (Object)"1", 3L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 2L);
            inputTopic1.pipeInput((Object)"B", (Object)"3", 9L);
        }
        Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4", 3L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1", 4L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(5L, 15L)), "0+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(10L, 20L)), "0+1", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2+2", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+2", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4+4", 12L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(10L, 20L)), "0+4", 12L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2+2+2", 13L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+2+2", 13L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(5L, 15L)), "0+3+3", 14L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(10L, 20L)), "0+3", 14L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2+1", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2+1+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2+1+2+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2+2+2+3", 13L)), supplier.theCapturedProcessor().processed);
    }

    @Test
    public void testJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        String topic2 = "topic2";
        KTable table1 = builder.stream("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"topic1-Canonized").withValueSerde(Serdes.String()));
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        table1.toStream().process(supplier, new String[0]);
        KTable table2 = builder.stream("topic2", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as((String)"topic2-Canonized").withValueSerde(Serdes.String()));
        table2.toStream().process(supplier, new String[0]);
        table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic1.pipeInput((Object)"A", (Object)"1", 0L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 1L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 2L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 3L);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 9L);
            List processors = supplier.capturedProcessors(3);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4", 3L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1", 9L));
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 5L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 6L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 7L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 8L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 9L);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(5L, 15L)), "0+3", 9L));
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
            inputTopic2.pipeInput((Object)"A", (Object)"a", 0L);
            inputTopic2.pipeInput((Object)"B", (Object)"b", 1L);
            inputTopic2.pipeInput((Object)"C", (Object)"c", 2L);
            inputTopic2.pipeInput((Object)"D", (Object)"d", 20L);
            inputTopic2.pipeInput((Object)"A", (Object)"a", 20L);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+a", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+b", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+c", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(15L, 25L)), "0+d", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(20L, 30L)), "0+d", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(15L, 25L)), "0+a", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(20L, 30L)), "0+a", 20L));
            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1%0+a", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2%0+b", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3+3%0+c", 9L));
            inputTopic2.pipeInput((Object)"A", (Object)"a", 5L);
            inputTopic2.pipeInput((Object)"B", (Object)"b", 6L);
            inputTopic2.pipeInput((Object)"D", (Object)"d", 7L);
            inputTopic2.pipeInput((Object)"D", (Object)"d", 18L);
            inputTopic2.pipeInput((Object)"A", (Object)"a", 21L);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+a+a", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+a", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+b+b", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+b", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+d", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+d", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(10L, 20L)), "0+d", 18L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(15L, 25L)), "0+d+d", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(15L, 25L)), "0+a+a", 21L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(20L, 30L)), "0+a+a", 21L));
            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1%0+a+a", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1%0+a", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2%0+b+b", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2%0+b", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4+4%0+d", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4%0+d", 7L));
        }
    }

    @Test
    public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest() {
        this.shouldLogAndMeterWhenSkippingNullKey("latest");
    }

    @Test
    public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersion0100To24() {
        this.shouldLogAndMeterWhenSkippingNullKey("0.10.0-2.4");
    }

    private void shouldLogAndMeterWhenSkippingNullKey(String builtInMetricsVersion) {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L))).aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.as((String)"topic1-Canonicalized").withValueSerde(Serdes.String()));
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.props.setProperty("built.in.metrics.version", builtInMetricsVersion);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput(null, (Object)"1");
            LogCaptureAppender.unregister(appender);
            if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
                Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            }
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
        }
    }

    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowWithBuiltInMetricsVersionLatest() {
        this.shouldLogAndMeterWhenSkippingExpiredWindow("latest");
    }

    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowWithBuiltInMetricsVersion0100To24() {
        this.shouldLogAndMeterWhenSkippingExpiredWindow("0.10.0-2.4");
    }

    @Deprecated
    private void shouldLogAndMeterWhenSkippingExpiredWindow(String builtInMetricsVersion) {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        KStream stream1 = builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        stream1.groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L)).until(100L)).aggregate(() -> "", MockAggregator.toStringInstance("+"), Materialized.as((String)"topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()).toStream().map((key, value) -> new KeyValue((Object)key.toString(), value)).to("output");
        LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.props.setProperty("built.in.metrics.version", builtInMetricsVersion);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"k", (Object)"100", 100L);
            inputTopic.pipeInput((Object)"k", (Object)"0", 0L);
            inputTopic.pipeInput((Object)"k", (Object)"1", 1L);
            inputTopic.pipeInput((Object)"k", (Object)"2", 2L);
            inputTopic.pipeInput((Object)"k", (Object)"3", 3L);
            inputTopic.pipeInput((Object)"k", (Object)"4", 4L);
            inputTopic.pipeInput((Object)"k", (Object)"5", 5L);
            inputTopic.pipeInput((Object)"k", (Object)"6", 6L);
            LogCaptureAppender.unregister(appender);
            this.assertLatenessMetrics(driver, builtInMetricsVersion, (Matcher<Object>)CoreMatchers.is((Object)7.0), (Matcher<Object>)CoreMatchers.is((Object)100.0), (Matcher<Object>)CoreMatchers.is((Object)84.875));
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"}));
            TestOutputTopic outputTopic = driver.createOutputTopic("output", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@95/105]", (Object)"+100", null, Long.valueOf(100L))));
            MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@100/110]", (Object)"+100", null, Long.valueOf(100L))));
            MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@5/15]", (Object)"+5", null, Long.valueOf(5L))));
            MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@5/15]", (Object)"+5+6", null, Long.valueOf(6L))));
            Assert.assertTrue((boolean)outputTopic.isEmpty());
        }
    }

    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowByGraceWithBuiltInMetricsVersionLatest() {
        this.shouldLogAndMeterWhenSkippingExpiredWindowByGrace("latest");
    }

    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowByGraceWithBuiltInMetricsVersion0100To24() {
        this.shouldLogAndMeterWhenSkippingExpiredWindowByGrace("0.10.0-2.4");
    }

    private void shouldLogAndMeterWhenSkippingExpiredWindowByGrace(String builtInMetricsVersion) {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        KStream stream1 = builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        stream1.groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(10L)).grace(Duration.ofMillis(90L))).aggregate(() -> "", MockAggregator.toStringInstance("+"), Materialized.as((String)"topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()).toStream().map((key, value) -> new KeyValue((Object)key.toString(), value)).to("output");
        LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.props.setProperty("built.in.metrics.version", builtInMetricsVersion);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"k", (Object)"100", 200L);
            inputTopic.pipeInput((Object)"k", (Object)"0", 100L);
            inputTopic.pipeInput((Object)"k", (Object)"1", 101L);
            inputTopic.pipeInput((Object)"k", (Object)"2", 102L);
            inputTopic.pipeInput((Object)"k", (Object)"3", 103L);
            inputTopic.pipeInput((Object)"k", (Object)"4", 104L);
            inputTopic.pipeInput((Object)"k", (Object)"5", 105L);
            inputTopic.pipeInput((Object)"k", (Object)"6", 6L);
            LogCaptureAppender.unregister(appender);
            this.assertLatenessMetrics(driver, builtInMetricsVersion, (Matcher<Object>)CoreMatchers.is((Object)7.0), (Matcher<Object>)CoreMatchers.is((Object)194.0), (Matcher<Object>)CoreMatchers.is((Object)97.375));
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"}));
            TestOutputTopic outputTopic = driver.createOutputTopic("output", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@200/210]", (Object)"+100", null, Long.valueOf(200L))));
            Assert.assertTrue((boolean)outputTopic.isEmpty());
        }
    }

    private void assertLatenessMetrics(TopologyTestDriver driver, String builtInMetricsVersion, Matcher<Object> dropTotal, Matcher<Object> maxLateness, Matcher<Object> avgLateness) {
        MetricName latenessAvgMetric;
        MetricName latenessMaxMetric;
        MetricName dropRateMetric;
        MetricName dropTotalMetric;
        if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
            dropTotalMetric = new MetricName("late-record-drop-total", "stream-processor-node-metrics", "The total number of dropped late records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0"), Utils.mkEntry((Object)"processor-node-id", (Object)"KSTREAM-AGGREGATE-0000000001")}));
            dropRateMetric = new MetricName("late-record-drop-rate", "stream-processor-node-metrics", "The average number of dropped late records per second", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0"), Utils.mkEntry((Object)"processor-node-id", (Object)"KSTREAM-AGGREGATE-0000000001")}));
            latenessMaxMetric = new MetricName("record-lateness-max", "stream-task-metrics", "The observed maximum lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
            latenessAvgMetric = new MetricName("record-lateness-avg", "stream-task-metrics", "The observed average lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        } else {
            dropTotalMetric = new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
            dropRateMetric = new MetricName("dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
            latenessMaxMetric = new MetricName("record-lateness-max", "stream-task-metrics", "The observed maximum lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
            latenessAvgMetric = new MetricName("record-lateness-avg", "stream-task-metrics", "The observed average lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        }
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(dropTotalMetric)).metricValue(), dropTotal);
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(dropRateMetric)).metricValue(), (Matcher)CoreMatchers.not((Object)0.0));
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(latenessMaxMetric)).metricValue(), maxLateness);
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(latenessAvgMetric)).metricValue(), avgLateness);
    }
}

