/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.suppress;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

/**
 * Unit tests for the processor obtained from {@link KTableSuppressProcessorSupplier}.
 *
 * <p>Each test builds a {@link Harness}, which wires the processor to an in-memory
 * time-ordered change buffer and a {@link MockInternalNewProcessorContext}. The tests then
 * feed records through {@code process(...)} and inspect what the mock context captured
 * via {@code forwarded()}.
 */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KTableSuppressProcessorTest {
    /** Arbitrary value reused as a timestamp or as a {@link Change} payload. */
    private static final long ARBITRARY_LONG = 5L;
    /** Arbitrary change whose new and old values are both non-null. */
    private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);

    /** With a zero time limit a plain-keyed record is expected to be forwarded right away. */
    @Test
    public void zeroTimeLimitShouldImmediatelyEmit() {
        final Harness<String, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()),
            Serdes.String(),
            Serdes.Long());
        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;

        final long timestamp = ARBITRARY_LONG;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final String key = "hey";
        final Change<Long> value = ARBITRARY_CHANGE;
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /** Same as the zero-time-limit case above, but with a time-windowed key. */
    @Test
    public void windowedZeroTimeLimitShouldImmediatelyEmit() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()),
            timeWindowedSerdeFrom(String.class, 100L),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long timestamp = ARBITRARY_LONG;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
        final Change<Long> value = ARBITRARY_CHANGE;
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * With a 1 ms time limit the first record is expected to stay buffered until a later
     * record with timestamp 1 is processed.
     */
    @Test
    public void intermediateSuppressionShouldBufferAndEmitLater() {
        final Harness<String, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ofMillis(1L), Suppressed.BufferConfig.unbounded()),
            Serdes.String(),
            Serdes.Long());
        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;

        final long timestamp = 0L;
        context.setRecordMetadata("topic", 0, 0L);
        context.setTimestamp(timestamp);
        final String key = "hey";
        final Change<Long> value = new Change<>(null, 1L);
        harness.processor.process(new Record<>(key, value, timestamp));
        MatcherAssert.assertThat(context.forwarded(), hasSize(0));

        // A second record, one millisecond later, is what triggers the emission asserted below.
        context.setRecordMetadata("topic", 0, 1L);
        context.setTimestamp(1L);
        harness.processor.process(new Record<>("tick", new Change<>(null, null), 1L));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * Final-results suppression with a 1 ms grace period: the record for window [99, 100) is
     * expected to be held back at time 100 and forwarded once a record at time 101 arrives.
     */
    @Test
    public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            finalResults(Duration.ofMillis(1L)),
            timeWindowedSerdeFrom(String.class, 1L),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long windowStart = 99L;
        final long recordTime = 99L;
        final long windowEnd = 100L;
        context.setRecordMetadata("topic", 0, 0L);
        context.setTimestamp(recordTime);
        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
        final Change<Long> value = ARBITRARY_CHANGE;
        harness.processor.process(new Record<>(key, value, recordTime));
        MatcherAssert.assertThat(context.forwarded(), hasSize(0));

        // Time 100 equals the end of the first window; nothing is expected to be forwarded yet.
        final long windowStart2 = 100L;
        final long recordTime2 = 100L;
        final long windowEnd2 = 101L;
        context.setRecordMetadata("topic", 0, 1L);
        context.setTimestamp(recordTime2);
        harness.processor.process(
            new Record<>(
                new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)),
                ARBITRARY_CHANGE,
                recordTime2));
        MatcherAssert.assertThat(context.forwarded(), hasSize(0));

        // Time 101 is window end + 1 ms grace; the first record is now expected downstream.
        final long windowStart3 = 101L;
        final long recordTime3 = 101L;
        final long windowEnd3 = 102L;
        context.setRecordMetadata("topic", 0, 1L);
        context.setTimestamp(recordTime3);
        harness.processor.process(
            new Record<>(
                new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)),
                ARBITRARY_CHANGE,
                recordTime3));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, recordTime)));
    }

    /**
     * Final-results suppression with zero grace: a record inside window [0, 100) is expected
     * to stay buffered until a record with timestamp 100 (the window end) is processed.
     */
    @Test
    public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            finalResults(Duration.ofMillis(0L)),
            timeWindowedSerdeFrom(String.class, 100L),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long timestamp = ARBITRARY_LONG;
        final long windowEnd = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, windowEnd));
        final Change<Long> value = ARBITRARY_CHANGE;
        harness.processor.process(new Record<>(key, value, timestamp));
        MatcherAssert.assertThat(context.forwarded(), hasSize(0));

        context.setRecordMetadata("", 0, 1L);
        context.setTimestamp(windowEnd);
        harness.processor.process(
            new Record<>(
                new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)),
                ARBITRARY_CHANGE,
                windowEnd));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * Final-results suppression with zero grace: a record whose timestamp equals its window
     * end (100) is expected to be forwarded by the very call that processes it.
     */
    @Test
    public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            finalResults(Duration.ofMillis(0L)),
            timeWindowedSerdeFrom(String.class, 100L),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
        final Change<Long> value = ARBITRARY_CHANGE;
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * Final-results suppression over time windows: a change whose new value is {@code null}
     * is expected to produce no forwarded record.
     */
    @Test
    public void finalResultsShouldDropTombstonesForTimeWindows() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            finalResults(Duration.ofMillis(0L)),
            timeWindowedSerdeFrom(String.class, 100L),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(0));
    }

    /**
     * Final-results suppression over session windows: a change whose new value is
     * {@code null} is expected to produce no forwarded record.
     */
    @Test
    public void finalResultsShouldDropTombstonesForSessionWindows() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            finalResults(Duration.ofMillis(0L)),
            WindowedSerdes.sessionWindowedSerdeFrom(String.class),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(0));
    }

    /**
     * Time-limit suppression (zero limit, zero-record buffer) over time windows: a change
     * with a {@code null} new value is still expected downstream, headers included.
     */
    @Test
    public void suppressShouldNotDropTombstonesForTimeWindows() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ofMillis(0L), Suppressed.BufferConfig.maxRecords(0L)),
            timeWindowedSerdeFrom(String.class, 100L),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        final Headers headers = new RecordHeaders().add("k", "v".getBytes(StandardCharsets.UTF_8));
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        context.setHeaders(headers);
        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(
            capturedForward.record(),
            CoreMatchers.is(new Record<>(key, value, timestamp, headers)));
    }

    /**
     * Time-limit suppression (zero limit, zero-record buffer) over session windows: a change
     * with a {@code null} new value is still expected downstream.
     */
    @Test
    public void suppressShouldNotDropTombstonesForSessionWindows() {
        final Harness<Windowed<String>, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ofMillis(0L), Suppressed.BufferConfig.maxRecords(0L)),
            WindowedSerdes.sessionWindowedSerdeFrom(String.class),
            Serdes.Long());
        final MockInternalNewProcessorContext<Windowed<String>, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * Time-limit suppression (zero limit, zero-record buffer) with a plain key: a change
     * with a {@code null} new value is still expected downstream.
     */
    @Test
    public void suppressShouldNotDropTombstonesForKTable() {
        final Harness<String, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ofMillis(0L), Suppressed.BufferConfig.maxRecords(0L)),
            Serdes.String(),
            Serdes.Long());
        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final String key = "hey";
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * With a one-record buffer and a 100-day time limit, processing a second key is expected
     * to push the first record downstream.
     */
    @Test
    public void suppressShouldEmitWhenOverRecordCapacity() {
        final Harness<String, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ofDays(100L), Suppressed.BufferConfig.maxRecords(1L)),
            Serdes.String(),
            Serdes.Long());
        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final String key = "hey";
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        context.setRecordMetadata("", 0, 1L);
        context.setTimestamp(timestamp + 1);
        harness.processor.process(new Record<>("dummyKey", value, timestamp + 1));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * With a 60-byte buffer and a 100-day time limit, processing a second key is expected
     * to push the first record downstream.
     */
    @Test
    public void suppressShouldEmitWhenOverByteCapacity() {
        final Harness<String, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(Duration.ofDays(100L), Suppressed.BufferConfig.maxBytes(60L)),
            Serdes.String(),
            Serdes.Long());
        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        final String key = "hey";
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        context.setRecordMetadata("", 0, 1L);
        context.setTimestamp(timestamp + 1);
        harness.processor.process(new Record<>("dummyKey", value, timestamp + 1));

        MatcherAssert.assertThat(context.forwarded(), hasSize(1));
        final MockProcessorContext.CapturedForward<?, ?> capturedForward = context.forwarded().get(0);
        MatcherAssert.assertThat(capturedForward.record(), CoreMatchers.is(new Record<>(key, value, timestamp)));
    }

    /**
     * With a one-record buffer configured to shut down when full, processing a second key is
     * expected to throw a {@link StreamsException} mentioning the exceeded capacity.
     */
    @Test
    public void suppressShouldShutDownWhenOverRecordCapacity() {
        final Harness<String, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(
                Duration.ofDays(100L),
                Suppressed.BufferConfig.maxRecords(1L).shutDownWhenFull()),
            Serdes.String(),
            Serdes.Long());
        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        context.setCurrentNode(new ProcessorNode<>("testNode"));
        final String key = "hey";
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        context.setRecordMetadata("", 0, 1L);
        context.setTimestamp(timestamp);
        final StreamsException e = Assertions.assertThrows(
            StreamsException.class,
            () -> harness.processor.process(new Record<>("dummyKey", value, timestamp)));
        MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("buffer exceeded its max capacity"));
    }

    /**
     * With a 60-byte buffer configured to shut down when full, processing a second key is
     * expected to throw a {@link StreamsException} mentioning the exceeded capacity.
     */
    @Test
    public void suppressShouldShutDownWhenOverByteCapacity() {
        final Harness<String, Long> harness = new Harness<>(
            Suppressed.untilTimeLimit(
                Duration.ofDays(100L),
                Suppressed.BufferConfig.maxBytes(60L).shutDownWhenFull()),
            Serdes.String(),
            Serdes.Long());
        final MockInternalNewProcessorContext<String, Change<Long>> context = harness.context;

        final long timestamp = 100L;
        context.setRecordMetadata("", 0, 0L);
        context.setTimestamp(timestamp);
        context.setCurrentNode(new ProcessorNode<>("testNode"));
        final String key = "hey";
        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
        harness.processor.process(new Record<>(key, value, timestamp));

        context.setRecordMetadata("", 0, 1L);
        // NOTE(review): the context timestamp (1) differs from the record timestamp (100) here,
        // unlike the record-capacity variant of this test. Kept as found - confirm it is intended.
        context.setTimestamp(1L);
        final StreamsException e = Assertions.assertThrows(
            StreamsException.class,
            () -> harness.processor.process(new Record<>("dummyKey", value, timestamp)));
        MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("buffer exceeded its max capacity"));
    }

    /**
     * Builds a final-results suppression with an unbounded buffer and the given grace period.
     *
     * @param grace grace period handed to {@code buildFinalResultsSuppression}
     * @param <K>   windowed key type
     * @return the suppression definition produced by the builder
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
        // The public factory only exposes Suppressed; the cast reaches the internal builder API.
        final FinalResultsSuppressionBuilder builder =
            (FinalResultsSuppressionBuilder) Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded());
        return builder.buildFinalResultsSuppression(grace);
    }

    /**
     * Creates a matcher that accepts a non-null collection holding exactly {@code i} elements.
     *
     * @param i   the expected number of elements
     * @param <E> element type of the matched collection
     * @return a matcher comparing {@code size()} against {@code i}
     */
    private static <E> Matcher<Collection<E>> hasSize(final int i) {
        return new BaseMatcher<Collection<E>>() {
            @Override
            public void describeTo(final Description description) {
                description.appendText("a collection of size " + i);
            }

            @Override
            public boolean matches(final Object item) {
                // null never matches; anything else is assumed to be a Collection.
                return item != null && ((Collection<?>) item).size() == i;
            }
        };
    }

    /**
     * Builds a serde for time-windowed keys from the default serde of the raw key type.
     *
     * @param rawType    class of the inner key, resolved through {@link Serdes#serdeFrom(Class)}
     * @param windowSize window size handed to the {@link TimeWindowedDeserializer}
     * @param <K>        inner key type
     * @return a serde pairing a {@link TimeWindowedSerializer} with a {@link TimeWindowedDeserializer}
     */
    private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
        final Serde<K> kSerde = Serdes.serdeFrom(rawType);
        return new Serdes.WrapperSerde<>(
            new TimeWindowedSerializer<>(kSerde.serializer()),
            new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize));
    }

    /**
     * Test fixture bundling a suppression processor with the mock context it was initialised on.
     *
     * @param <K> key type
     * @param <V> value type carried inside {@link Change}
     */
    private static class Harness<K, V> {
        private final Processor<K, Change<V>, K, Change<V>> processor;
        private final MockInternalNewProcessorContext<K, Change<V>> context;

        /**
         * Creates the buffer store, the processor and the mock context, then initialises the
         * store and the processor against that context.
         *
         * @param suppressed suppression definition; cast to {@link SuppressedInternal}
         * @param keySerde   key serde handed to the buffer builder
         * @param valueSerde value serde handed to the buffer builder
         */
        Harness(final Suppressed<K> suppressed, final Serde<K> keySerde, final Serde<V> valueSerde) {
            final String storeName = "test-store";

            final StateStore buffer =
                new InMemoryTimeOrderedKeyValueChangeBuffer.Builder<>(storeName, keySerde, valueSerde)
                    .withLoggingDisabled()
                    .build();

            @SuppressWarnings("unchecked")
            final KTableImpl<K, ?, V> parent = Mockito.mock(KTableImpl.class);
            final Processor<K, Change<V>, K, Change<V>> processor =
                new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, storeName, parent).get();

            final MockInternalNewProcessorContext<K, Change<V>> context = new MockInternalNewProcessorContext<>();
            context.setCurrentNode(new ProcessorNode<>("testNode"));

            // The store is initialised before the processor, as in the original ordering.
            buffer.init(context, buffer);
            processor.init(context);

            this.processor = processor;
            this.context = context;
        }
    }
}

