/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.graph;

import java.time.Duration;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@code GraphGraceSearchUtil.findAndVerifyWindowGrace}.
 *
 * <p>Each test builds a small graph of {@code GraphNode}s by hand, asks the utility for the
 * window close time starting from the bottom-most node, and checks either the returned value
 * or the message of the {@link TopologyException} that is thrown.
 *
 * <p>The node classes are used as raw types throughout: the tests pass {@code null} for every
 * initializer/aggregator/merger, so the type arguments carry no information here.
 */
public class GraphGraceSearchUtilTest {

    /** Passing {@code null} instead of a node must be rejected with an empty path in the message. */
    @Test
    public void shouldThrowOnNull() {
        try {
            GraphGraceSearchUtil.findAndVerifyWindowGrace(null);
            Assert.fail("Should have thrown.");
        } catch (final TopologyException e) {
            MatcherAssert.assertThat(
                e.getMessage(),
                CoreMatchers.is("Invalid topology: Window close time is only defined for windowed computations. Got []."));
        }
    }

    /**
     * A stateless node whose only ancestor is a non-windowed stateful node has no grace period;
     * the error message is expected to contain the searched path {@code stateful->stateless}.
     */
    @Test
    public void shouldFailIfThereIsNoGraceAncestor() {
        final StatefulProcessorNode gracelessAncestor = newNoOpStatefulNode();
        final ProcessorGraphNode node = newStatelessNode();
        gracelessAncestor.addChild(node);

        try {
            GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
            Assert.fail("should have thrown.");
        } catch (final TopologyException e) {
            MatcherAssert.assertThat(
                e.getMessage(),
                CoreMatchers.is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless]."));
        }
    }

    /** For a time-windowed aggregate node the extracted value is the windows' grace period. */
    @Test
    public void shouldExtractGraceFromKStreamWindowAggregateNode() {
        final TimeWindows windows = TimeWindows.of(Duration.ofMillis(10L)).grace(Duration.ofMillis(1234L));
        final StatefulProcessorNode node = newTimeWindowAggregateNode(windows);

        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);

        MatcherAssert.assertThat(extracted, CoreMatchers.is(windows.gracePeriodMs()));
    }

    /** For a session-windowed aggregate node the extracted value is grace period plus inactivity gap. */
    @Test
    public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
        final SessionWindows windows = SessionWindows.with(Duration.ofMillis(10L)).grace(Duration.ofMillis(1234L));
        final StatefulProcessorNode node = newSessionWindowAggregateNode(windows);

        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);

        MatcherAssert.assertThat(extracted, CoreMatchers.is(sessionCloseMs(windows)));
    }

    /** The search must reach a session-windowed grandparent through a non-windowed stateful parent. */
    @Test
    public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
        final SessionWindows windows = SessionWindows.with(Duration.ofMillis(10L)).grace(Duration.ofMillis(1234L));
        final StatefulProcessorNode graceGrandparent = newSessionWindowAggregateNode(windows);
        final StatefulProcessorNode statefulParent = newNoOpStatefulNode();
        graceGrandparent.addChild(statefulParent);
        final ProcessorGraphNode node = newStatelessNode();
        statefulParent.addChild(node);

        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);

        MatcherAssert.assertThat(extracted, CoreMatchers.is(sessionCloseMs(windows)));
    }

    /** The search must reach a session-windowed grandparent through a stateless parent. */
    @Test
    public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
        final SessionWindows windows = SessionWindows.with(Duration.ofMillis(10L)).grace(Duration.ofMillis(1234L));
        final StatefulProcessorNode graceGrandparent = newSessionWindowAggregateNode(windows);
        final ProcessorGraphNode statelessParent = newStatelessNode();
        graceGrandparent.addChild(statelessParent);
        final ProcessorGraphNode node = newStatelessNode();
        statelessParent.addChild(node);

        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);

        MatcherAssert.assertThat(extracted, CoreMatchers.is(sessionCloseMs(windows)));
    }

    /**
     * A node with two windowed parents: a session-windowed one (10 ms gap, 1234 ms grace) and a
     * time-windowed one (4321 ms grace). The larger value, 4321, is expected.
     */
    @Test
    public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() {
        final StatefulProcessorNode leftParent = newSessionWindowAggregateNode(
            SessionWindows.with(Duration.ofMillis(10L)).grace(Duration.ofMillis(1234L)));
        final StatefulProcessorNode rightParent = newTimeWindowAggregateNode(
            TimeWindows.of(Duration.ofMillis(10L)).grace(Duration.ofMillis(4321L)));
        final ProcessorGraphNode node = newStatelessNode();
        leftParent.addChild(node);
        rightParent.addChild(node);

        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);

        MatcherAssert.assertThat(extracted, CoreMatchers.is(4321L));
    }

    /** Value the tests expect for a session-windowed node: grace period plus inactivity gap. */
    private static long sessionCloseMs(final SessionWindows windows) {
        return windows.gracePeriodMs() + windows.inactivityGap();
    }

    /** Builds a node named "stateless" with no processor parameters. */
    @SuppressWarnings("rawtypes") // type arguments are irrelevant: the node carries no processor
    private static ProcessorGraphNode newStatelessNode() {
        return new ProcessorGraphNode("stateless", null);
    }

    /**
     * Builds a node named "stateful" whose processor does nothing and is not a windowed
     * aggregation, i.e. a stateful node that defines no grace period of its own.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw project types; nothing is ever processed
    private static StatefulProcessorNode newNoOpStatefulNode() {
        return new StatefulProcessorNode(
            "stateful",
            new ProcessorParameters(
                () -> new Processor<String, Long>() {
                    @Override
                    public void init(final ProcessorContext context) {
                    }

                    @Override
                    public void process(final String key, final Long value) {
                    }

                    @Override
                    public void close() {
                    }
                },
                "dummy"),
            // the cast picks the StoreBuilder constructor; a bare null would not identify one
            (StoreBuilder) null);
    }

    /** Builds a node named "asdf" wrapping a {@link KStreamWindowAggregate} over {@code windows}. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw project types; initializer/aggregator are null
    private static StatefulProcessorNode newTimeWindowAggregateNode(final TimeWindows windows) {
        return new StatefulProcessorNode(
            "asdf",
            new ProcessorParameters(
                (ProcessorSupplier) new KStreamWindowAggregate((Windows) windows, "asdf", null, null),
                "asdf"),
            (StoreBuilder) null);
    }

    /** Builds a node named "asdf" wrapping a {@link KStreamSessionWindowAggregate} over {@code windows}. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw project types; initializer/aggregator/merger are null
    private static StatefulProcessorNode newSessionWindowAggregateNode(final SessionWindows windows) {
        return new StatefulProcessorNode(
            "asdf",
            new ProcessorParameters(
                (ProcessorSupplier) new KStreamSessionWindowAggregate(windows, "asdf", null, null, null),
                "asdf"),
            (StoreBuilder) null);
    }
}

