/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.async;

import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import org.axonframework.eventhandling.EventMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventProcessorTask
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(EventProcessorTask.class);
    private final ShutdownCallback shutDownCallback;
    private final Executor executor;
    private final Deque<ProcessingTask> taskQueue;
    private boolean isScheduled = false;
    private volatile boolean cleanedUp;
    private final Object runnerMonitor = new Object();

    public EventProcessorTask(Executor executor, ShutdownCallback shutDownCallback) {
        this.taskQueue = new LinkedList<ProcessingTask>();
        this.shutDownCallback = shutDownCallback;
        this.executor = executor;
    }

    public synchronized boolean scheduleEvents(List<? extends EventMessage<?>> events, Consumer<List<? extends EventMessage<?>>> processor) {
        if (this.cleanedUp) {
            return false;
        }
        this.taskQueue.add(new ProcessingTask(events, processor));
        if (!this.isScheduled) {
            this.isScheduled = true;
            this.executor.execute(this);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Object object = this.runnerMonitor;
        synchronized (object) {
            boolean mayContinue = true;
            int itemsAtStart = this.taskQueue.size();
            int processedItems = 0;
            while (mayContinue) {
                this.processNextTask();
                mayContinue = ++processedItems < itemsAtStart && !this.taskQueue.isEmpty() || !this.yieldProcessing();
            }
        }
    }

    private void processNextTask() {
        ProcessingTask task = this.nextTask();
        task.processor.accept(task.events);
    }

    private synchronized ProcessingTask nextTask() {
        return this.taskQueue.poll();
    }

    private synchronized boolean yieldProcessing() {
        if (this.taskQueue.isEmpty()) {
            this.cleanUp();
        } else {
            try {
                this.executor.execute(this);
                if (logger.isDebugEnabled()) {
                    logger.debug("Processing of event listener yielded.");
                }
            }
            catch (RejectedExecutionException e) {
                logger.info("Processing of event listener could not yield. Executor refused the task.");
                return false;
            }
        }
        return true;
    }

    private synchronized void cleanUp() {
        this.isScheduled = false;
        this.cleanedUp = true;
        this.shutDownCallback.afterShutdown(this);
    }

    @FunctionalInterface
    public static interface ShutdownCallback {
        public void afterShutdown(EventProcessorTask var1);
    }

    private static class ProcessingTask {
        private final List<? extends EventMessage<?>> events;
        private final Consumer<List<? extends EventMessage<?>>> processor;

        public ProcessingTask(List<? extends EventMessage<?>> events, Consumer<List<? extends EventMessage<?>>> processor) {
            this.events = events;
            this.processor = processor;
        }
    }
}

