/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.source;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.OnEventTestWatermarkGenerator;
import org.apache.flink.streaming.api.operators.source.OnPeriodicTestWatermarkGenerator;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SourceOperatorEventTimeTest {
    private final boolean emitProgressiveWatermarks;

    @Parameterized.Parameters(name="Emit progressive watermarks: {0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    public SourceOperatorEventTimeTest(boolean emitProgressiveWatermarks) {
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
    }

    @Test
    public void testMainOutputPeriodicWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnPeriodicTestWatermarkGenerator());
        List<org.apache.flink.api.common.eventtime.Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> output.collect((Object)0, 100L), output -> output.collect((Object)0, 120L), output -> output.collect((Object)0, 110L));
        this.assertWatermarksOrEmpty(result, new org.apache.flink.api.common.eventtime.Watermark(100L), new org.apache.flink.api.common.eventtime.Watermark(120L));
    }

    @Test
    public void testMainOutputEventWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        List<org.apache.flink.api.common.eventtime.Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> output.collect((Object)0, 100L), output -> output.collect((Object)0, 120L), output -> output.collect((Object)0, 110L));
        this.assertWatermarksOrEmpty(result, new org.apache.flink.api.common.eventtime.Watermark(100L), new org.apache.flink.api.common.eventtime.Watermark(120L));
    }

    @Test
    public void testPerSplitOutputPeriodicWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnPeriodicTestWatermarkGenerator());
        List<org.apache.flink.api.common.eventtime.Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> {
            output.createOutputForSplit("A");
            output.createOutputForSplit("B");
        }, output -> output.createOutputForSplit("A").collect((Object)0, 100L), output -> output.createOutputForSplit("B").collect((Object)0, 200L), output -> output.createOutputForSplit("A").collect((Object)0, 150L), output -> output.releaseOutputForSplit("A"), output -> output.createOutputForSplit("B").collect((Object)0, 200L));
        this.assertWatermarksOrEmpty(result, new org.apache.flink.api.common.eventtime.Watermark(100L), new org.apache.flink.api.common.eventtime.Watermark(150L), new org.apache.flink.api.common.eventtime.Watermark(200L));
    }

    @Test
    public void testPerSplitOutputEventWatermarks() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        List<org.apache.flink.api.common.eventtime.Watermark> result = this.testSequenceOfWatermarks(this.emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> {
            output.createOutputForSplit("one");
            output.createOutputForSplit("two");
        }, output -> output.createOutputForSplit("one").collect((Object)0, 100L), output -> output.createOutputForSplit("two").collect((Object)0, 200L), output -> output.createOutputForSplit("one").collect((Object)0, 150L), output -> output.releaseOutputForSplit("one"), output -> output.createOutputForSplit("two").collect((Object)0, 200L));
        this.assertWatermarksOrEmpty(result, new org.apache.flink.api.common.eventtime.Watermark(100L), new org.apache.flink.api.common.eventtime.Watermark(150L), new org.apache.flink.api.common.eventtime.Watermark(200L));
    }

    private void assertWatermarksOrEmpty(List<org.apache.flink.api.common.eventtime.Watermark> actualWatermarks, org.apache.flink.api.common.eventtime.Watermark ... expectedWatermarks) {
        if (this.emitProgressiveWatermarks) {
            ArrayList watermarks = Lists.newArrayList((Object[])expectedWatermarks);
            Assert.assertThat(actualWatermarks, (Matcher)Matchers.contains((Object[])watermarks.toArray()));
        } else {
            Assert.assertThat(actualWatermarks, (Matcher)Matchers.hasSize((int)0));
        }
    }

    @SafeVarargs
    private final List<org.apache.flink.api.common.eventtime.Watermark> testSequenceOfWatermarks(boolean emitProgressiveWatermarks, WatermarkStrategy<Integer> watermarkStrategy, Consumer<ReaderOutput<Integer>> ... actions) throws Exception {
        List<Object> allEvents = this.testSequenceOfEvents(emitProgressiveWatermarks, watermarkStrategy, actions);
        return allEvents.stream().filter(evt -> evt instanceof Watermark).map(evt -> new org.apache.flink.api.common.eventtime.Watermark(((Watermark)evt).getTimestamp())).collect(Collectors.toList());
    }

    @SafeVarargs
    private final List<Object> testSequenceOfEvents(boolean emitProgressiveWatermarks, WatermarkStrategy<Integer> watermarkStrategy, Consumer<ReaderOutput<Integer>> ... actions) throws Exception {
        CollectingDataOutput out = new CollectingDataOutput();
        TestProcessingTimeService timeService = new TestProcessingTimeService();
        timeService.setCurrentTime(Integer.MAX_VALUE);
        InterpretingSourceReader reader = new InterpretingSourceReader((Consumer[])actions);
        SourceOperator<Integer, MockSourceSplit> sourceOperator = SourceOperatorEventTimeTest.createTestOperator(reader, watermarkStrategy, (ProcessingTimeService)timeService, emitProgressiveWatermarks);
        while (sourceOperator.emitNext(out) != DataInputStatus.END_OF_INPUT) {
            timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100L);
        }
        return out.events;
    }

    private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(SourceReader<T, MockSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, ProcessingTimeService timeService, boolean emitProgressiveWatermarks) throws Exception {
        OperatorStateBackend operatorStateStore = new MemoryStateBackend().createOperatorStateBackend((Environment)new MockEnvironmentBuilder().build(), "test-operator", Collections.emptyList(), new CloseableRegistry());
        StateInitializationContextImpl stateContext = new StateInitializationContextImpl(null, (OperatorStateStore)operatorStateStore, null, null, null);
        TestingSourceOperator<T> sourceOperator = new TestingSourceOperator<T>(reader, watermarkStrategy, timeService, emitProgressiveWatermarks);
        sourceOperator.setup((StreamTask)new SourceOperatorStreamTask((Environment)new StreamMockEnvironment(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, (TaskStateManager)new TestTaskStateManager())), new MockStreamConfig(new Configuration(), 1), new MockOutput(new ArrayList()));
        sourceOperator.initializeState((StateInitializationContext)stateContext);
        sourceOperator.open();
        return sourceOperator;
    }

    private static final class InterpretingSourceReader
    implements SourceReader<Integer, MockSourceSplit> {
        private final Iterator<Consumer<ReaderOutput<Integer>>> actions;

        @SafeVarargs
        private InterpretingSourceReader(Consumer<ReaderOutput<Integer>> ... actions) {
            this.actions = Arrays.asList(actions).iterator();
        }

        public void start() {
        }

        public InputStatus pollNext(ReaderOutput<Integer> output) {
            if (this.actions.hasNext()) {
                this.actions.next().accept(output);
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }

        public List<MockSourceSplit> snapshotState(long checkpointId) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        public void addSplits(List<MockSourceSplit> splits) {
        }

        public void notifyNoMoreSplits() {
        }

        public void close() {
        }
    }
}

