/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.apache.kafka.test.MockProcessorNode;
import org.junit.Assert;
import org.junit.Test;

public class PunctuationQueueTest {
    private final MockProcessorNode<String, String> node = new MockProcessorNode();
    private final PunctuationQueue queue = new PunctuationQueue();
    private final Punctuator punctuator = new Punctuator(){

        public void punctuate(long timestamp) {
            ((PunctuationQueueTest)PunctuationQueueTest.this).node.mockProcessor.punctuatedStreamTime.add(timestamp);
        }
    };

    @Test
    public void testPunctuationInterval() {
        PunctuationSchedule sched = new PunctuationSchedule(this.node, 0L, 100L, this.punctuator);
        long now = sched.timestamp - 100L;
        this.queue.schedule(sched);
        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator(){

            public void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator) {
                punctuator.punctuate(timestamp);
            }
        };
        this.queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)2L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)4L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
    }

    @Test
    public void testPunctuationIntervalCustomAlignment() {
        PunctuationSchedule sched = new PunctuationSchedule(this.node, 50L, 100L, this.punctuator);
        long now = sched.timestamp - 50L;
        this.queue.schedule(sched);
        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator(){

            public void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator) {
                punctuator.punctuate(timestamp);
            }
        };
        this.queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)2L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)4L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
    }

    @Test
    public void testPunctuationIntervalCancelFromPunctuator() {
        PunctuationSchedule sched = new PunctuationSchedule(this.node, 0L, 100L, this.punctuator);
        long now = sched.timestamp - 100L;
        final Cancellable cancellable = this.queue.schedule(sched);
        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator(){

            public void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator) {
                punctuator.punctuate(timestamp);
                cancellable.cancel();
            }
        };
        this.queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
        this.queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)this.node.mockProcessor.punctuatedStreamTime.size());
    }

    private static class TestProcessor
    extends AbstractProcessor<String, String> {
        private TestProcessor() {
        }

        public void init(ProcessorContext context) {
        }

        public void process(String key, String value) {
        }

        public void close() {
        }
    }
}

