/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives a loop that alternates between processing mails from a {@link TaskMailbox} and running a
 * {@link MailboxDefaultAction}.
 *
 * <p>The loop in {@link #runMailboxLoop()} keeps running until a control mail sets the internal
 * "running" flag to {@code false} (see {@link #allActionsCompleted()}). The default action can be
 * temporarily suspended through the {@link MailboxDefaultAction.Controller} handed to it; while it is
 * suspended the loop only processes mails.
 *
 * <p>The mutable fields {@code mailboxLoopRunning} and {@code suspendedDefaultAction} are plain
 * (non-volatile) fields. Within this class they are written by {@link #suspendDefaultAction()}
 * (which asserts that it runs on the mailbox thread) and from inside mails / {@code resume()}
 * calls made on the mailbox thread.
 */
@Internal
public class MailboxProcessor
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);

    /** The mailbox from which this processor takes its mails. */
    private final TaskMailbox mailbox;

    /** Action executed by the loop whenever mail processing returns and the loop is still running. */
    private final MailboxDefaultAction mailboxDefaultAction;

    /** Executor handed out by {@link #getMainMailboxExecutor()}. */
    private final MailboxExecutor mainMailboxExecutor;

    /** Loop control flag; flipped to {@code false} by the "poison mail" of {@link #allActionsCompleted()}. */
    private boolean mailboxLoopRunning;

    /** Non-null while the default action is suspended; {@code null} otherwise. */
    private MailboxDefaultAction.Suspension suspendedDefaultAction;

    /** Passed on to every {@link MailboxExecutorImpl} created by this processor. */
    private final StreamTaskActionExecutor actionExecutor;

    /**
     * Creates a processor that uses {@link StreamTaskActionExecutor#IMMEDIATE} and a new mailbox bound
     * to the calling thread.
     *
     * @param mailboxDefaultAction the default action to run in the loop, must not be {@code null}
     */
    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
        this(mailboxDefaultAction, StreamTaskActionExecutor.IMMEDIATE);
    }

    /**
     * Creates a processor with a new {@link TaskMailboxImpl} constructed for the calling thread.
     *
     * @param mailboxDefaultAction the default action to run in the loop, must not be {@code null}
     * @param actionExecutor the action executor used by created mailbox executors, must not be {@code null}
     */
    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, StreamTaskActionExecutor actionExecutor) {
        this(mailboxDefaultAction, new TaskMailboxImpl(Thread.currentThread()), actionExecutor);
    }

    /**
     * Creates a processor on top of the given mailbox and derives the main mailbox executor from it.
     *
     * @param mailboxDefaultAction the default action to run in the loop, must not be {@code null}
     * @param mailbox the mailbox to process, must not be {@code null}
     * @param actionExecutor the action executor used by created mailbox executors, must not be {@code null}
     */
    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor) {
        // NOTE(review): -1 is also the priority used for the blocking take in processMail();
        // it looks like the lowest priority - confirm against TaskMailbox.
        this(mailboxDefaultAction, actionExecutor, mailbox, new MailboxExecutorImpl(mailbox, -1, actionExecutor));
    }

    /**
     * Creates a processor from fully specified collaborators. The loop starts out in the "running"
     * state with the default action not suspended.
     *
     * @param mailboxDefaultAction the default action to run in the loop, must not be {@code null}
     * @param actionExecutor the action executor used by created mailbox executors, must not be {@code null}
     * @param mailbox the mailbox to process, must not be {@code null}
     * @param mainMailboxExecutor the executor returned by {@link #getMainMailboxExecutor()}, must not be {@code null}
     */
    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox, MailboxExecutor mainMailboxExecutor) {
        this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
        this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
        this.mailbox = Preconditions.checkNotNull(mailbox);
        this.mainMailboxExecutor = Preconditions.checkNotNull(mainMailboxExecutor);
        this.mailboxLoopRunning = true;
        this.suspendedDefaultAction = null;
    }

    /**
     * Returns the main mailbox executor that was supplied to (or created by) the constructor.
     *
     * @return the main mailbox executor, never {@code null}
     */
    public MailboxExecutor getMainMailboxExecutor() {
        return this.mainMailboxExecutor;
    }

    /**
     * Creates a new executor that targets this processor's mailbox with the given priority.
     *
     * @param priority the priority passed to the new {@link MailboxExecutorImpl}
     * @return a freshly created mailbox executor
     */
    public MailboxExecutor getMailboxExecutor(int priority) {
        return new MailboxExecutorImpl(this.mailbox, priority, this.actionExecutor);
    }

    /** Prepares for closing by quiescing the underlying mailbox (delegates to {@link TaskMailbox#quiesce()}). */
    public void prepareClose() {
        this.mailbox.quiesce();
    }

    /**
     * Closes the underlying mailbox and tries to cancel every mail that was dropped by closing it.
     *
     * <p>Cancellation is attempted for all dropped mails even if some attempts fail. Failures are
     * combined via {@link ExceptionUtils#firstOrSuppressed} and the resulting exception is thrown
     * once all mails have been visited.
     *
     * @throws RuntimeException if cancelling at least one dropped mail failed
     */
    @Override
    public void close() {
        List<Mail> droppedMails = this.mailbox.close();
        if (droppedMails.isEmpty()) {
            return;
        }
        LOG.debug("Closing the mailbox dropped mails {}.", droppedMails);

        // Keep the accumulator typed as RuntimeException: only RuntimeExceptions are caught here,
        // and this lets us rethrow directly without declaring a checked exception.
        RuntimeException collectedFailure = null;
        for (Mail droppedMail : droppedMails) {
            try {
                // 'false': do not request interruption when cancelling.
                droppedMail.tryCancel(false);
            }
            catch (RuntimeException x) {
                collectedFailure = ExceptionUtils.firstOrSuppressed(x, collectedFailure);
            }
        }
        if (collectedFailure != null) {
            throw collectedFailure;
        }
    }

    /**
     * Drains the mailbox and runs every drained mail in the order returned by
     * {@link TaskMailbox#drain()}.
     *
     * @throws Exception if running one of the mails fails; remaining drained mails are then not run
     */
    public void drain() throws Exception {
        for (Mail mail : this.mailbox.drain()) {
            mail.run();
        }
    }

    /**
     * Runs the mailbox processing loop: processes mails, then runs the default action, and repeats
     * until the loop is stopped.
     *
     * @throws IllegalStateException if not called from the mailbox thread (presumably what
     *     {@code Preconditions.checkState} throws - confirm)
     * @throws Exception if a mail or the default action throws
     */
    public void runMailboxLoop() throws Exception {
        TaskMailbox localMailbox = this.mailbox;
        Preconditions.checkState(localMailbox.isMailboxThread(), "Method must be executed by declared mailbox thread!");
        assert (localMailbox.getState() == TaskMailbox.State.OPEN) : "Mailbox must be opened!";
        MailboxController defaultActionContext = new MailboxController(this);
        // processMail() returns false once the loop has been asked to stop.
        while (this.processMail(localMailbox)) {
            this.mailboxDefaultAction.runDefaultAction(defaultActionContext);
        }
    }

    /**
     * Enqueues a control mail that rethrows the given throwable when the mail is run.
     *
     * <p>{@link Exception}s and {@link Error}s are rethrown as they are; any other throwable is
     * wrapped via {@link WrappingRuntimeException#wrapIfNecessary}.
     *
     * @param throwable the throwable to report
     */
    public void reportThrowable(Throwable throwable) {
        this.sendControlMail(() -> {
            if (throwable instanceof Exception) {
                throw (Exception)throwable;
            }
            if (throwable instanceof Error) {
                throw (Error)throwable;
            }
            throw WrappingRuntimeException.wrapIfNecessary(throwable);
        }, "Report throwable %s", throwable);
    }

    /**
     * Requests termination of the mailbox loop. If the mailbox is still in state
     * {@link TaskMailbox.State#OPEN}, a "poison mail" is enqueued that clears the running flag when
     * it is executed; otherwise this call does nothing.
     */
    public void allActionsCompleted() {
        // The state check and the enqueueing are performed together inside runExclusively().
        this.mailbox.runExclusively(() -> {
            if (this.mailbox.getState() == TaskMailbox.State.OPEN) {
                this.sendControlMail(() -> {
                    this.mailboxLoopRunning = false;
                }, "poison mail");
            }
        });
    }

    /**
     * Wraps the given runnable into a {@link Mail} with priority {@link Integer#MAX_VALUE} and hands
     * it to {@link TaskMailbox#putFirst}.
     *
     * @param mail the action to run
     * @param descriptionFormat format string describing the mail
     * @param descriptionArgs arguments for {@code descriptionFormat}
     */
    private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object ... descriptionArgs) {
        this.mailbox.putFirst(new Mail(mail, Integer.MAX_VALUE, descriptionFormat, descriptionArgs));
    }

    /**
     * Processes pending mails and, while the default action is suspended, keeps taking and running
     * mails until it is resumed or the loop is stopped.
     *
     * @param mailbox the mailbox to take mails from
     * @return {@code true} if the loop should continue, {@code false} if it was asked to stop
     * @throws Exception if a mail throws
     */
    private boolean processMail(TaskMailbox mailbox) throws Exception {
        // NOTE(review): createBatch() presumably returns false when there was nothing to batch;
        // in that case no mail ran, so the running flag cannot have changed - confirm.
        if (!mailbox.createBatch()) {
            return true;
        }

        // Run the batched mails; stop early if one of them stopped the loop.
        Optional<Mail> maybeMail;
        while (this.isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
            maybeMail.get().run();
        }

        // While the default action is suspended there is nothing else to do, so take mails
        // one by one (presumably blocking - confirm against TaskMailbox#take) until the
        // action is resumed or the loop is stopped.
        while (this.isDefaultActionUnavailable() && this.isMailboxLoopRunning()) {
            mailbox.take(-1).run();
        }
        return this.isMailboxLoopRunning();
    }

    /**
     * Marks the default action as suspended and returns the handle to resume it. Calling this while
     * already suspended returns the existing handle.
     *
     * @return the suspension handle currently in effect
     */
    private MailboxDefaultAction.Suspension suspendDefaultAction() {
        Preconditions.checkState(this.mailbox.isMailboxThread(), "Suspending must only be called from the mailbox thread!");
        if (this.suspendedDefaultAction == null) {
            this.suspendedDefaultAction = new DefaultActionSuspension();
            this.ensureControlFlowSignalCheck();
        }
        return this.suspendedDefaultAction;
    }

    /**
     * Tells whether the default action is currently suspended.
     *
     * @return {@code true} if a suspension is in effect
     */
    @VisibleForTesting
    public boolean isDefaultActionUnavailable() {
        return this.suspendedDefaultAction != null;
    }

    /** Returns the current value of the loop control flag. */
    private boolean isMailboxLoopRunning() {
        return this.mailboxLoopRunning;
    }

    /**
     * Enqueues a no-op "signal check" control mail if the mailbox currently has no mail.
     *
     * <p>NOTE(review): presumably this guarantees that the next {@code createBatch()} in
     * {@link #processMail} finds mail, so that the suspension state gets evaluated - confirm.
     */
    private void ensureControlFlowSignalCheck() {
        if (!this.mailbox.hasMail()) {
            this.sendControlMail(() -> {}, "signal check");
        }
    }

    /** Suspension handle that clears the enclosing processor's suspension when resumed. */
    private final class DefaultActionSuspension
    implements MailboxDefaultAction.Suspension {
        private DefaultActionSuspension() {
        }

        /**
         * Resumes the default action. On the mailbox thread the state is updated directly; from any
         * other thread the update is sent to the mailbox as a control mail.
         */
        @Override
        public void resume() {
            if (MailboxProcessor.this.mailbox.isMailboxThread()) {
                this.resumeInternal();
            } else {
                MailboxProcessor.this.sendControlMail(this::resumeInternal, "resume default action");
            }
        }

        /** Clears the suspension, but only if this handle is still the one in effect. */
        private void resumeInternal() {
            if (MailboxProcessor.this.suspendedDefaultAction == this) {
                MailboxProcessor.this.suspendedDefaultAction = null;
            }
        }
    }

    /** Controller given to the default action; forwards every call to the owning processor. */
    private static final class MailboxController
    implements MailboxDefaultAction.Controller {
        private final MailboxProcessor mailboxProcessor;

        private MailboxController(MailboxProcessor mailboxProcessor) {
            this.mailboxProcessor = mailboxProcessor;
        }

        @Override
        public void allActionsCompleted() {
            this.mailboxProcessor.allActionsCompleted();
        }

        @Override
        public MailboxDefaultAction.Suspension suspendDefaultAction() {
            return this.mailboxProcessor.suspendDefaultAction();
        }
    }
}

