package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest.class */
class StreamTaskExecutionDecorationTest {
    private CountingStreamTaskActionExecutor decorator;
    private StreamTask<Object, StreamOperator<Object>> task;
    private TaskMailboxImpl mailbox;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest$CountingStreamTaskActionExecutor.class */
    static class CountingStreamTaskActionExecutor extends StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor {
        private final AtomicInteger calls;

        CountingStreamTaskActionExecutor() {
            super(new Object());
            this.calls = new AtomicInteger(0);
        }

        int getCallCount() {
            return this.calls.get();
        }

        boolean wasCalled() {
            return getCallCount() > 0;
        }

        public void run(RunnableWithException runnableWithException) throws Exception {
            this.calls.incrementAndGet();
            runnableWithException.run();
        }

        public <E extends Throwable> void runThrowing(ThrowingRunnable<E> throwingRunnable) throws Throwable {
            this.calls.incrementAndGet();
            throwingRunnable.run();
        }

        public <R> R call(Callable<R> callable) throws Exception {
            this.calls.incrementAndGet();
            return callable.call();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskExecutionDecorationTest$DeclineDummyEnvironment.class */
    private static final class DeclineDummyEnvironment extends DummyEnvironment {
        DeclineDummyEnvironment() {
            super("test", 1, 0);
        }

        public void declineCheckpoint(long j, CheckpointException checkpointException) {
        }
    }

    StreamTaskExecutionDecorationTest() {
    }

    @Test
    void testAbortCheckpointOnBarrierIsDecorated() throws Exception {
        this.task.abortCheckpointOnBarrier(1L, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
        ((AbstractBooleanAssert) Assertions.assertThat(this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @Test
    void testTriggerCheckpointOnBarrierIsDecorated() throws Exception {
        this.task.triggerCheckpointOnBarrier(new CheckpointMetaData(1L, 2L), new CheckpointOptions(CheckpointType.CHECKPOINT, new CheckpointStorageLocationReference(new byte[]{1})), (CheckpointMetricsBuilder) null);
        ((AbstractBooleanAssert) Assertions.assertThat(this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @Test
    void testTriggerCheckpointAsyncIsDecorated() {
        this.task.triggerCheckpointAsync(new CheckpointMetaData(1L, 2L), new CheckpointOptions(CheckpointType.CHECKPOINT, new CheckpointStorageLocationReference(new byte[]{1})));
        ((AbstractBooleanAssert) Assertions.assertThat(this.mailbox.hasMail()).as("mailbox is empty", new Object[0])).isTrue();
        ((AbstractBooleanAssert) Assertions.assertThat(this.decorator.wasCalled()).as("execution decorator was called preliminary", new Object[0])).isFalse();
        this.mailbox.drain().forEach(mail -> {
            try {
                mail.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        ((AbstractBooleanAssert) Assertions.assertThat(this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @Test
    void testMailboxExecutorIsDecorated() throws Exception {
        this.task.mailboxProcessor.getMainMailboxExecutor().execute(() -> {
            this.task.mailboxProcessor.allActionsCompleted();
        }, "");
        this.task.mailboxProcessor.runMailboxLoop();
        ((AbstractBooleanAssert) Assertions.assertThat(this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @BeforeEach
    void before() throws Exception {
        this.mailbox = new TaskMailboxImpl();
        this.decorator = new CountingStreamTaskActionExecutor();
        this.task = new StreamTask<Object, StreamOperator<Object>>(new DeclineDummyEnvironment(), null, FatalExitExceptionHandler.INSTANCE, this.decorator, this.mailbox) { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskExecutionDecorationTest.1
            protected void init() {
            }

            protected void processInput(MailboxDefaultAction.Controller controller) {
            }
        };
        this.task.operatorChain = new RegularOperatorChain(this.task, new NonRecordWriter());
    }

    @AfterEach
    void after() {
        this.decorator = null;
        this.task = null;
    }
}
