/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.FutureAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class MailboxExecutorImplTest {
    public static final int DEFAULT_PRIORITY = 0;
    private MailboxExecutor mailboxExecutor;
    private ExecutorService otherThreadExecutor;
    private TaskMailboxImpl mailbox;
    private MailboxProcessor mailboxProcessor;

    MailboxExecutorImplTest() {
    }

    @BeforeEach
    void setUp() throws Exception {
        this.mailbox = new TaskMailboxImpl();
        this.mailboxExecutor = new MailboxExecutorImpl((TaskMailbox)this.mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
        this.otherThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        this.mailboxProcessor = new MailboxProcessor(c -> {}, (TaskMailbox)this.mailbox, StreamTaskActionExecutor.IMMEDIATE);
    }

    @AfterEach
    void tearDown() {
        this.otherThreadExecutor.shutdown();
        try {
            if (!this.otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.otherThreadExecutor.shutdownNow();
                if (!this.otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Thread pool did not terminate on time!");
                }
            }
        }
        catch (InterruptedException ie) {
            this.otherThreadExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Test
    void testIsIdle() throws Exception {
        MailboxProcessor processor = new MailboxProcessor(MailboxDefaultAction.Controller::suspendDefaultAction);
        MailboxExecutorImpl executor = (MailboxExecutorImpl)processor.getMailboxExecutor(0);
        Assertions.assertThat((boolean)executor.isIdle()).isFalse();
        processor.runMailboxStep();
        processor.allActionsCompleted();
        while (processor.runMailboxStep()) {
        }
        Assertions.assertThat((boolean)executor.isIdle()).isTrue();
        executor.execute(() -> {}, "");
        Assertions.assertThat((boolean)executor.isIdle()).isFalse();
        processor.mailbox.drain();
        processor.mailbox.quiesce();
        Assertions.assertThat((boolean)executor.isIdle()).isFalse();
    }

    @Test
    void testOperations() throws Exception {
        AtomicBoolean wasExecuted = new AtomicBoolean(false);
        CompletableFuture.runAsync(() -> this.mailboxExecutor.execute(() -> wasExecuted.set(true), ""), this.otherThreadExecutor).get();
        this.mailbox.take(0).run();
        Assertions.assertThat((AtomicBoolean)wasExecuted).isTrue();
    }

    @Test
    void testClose() throws Exception {
        TestRunnable yieldRun = new TestRunnable();
        TestRunnable leftoverRun = new TestRunnable();
        this.mailboxExecutor.execute((ThrowingRunnable)yieldRun, "yieldRun");
        Future leftoverFuture = CompletableFuture.supplyAsync(() -> this.mailboxExecutor.submit((RunnableWithException)leftoverRun, "leftoverRun"), this.otherThreadExecutor).get();
        Assertions.assertThat((boolean)this.mailboxExecutor.tryYield()).isTrue();
        Assertions.assertThat((Object)yieldRun.wasExecutedBy()).isEqualTo((Object)Thread.currentThread());
        ((FutureAssert)Assertions.assertThat((Future)leftoverFuture).isNotDone()).isNotCancelled();
        this.mailboxProcessor.close();
        Assertions.assertThat((Future)leftoverFuture).isCancelled();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((MailboxExecutor)this.mailboxExecutor).tryYield()).as("yielding should not work after shutdown().", new Object[0])).isInstanceOf(TaskMailbox.MailboxClosedException.class);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((MailboxExecutor)this.mailboxExecutor).yield()).as("yielding should not work after shutdown().", new Object[0])).isInstanceOf(TaskMailbox.MailboxClosedException.class);
    }

    @Test
    void testTryYield() throws Exception {
        TestRunnable testRunnable = new TestRunnable();
        CompletableFuture.runAsync(() -> this.mailboxExecutor.execute((ThrowingRunnable)testRunnable, "testRunnable"), this.otherThreadExecutor).get();
        Assertions.assertThat((boolean)this.mailboxExecutor.tryYield()).isTrue();
        Assertions.assertThat((boolean)this.mailboxExecutor.tryYield()).isFalse();
        Assertions.assertThat((Object)testRunnable.wasExecutedBy()).isEqualTo((Object)Thread.currentThread());
    }

    @Test
    void testYield() throws Exception {
        AtomicReference exceptionReference = new AtomicReference();
        TestRunnable testRunnable = new TestRunnable();
        Thread submitThread = new Thread(() -> {
            try {
                this.mailboxExecutor.execute((ThrowingRunnable)testRunnable, "testRunnable");
            }
            catch (Exception e) {
                exceptionReference.set(e);
            }
        });
        submitThread.start();
        this.mailboxExecutor.yield();
        submitThread.join();
        Assertions.assertThat((Throwable)((Throwable)exceptionReference.get())).isNull();
        Assertions.assertThat((Object)testRunnable.wasExecutedBy()).isEqualTo((Object)Thread.currentThread());
    }

    static class TestRunnable
    implements RunnableWithException {
        private Thread executedByThread = null;

        TestRunnable() {
        }

        public void run() {
            Preconditions.checkState((!this.isExecuted() ? 1 : 0) != 0, (Object)("Runnable was already executed before by " + this.executedByThread));
            this.executedByThread = Thread.currentThread();
        }

        boolean isExecuted() {
            return this.executedByThread != null;
        }

        Thread wasExecutedBy() {
            return this.executedByThread;
        }
    }
}

