/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.dispatch;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.dispatch.AbstractLifecycleDispatcher;
import reactor.core.dispatch.MultiThreadDispatcher;
import reactor.core.dispatch.wait.WaitingMood;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.jarjar.com.lmax.disruptor.BlockingWaitStrategy;
import reactor.jarjar.com.lmax.disruptor.EventFactory;
import reactor.jarjar.com.lmax.disruptor.ExceptionHandler;
import reactor.jarjar.com.lmax.disruptor.InsufficientCapacityException;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.jarjar.com.lmax.disruptor.WaitStrategy;
import reactor.jarjar.com.lmax.disruptor.WorkHandler;
import reactor.jarjar.com.lmax.disruptor.dsl.Disruptor;
import reactor.jarjar.com.lmax.disruptor.dsl.ProducerType;

public final class WorkQueueDispatcher
extends MultiThreadDispatcher
implements WaitingMood {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ExecutorService executor;
    private final Disruptor<WorkQueueTask> disruptor;
    private final WaitingMood waitingMood;
    private final RingBuffer<WorkQueueTask> ringBuffer;

    public WorkQueueDispatcher(String name, int poolSize, int backlog, Consumer<Throwable> uncaughtExceptionHandler) {
        this(name, poolSize, backlog, uncaughtExceptionHandler, ProducerType.MULTI, new BlockingWaitStrategy());
    }

    public WorkQueueDispatcher(String name, int poolSize, int backlog, final Consumer<Throwable> uncaughtExceptionHandler, ProducerType producerType, WaitStrategy waitStrategy) {
        super(poolSize, backlog);
        this.waitingMood = WaitingMood.class.isAssignableFrom(waitStrategy.getClass()) ? (WaitingMood)((Object)waitStrategy) : null;
        this.executor = Executors.newFixedThreadPool(poolSize, new NamedDaemonThreadFactory(name, this.getContext()));
        this.disruptor = new Disruptor<WorkQueueTask>(new EventFactory<WorkQueueTask>(){

            @Override
            public WorkQueueTask newInstance() {
                return new WorkQueueTask();
            }
        }, backlog, this.executor, producerType, waitStrategy);
        this.disruptor.handleExceptionsWith(new ExceptionHandler(){

            public void handleEventException(Throwable ex, long sequence, Object event) {
                this.handleOnStartException(ex);
            }

            @Override
            public void handleOnStartException(Throwable ex) {
                if (null != uncaughtExceptionHandler) {
                    uncaughtExceptionHandler.accept(ex);
                } else {
                    WorkQueueDispatcher.this.log.error(ex.getMessage(), ex);
                }
            }

            @Override
            public void handleOnShutdownException(Throwable ex) {
                this.handleOnStartException(ex);
            }
        });
        WorkHandler[] workHandlers = new WorkHandler[poolSize];
        for (int i = 0; i < poolSize; ++i) {
            workHandlers[i] = new WorkHandler<WorkQueueTask>(){

                @Override
                public void onEvent(WorkQueueTask task) throws Exception {
                    task.run();
                }
            };
        }
        this.disruptor.handleEventsWithWorkerPool(workHandlers);
        this.ringBuffer = this.disruptor.start();
    }

    @Override
    public boolean awaitAndShutdown(long timeout, TimeUnit timeUnit) {
        try {
            this.executor.shutdown();
            this.disruptor.shutdown(timeout, timeUnit);
            super.shutdown();
            this.executor.awaitTermination(timeout, timeUnit);
        }
        catch (Exception e) {
            return false;
        }
        return true;
    }

    @Override
    public void shutdown() {
        this.executor.shutdown();
        this.disruptor.shutdown();
        super.shutdown();
    }

    @Override
    public void forceShutdown() {
        this.executor.shutdownNow();
        this.disruptor.halt();
        super.forceShutdown();
    }

    @Override
    public void nervous() {
        if (this.waitingMood != null) {
            this.waitingMood.nervous();
        }
    }

    @Override
    public void calm() {
        if (this.waitingMood != null) {
            this.waitingMood.calm();
        }
    }

    @Override
    public long remainingSlots() {
        return this.ringBuffer.remainingCapacity();
    }

    @Override
    protected AbstractLifecycleDispatcher.Task allocateTask() {
        long seqId = this.ringBuffer.next();
        return this.ringBuffer.get(seqId).setSequenceId(seqId);
    }

    @Override
    protected AbstractLifecycleDispatcher.Task tryAllocateTask() throws reactor.core.processor.InsufficientCapacityException {
        try {
            long seqId = this.ringBuffer.tryNext();
            return this.ringBuffer.get(seqId).setSequenceId(seqId);
        }
        catch (InsufficientCapacityException e) {
            throw reactor.core.processor.InsufficientCapacityException.get();
        }
    }

    @Override
    protected void execute(AbstractLifecycleDispatcher.Task task) {
        this.ringBuffer.publish(((WorkQueueTask)task).getSequenceId());
    }

    private class WorkQueueTask
    extends MultiThreadDispatcher.MultiThreadTask {
        private long sequenceId;

        private WorkQueueTask() {
            super(WorkQueueDispatcher.this);
        }

        public long getSequenceId() {
            return this.sequenceId;
        }

        public WorkQueueTask setSequenceId(long sequenceId) {
            this.sequenceId = sequenceId;
            return this;
        }
    }
}

