package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.class */
class MailboxExecutorImplTest {
    public static final int DEFAULT_PRIORITY = 0;
    private MailboxExecutor mailboxExecutor;
    private ExecutorService otherThreadExecutor;
    private TaskMailboxImpl mailbox;
    private MailboxProcessor mailboxProcessor;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest$TestRunnable.class */
    static class TestRunnable implements RunnableWithException {
        private Thread executedByThread = null;

        TestRunnable() {
        }

        public void run() {
            Preconditions.checkState(!isExecuted(), "Runnable was already executed before by " + this.executedByThread);
            this.executedByThread = Thread.currentThread();
        }

        boolean isExecuted() {
            return this.executedByThread != null;
        }

        Thread wasExecutedBy() {
            return this.executedByThread;
        }
    }

    MailboxExecutorImplTest() {
    }

    @BeforeEach
    void setUp() throws Exception {
        this.mailbox = new TaskMailboxImpl();
        this.mailboxExecutor = new MailboxExecutorImpl(this.mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
        this.otherThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        this.mailboxProcessor = new MailboxProcessor(controller -> {
        }, this.mailbox, StreamTaskActionExecutor.IMMEDIATE);
    }

    @AfterEach
    void tearDown() {
        this.otherThreadExecutor.shutdown();
        try {
            if (!this.otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.otherThreadExecutor.shutdownNow();
                if (!this.otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Thread pool did not terminate on time!");
                }
            }
        } catch (InterruptedException e) {
            this.otherThreadExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Test
    void testIsIdle() throws Exception {
        MailboxProcessor mailboxProcessor = new MailboxProcessor((v0) -> {
            v0.suspendDefaultAction();
        });
        MailboxExecutorImpl mailboxExecutor = mailboxProcessor.getMailboxExecutor(0);
        Assertions.assertThat(mailboxExecutor.isIdle()).isFalse();
        mailboxProcessor.runMailboxStep();
        mailboxProcessor.allActionsCompleted();
        do {
        } while (mailboxProcessor.runMailboxStep());
        Assertions.assertThat(mailboxExecutor.isIdle()).isTrue();
        mailboxExecutor.execute(() -> {
        }, "");
        Assertions.assertThat(mailboxExecutor.isIdle()).isFalse();
        mailboxProcessor.mailbox.drain();
        mailboxProcessor.mailbox.quiesce();
        Assertions.assertThat(mailboxExecutor.isIdle()).isFalse();
    }

    @Test
    void testOperations() throws Exception {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CompletableFuture.runAsync(() -> {
            this.mailboxExecutor.execute(() -> {
                atomicBoolean.set(true);
            }, "");
        }, this.otherThreadExecutor).get();
        this.mailbox.take(0).run();
        Assertions.assertThat(atomicBoolean).isTrue();
    }

    @Test
    void testClose() throws Exception {
        TestRunnable testRunnable = new TestRunnable();
        TestRunnable testRunnable2 = new TestRunnable();
        this.mailboxExecutor.execute(testRunnable, "yieldRun");
        Future future = (Future) CompletableFuture.supplyAsync(() -> {
            return this.mailboxExecutor.submit(testRunnable2, "leftoverRun");
        }, this.otherThreadExecutor).get();
        Assertions.assertThat(this.mailboxExecutor.tryYield()).isTrue();
        Assertions.assertThat(testRunnable.wasExecutedBy()).isEqualTo(Thread.currentThread());
        Assertions.assertThat(future).isNotDone().isNotCancelled();
        this.mailboxProcessor.close();
        Assertions.assertThat(future).isCancelled();
        MailboxExecutor mailboxExecutor = this.mailboxExecutor;
        mailboxExecutor.getClass();
        Assertions.assertThatThrownBy(mailboxExecutor::tryYield).as("yielding should not work after shutdown().", new Object[0]).isInstanceOf(TaskMailbox.MailboxClosedException.class);
        MailboxExecutor mailboxExecutor2 = this.mailboxExecutor;
        mailboxExecutor2.getClass();
        Assertions.assertThatThrownBy(mailboxExecutor2::yield).as("yielding should not work after shutdown().", new Object[0]).isInstanceOf(TaskMailbox.MailboxClosedException.class);
    }

    @Test
    void testTryYield() throws Exception {
        TestRunnable testRunnable = new TestRunnable();
        CompletableFuture.runAsync(() -> {
            this.mailboxExecutor.execute(testRunnable, "testRunnable");
        }, this.otherThreadExecutor).get();
        Assertions.assertThat(this.mailboxExecutor.tryYield()).isTrue();
        Assertions.assertThat(this.mailboxExecutor.tryYield()).isFalse();
        Assertions.assertThat(testRunnable.wasExecutedBy()).isEqualTo(Thread.currentThread());
    }

    @Test
    void testYield() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        TestRunnable testRunnable = new TestRunnable();
        Thread thread = new Thread(() -> {
            try {
                this.mailboxExecutor.execute(testRunnable, "testRunnable");
            } catch (Exception e) {
                atomicReference.set(e);
            }
        });
        thread.start();
        this.mailboxExecutor.yield();
        thread.join();
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
        Assertions.assertThat(testRunnable.wasExecutedBy()).isEqualTo(Thread.currentThread());
    }
}
