/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class KStreamWindowReduceTest {
    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer());

    @Test
    public void shouldLogAndMeterOnNullKey() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("TOPIC", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((long)500L)).reduce((Reducer)new Reducer<String>(){

            public String apply(String value1, String value2) {
                return value1 + "+" + value2;
            }
        });
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
            driver.pipeInput(this.recordFactory.create("TOPIC", null, (Object)"asdf"));
            LogCaptureAppender.unregister(appender);
            Assert.assertEquals((Object)1.0, (Object)StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            MatcherAssert.assertThat(appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
        }
    }
}

