package org.apache.flink.streaming.runtime.tasks;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.class */
class SourceTaskTerminationTest {
    private static OneShotLatch ready;
    private static MultiShotLatch runLoopStart;
    private static MultiShotLatch runLoopEnd;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest$LockStepSourceWithOneWmPerElement.class */
    public static class LockStepSourceWithOneWmPerElement implements SourceFunction<Long> {
        private volatile boolean isRunning;

        private LockStepSourceWithOneWmPerElement() {
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            long j = 1;
            this.isRunning = true;
            SourceTaskTerminationTest.ready.trigger();
            while (this.isRunning) {
                SourceTaskTerminationTest.runLoopStart.await();
                if (this.isRunning) {
                    sourceContext.emitWatermark(new Watermark(j));
                    long j2 = j;
                    j = j2 + 1;
                    sourceContext.collect(Long.valueOf(j2));
                }
                SourceTaskTerminationTest.runLoopEnd.trigger();
            }
        }

        public void cancel() {
            this.isRunning = false;
            SourceTaskTerminationTest.runLoopStart.trigger();
        }
    }

    SourceTaskTerminationTest() {
    }

    @BeforeEach
    void initialize() {
        ready = new OneShotLatch();
        runLoopStart = new MultiShotLatch();
        runLoopEnd = new MultiShotLatch();
    }

    @Test
    void testStopWithSavepointWithMaxWatermark() throws Exception {
        stopWithSavepointStreamTaskTestHelper(true);
    }

    @Test
    void testStopWithSavepointWithoutMaxWatermark() throws Exception {
        stopWithSavepointStreamTaskTestHelper(false);
    }

    private void stopWithSavepointStreamTaskTestHelper(boolean z) throws Exception {
        StreamTaskMailboxTestHarness<Long> sourceStreamTaskTestHarness = getSourceStreamTaskTestHarness();
        Throwable th = null;
        try {
            try {
                StreamTask<Long, ?> streamTask = sourceStreamTaskTestHarness.getStreamTask();
                sourceStreamTaskTestHarness.processAll();
                emitAndVerifyWatermarkAndElement(sourceStreamTaskTestHarness, 1L);
                emitAndVerifyWatermarkAndElement(sourceStreamTaskTestHarness, 2L);
                CompletableFuture triggerCheckpointAsync = streamTask.triggerCheckpointAsync(new CheckpointMetaData(31L, 900L), CheckpointOptions.forCheckpointWithDefaultLocation());
                triggerCheckpointAsync.getClass();
                sourceStreamTaskTestHarness.processUntil(triggerCheckpointAsync::isDone);
                verifyCheckpointBarrier(sourceStreamTaskTestHarness.getOutput(), 31L);
                emitAndVerifyWatermarkAndElement(sourceStreamTaskTestHarness, 3L);
                CompletableFuture triggerCheckpointAsync2 = streamTask.triggerCheckpointAsync(new CheckpointMetaData(34L, 900L), new CheckpointOptions(z ? SavepointType.terminate(SavepointFormatType.CANONICAL) : SavepointType.suspend(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
                triggerCheckpointAsync2.getClass();
                sourceStreamTaskTestHarness.processUntil(triggerCheckpointAsync2::isDone);
                if (z) {
                    verifyWatermark(sourceStreamTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
                }
                verifyEvent(sourceStreamTaskTestHarness.getOutput(), new EndOfData(z ? StopMode.DRAIN : StopMode.NO_DRAIN));
                verifyCheckpointBarrier(sourceStreamTaskTestHarness.getOutput(), 34L);
                waitForSynchronousSavepointIdToBeSet(streamTask);
                Assertions.assertThat(streamTask.getSynchronousSavepointId()).isPresent();
                Future notifyCheckpointCompleteAsync = streamTask.notifyCheckpointCompleteAsync(34L);
                notifyCheckpointCompleteAsync.getClass();
                sourceStreamTaskTestHarness.processUntil(notifyCheckpointCompleteAsync::isDone);
                sourceStreamTaskTestHarness.waitForTaskCompletion();
                if (sourceStreamTaskTestHarness != null) {
                    if (0 == 0) {
                        sourceStreamTaskTestHarness.close();
                        return;
                    }
                    try {
                        sourceStreamTaskTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (sourceStreamTaskTestHarness != null) {
                if (th != null) {
                    try {
                        sourceStreamTaskTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    sourceStreamTaskTestHarness.close();
                }
            }
            throw th4;
        }
    }

    private StreamTaskMailboxTestHarness<Long> getSourceStreamTaskTestHarness() throws Exception {
        return new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.LONG_TYPE_INFO).setCollectNetworkEvents().modifyExecutionConfig(executionConfig -> {
            executionConfig.setLatencyTrackingInterval(-1L);
        }).setupOutputForSingletonOperatorChain((StreamOperator<?>) new StreamSource(new LockStepSourceWithOneWmPerElement())).build();
    }

    private void waitForSynchronousSavepointIdToBeSet(StreamTask streamTask) throws InterruptedException {
        while (!streamTask.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
        }
    }

    private void emitAndVerifyWatermarkAndElement(StreamTaskMailboxTestHarness<Long> streamTaskMailboxTestHarness, long j) throws Exception {
        runLoopStart.trigger();
        runLoopEnd.await();
        streamTaskMailboxTestHarness.processAll();
        verifyWatermark(streamTaskMailboxTestHarness.getOutput(), new Watermark(j));
        verifyNextElement(streamTaskMailboxTestHarness.getOutput(), j);
    }

    private void verifyNextElement(Queue<Object> queue, long j) {
        Object remove = queue.remove();
        Assertions.assertThat(remove).as("next element is not an event", new Object[0]).isInstanceOf(StreamRecord.class);
        Assertions.assertThat((Long) ((StreamRecord) remove).getValue()).as("wrong event", new Object[0]).isEqualTo(j);
    }

    private void verifyWatermark(Queue<Object> queue, Watermark watermark) {
        Object remove = queue.remove();
        Assertions.assertThat(remove).as("next element is not an event", new Object[0]).isInstanceOf(Watermark.class);
        Assertions.assertThat(remove).as("wrong watermark", new Object[0]).isEqualTo(watermark);
    }

    private void verifyEvent(Queue<Object> queue, AbstractEvent abstractEvent) {
        Assertions.assertThat(queue.remove()).isInstanceOf(abstractEvent.getClass()).isEqualTo(abstractEvent);
    }

    private void verifyCheckpointBarrier(Queue<Object> queue, long j) {
        Object remove = queue.remove();
        Assertions.assertThat(remove).as("next element is not a checkpoint barrier", new Object[0]).isInstanceOf(CheckpointBarrier.class);
        Assertions.assertThat(((CheckpointBarrier) remove).getId()).as("wrong checkpoint id", new Object[0]).isEqualTo(j);
    }
}
