/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Queue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SourceTaskTerminationTest
extends TestLogger {
    private static OneShotLatch ready;
    private static MultiShotLatch runLoopStart;
    private static MultiShotLatch runLoopEnd;

    @Before
    public void initialize() {
        ready = new OneShotLatch();
        runLoopStart = new MultiShotLatch();
        runLoopEnd = new MultiShotLatch();
    }

    @Test
    public void testStopWithSavepointWithMaxWatermark() throws Exception {
        this.stopWithSavepointStreamTaskTestHelper(true);
    }

    @Test
    public void testStopWithSavepointWithoutMaxWatermark() throws Exception {
        this.stopWithSavepointStreamTaskTestHelper(false);
    }

    private void stopWithSavepointStreamTaskTestHelper(boolean shouldTerminate) throws Exception {
        long syncSavepointId = 34L;
        try (StreamTaskMailboxTestHarness<Long> srcTaskTestHarness = this.getSourceStreamTaskTestHarness();){
            StreamTask<Long, ?> srcTask = srcTaskTestHarness.getStreamTask();
            srcTaskTestHarness.processAll();
            this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L);
            this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L);
            srcTaskTestHarness.processUntil(srcTask.triggerCheckpointAsync(new CheckpointMetaData(31L, 900L), CheckpointOptions.forCheckpointWithDefaultLocation())::isDone);
            this.verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
            this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L);
            srcTaskTestHarness.processUntil(srcTask.triggerCheckpointAsync(new CheckpointMetaData(34L, 900L), new CheckpointOptions(shouldTerminate ? CheckpointType.SAVEPOINT_TERMINATE : CheckpointType.SAVEPOINT_SUSPEND, CheckpointStorageLocationReference.getDefault()))::isDone);
            if (shouldTerminate) {
                this.verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
                this.verifyEvent(srcTaskTestHarness.getOutput(), (AbstractEvent)EndOfData.INSTANCE);
            }
            this.verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 34L);
            this.waitForSynchronousSavepointIdToBeSet(srcTask);
            Assert.assertTrue((boolean)srcTask.getSynchronousSavepointId().isPresent());
            srcTaskTestHarness.processUntil(srcTask.notifyCheckpointCompleteAsync(34L)::isDone);
            if (!shouldTerminate) {
                Assert.assertFalse((boolean)srcTask.getSynchronousSavepointId().isPresent());
            }
            srcTaskTestHarness.waitForTaskCompletion();
        }
    }

    private StreamTaskMailboxTestHarness<Long> getSourceStreamTaskTestHarness() throws Exception {
        StreamTaskMailboxTestHarness<Long> testHarness = new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.LONG_TYPE_INFO).setCollectNetworkEvents().modifyExecutionConfig(config -> config.setLatencyTrackingInterval(-1L)).setupOutputForSingletonOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)new LockStepSourceWithOneWmPerElement())).build();
        return testHarness;
    }

    private void waitForSynchronousSavepointIdToBeSet(StreamTask streamTaskUnderTest) throws InterruptedException {
        while (!streamTaskUnderTest.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
        }
    }

    private void emitAndVerifyWatermarkAndElement(StreamTaskMailboxTestHarness<Long> srcTaskTestHarness, long expectedElement) throws Exception {
        runLoopStart.trigger();
        runLoopEnd.await();
        srcTaskTestHarness.processAll();
        this.verifyWatermark(srcTaskTestHarness.getOutput(), new Watermark(expectedElement));
        this.verifyNextElement(srcTaskTestHarness.getOutput(), expectedElement);
    }

    private void verifyNextElement(Queue<Object> output, long expectedElement) {
        Object next = output.remove();
        Assert.assertTrue((String)"next element is not an event", (boolean)(next instanceof StreamRecord));
        Assert.assertEquals((String)"wrong event", (long)expectedElement, (long)((Long)((StreamRecord)next).getValue()));
    }

    private void verifyWatermark(Queue<Object> output, Watermark expectedWatermark) {
        Object next = output.remove();
        Assert.assertTrue((String)"next element is not a watermark", (boolean)(next instanceof Watermark));
        Assert.assertEquals((String)"wrong watermark", (Object)expectedWatermark, (Object)next);
    }

    private void verifyEvent(Queue<Object> output, AbstractEvent expectedEvent) {
        Object next = output.remove();
        Assert.assertTrue((boolean)expectedEvent.getClass().isInstance(next));
        Assert.assertEquals((Object)expectedEvent, (Object)next);
    }

    private void verifyCheckpointBarrier(Queue<Object> output, long checkpointId) {
        Object next = output.remove();
        Assert.assertTrue((String)"next element is not a checkpoint barrier", (boolean)(next instanceof CheckpointBarrier));
        Assert.assertEquals((String)"wrong checkpoint id", (long)checkpointId, (long)((CheckpointBarrier)next).getId());
    }

    private static class LockStepSourceWithOneWmPerElement
    implements SourceFunction<Long> {
        private volatile boolean isRunning;

        private LockStepSourceWithOneWmPerElement() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            long element = 1L;
            this.isRunning = true;
            ready.trigger();
            while (this.isRunning) {
                runLoopStart.await();
                if (this.isRunning) {
                    ctx.emitWatermark(new Watermark(element));
                    ctx.collect((Object)element++);
                }
                runLoopEnd.trigger();
            }
        }

        public void cancel() {
            this.isRunning = false;
            runLoopStart.trigger();
        }
    }
}

