/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxMetricsController;
import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MailboxProcessor
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);
    protected final TaskMailbox mailbox;
    protected final MailboxDefaultAction mailboxDefaultAction;
    private boolean mailboxLoopRunning;
    private boolean suspended;
    private DefaultActionSuspension suspendedDefaultAction;
    private final StreamTaskActionExecutor actionExecutor;
    private final MailboxMetricsController mailboxMetricsControl;

    @VisibleForTesting
    public MailboxProcessor() {
        this(MailboxDefaultAction.Controller::suspendDefaultAction);
    }

    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
        this(mailboxDefaultAction, StreamTaskActionExecutor.IMMEDIATE);
    }

    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, StreamTaskActionExecutor actionExecutor) {
        this(mailboxDefaultAction, new TaskMailboxImpl(Thread.currentThread()), actionExecutor);
    }

    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor) {
        this(mailboxDefaultAction, mailbox, actionExecutor, new MailboxMetricsController(new DescriptiveStatisticsHistogram(10), (Counter)new SimpleCounter()));
    }

    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor, MailboxMetricsController mailboxMetricsControl) {
        this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
        this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
        this.mailbox = Preconditions.checkNotNull(mailbox);
        this.mailboxLoopRunning = true;
        this.suspendedDefaultAction = null;
        this.mailboxMetricsControl = mailboxMetricsControl;
    }

    public MailboxExecutor getMainMailboxExecutor() {
        return new MailboxExecutorImpl(this.mailbox, -1, this.actionExecutor);
    }

    public MailboxExecutor getMailboxExecutor(int priority) {
        return new MailboxExecutorImpl(this.mailbox, priority, this.actionExecutor, this);
    }

    @VisibleForTesting
    public MailboxMetricsController getMailboxMetricsControl() {
        return this.mailboxMetricsControl;
    }

    public void prepareClose() {
        this.mailbox.quiesce();
    }

    @Override
    public void close() {
        List<Mail> droppedMails = this.mailbox.close();
        if (!droppedMails.isEmpty()) {
            LOG.debug("Closing the mailbox dropped mails {}.", droppedMails);
            Optional<RuntimeException> maybeErr = Optional.empty();
            for (Mail droppedMail : droppedMails) {
                try {
                    droppedMail.tryCancel(false);
                }
                catch (RuntimeException x) {
                    maybeErr = Optional.of(ExceptionUtils.firstOrSuppressed(x, maybeErr.orElse(null)));
                }
            }
            maybeErr.ifPresent(e -> {
                throw e;
            });
        }
    }

    public void drain() throws Exception {
        for (Mail mail : this.mailbox.drain()) {
            this.runMail(mail);
        }
    }

    public void runMailboxLoop() throws Exception {
        this.suspended = !this.mailboxLoopRunning;
        TaskMailbox localMailbox = this.mailbox;
        Preconditions.checkState(localMailbox.isMailboxThread(), "Method must be executed by declared mailbox thread!");
        assert (localMailbox.getState() == TaskMailbox.State.OPEN) : "Mailbox must be opened!";
        MailboxController mailboxController = new MailboxController(this);
        while (this.isNextLoopPossible()) {
            this.processMail(localMailbox, false);
            if (!this.isNextLoopPossible()) continue;
            this.mailboxDefaultAction.runDefaultAction(mailboxController);
        }
    }

    public void suspend() {
        this.sendPoisonMail(() -> {
            this.suspended = true;
        });
    }

    @VisibleForTesting
    public boolean runSingleMailboxLoop() throws Exception {
        this.suspended = !this.mailboxLoopRunning;
        boolean processed = this.processMail(this.mailbox, true);
        if (this.isDefaultActionAvailable() && this.isNextLoopPossible()) {
            this.mailboxDefaultAction.runDefaultAction(new MailboxController(this));
            processed = true;
        }
        return processed;
    }

    @VisibleForTesting
    public boolean runMailboxStep() throws Exception {
        boolean bl = this.suspended = !this.mailboxLoopRunning;
        if (this.processMail(this.mailbox, true)) {
            return true;
        }
        if (this.isDefaultActionAvailable() && this.isNextLoopPossible()) {
            this.mailboxDefaultAction.runDefaultAction(new MailboxController(this));
            return true;
        }
        return false;
    }

    public boolean isMailboxThread() {
        return this.mailbox.isMailboxThread();
    }

    public void reportThrowable(Throwable throwable) {
        this.sendControlMail(() -> {
            if (throwable instanceof Exception) {
                throw (Exception)throwable;
            }
            if (throwable instanceof Error) {
                throw (Error)throwable;
            }
            throw WrappingRuntimeException.wrapIfNecessary(throwable);
        }, "Report throwable %s", throwable);
    }

    public void allActionsCompleted() {
        this.sendPoisonMail(() -> {
            this.mailboxLoopRunning = false;
            this.suspended = true;
        });
    }

    private void sendPoisonMail(RunnableWithException mail) {
        this.mailbox.runExclusively(() -> {
            if (this.mailbox.getState() == TaskMailbox.State.OPEN) {
                this.sendControlMail(mail, "poison mail", new Object[0]);
            }
        });
    }

    private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object ... descriptionArgs) {
        this.mailbox.putFirst(new Mail(mail, Integer.MAX_VALUE, descriptionFormat, descriptionArgs));
    }

    private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
        boolean processed;
        boolean isBatchAvailable = mailbox.createBatch();
        boolean bl = processed = isBatchAvailable && this.processMailsNonBlocking(singleStep);
        if (singleStep) {
            return processed;
        }
        return processed |= this.processMailsWhenDefaultActionUnavailable();
    }

    private boolean processMailsWhenDefaultActionUnavailable() throws Exception {
        boolean processedSomething = false;
        while (!this.isDefaultActionAvailable() && this.isNextLoopPossible()) {
            Optional<Mail> maybeMail = this.mailbox.tryTake(-1);
            if (!maybeMail.isPresent()) {
                maybeMail = Optional.of(this.mailbox.take(-1));
            }
            this.maybePauseIdleTimer();
            this.runMail(maybeMail.get());
            this.maybeRestartIdleTimer();
            processedSomething = true;
        }
        return processedSomething;
    }

    private boolean processMailsNonBlocking(boolean singleStep) throws Exception {
        Optional<Mail> maybeMail;
        long processedMails = 0L;
        while (this.isNextLoopPossible() && (maybeMail = this.mailbox.tryTakeFromBatch()).isPresent()) {
            if (processedMails++ == 0L) {
                this.maybePauseIdleTimer();
            }
            this.runMail(maybeMail.get());
            if (!singleStep) continue;
        }
        if (processedMails > 0L) {
            this.maybeRestartIdleTimer();
            return true;
        }
        return false;
    }

    private void runMail(Mail mail) throws Exception {
        this.mailboxMetricsControl.getMailCounter().inc();
        mail.run();
        if (!this.suspended && !this.mailboxMetricsControl.isLatencyMeasurementStarted() && this.mailboxMetricsControl.isLatencyMeasurementSetup()) {
            this.mailboxMetricsControl.startLatencyMeasurement();
        }
    }

    private void maybePauseIdleTimer() {
        if (this.suspendedDefaultAction != null && this.suspendedDefaultAction.suspensionTimer != null) {
            this.suspendedDefaultAction.suspensionTimer.markEnd();
        }
    }

    private void maybeRestartIdleTimer() {
        if (this.suspendedDefaultAction != null && this.suspendedDefaultAction.suspensionTimer != null) {
            this.suspendedDefaultAction.suspensionTimer.markStart();
        }
    }

    private MailboxDefaultAction.Suspension suspendDefaultAction(@Nullable PeriodTimer suspensionTimer) {
        Preconditions.checkState(this.mailbox.isMailboxThread(), "Suspending must only be called from the mailbox thread!");
        Preconditions.checkState(this.suspendedDefaultAction == null, "Default action has already been suspended");
        if (this.suspendedDefaultAction == null) {
            this.suspendedDefaultAction = new DefaultActionSuspension(suspensionTimer);
        }
        return this.suspendedDefaultAction;
    }

    @VisibleForTesting
    public boolean isDefaultActionAvailable() {
        return this.suspendedDefaultAction == null;
    }

    private boolean isNextLoopPossible() {
        return !this.suspended;
    }

    @VisibleForTesting
    public boolean isMailboxLoopRunning() {
        return this.mailboxLoopRunning;
    }

    public boolean hasMail() {
        return this.mailbox.hasMail();
    }

    private final class DefaultActionSuspension
    implements MailboxDefaultAction.Suspension {
        @Nullable
        private final PeriodTimer suspensionTimer;

        public DefaultActionSuspension(PeriodTimer suspensionTimer) {
            this.suspensionTimer = suspensionTimer;
        }

        @Override
        public void resume() {
            if (MailboxProcessor.this.mailbox.isMailboxThread()) {
                this.resumeInternal();
            } else {
                try {
                    MailboxProcessor.this.sendControlMail(this::resumeInternal, "resume default action", new Object[0]);
                }
                catch (TaskMailbox.MailboxClosedException mailboxClosedException) {
                    // empty catch block
                }
            }
        }

        private void resumeInternal() {
            if (MailboxProcessor.this.suspendedDefaultAction == this) {
                MailboxProcessor.this.suspendedDefaultAction = null;
            }
        }
    }

    protected static final class MailboxController
    implements MailboxDefaultAction.Controller {
        private final MailboxProcessor mailboxProcessor;

        protected MailboxController(MailboxProcessor mailboxProcessor) {
            this.mailboxProcessor = mailboxProcessor;
        }

        @Override
        public void allActionsCompleted() {
            this.mailboxProcessor.allActionsCompleted();
        }

        @Override
        public MailboxDefaultAction.Suspension suspendDefaultAction(PeriodTimer suspensionPeriodTimer) {
            return this.mailboxProcessor.suspendDefaultAction(suspensionPeriodTimer);
        }

        @Override
        public MailboxDefaultAction.Suspension suspendDefaultAction() {
            return this.mailboxProcessor.suspendDefaultAction(null);
        }
    }
}

