/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.AbstractProcessingStrategy;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.WorkQueueProcessor;
import reactor.util.concurrent.QueueSupplier;

/**
 * Base class for {@link ProcessingStrategyFactory} implementations whose strategies push events through a
 * Reactor {@link WorkQueueProcessor} (a ring buffer).
 * <p>
 * The factory holds three settings - buffer size, subscriber count and wait strategy name - and exposes them
 * to subclasses through protected getters. The nested {@link RingBufferProcessingStrategy} consumes those
 * settings when it creates a {@link Sink}.
 */
public abstract class AbstractRingBufferProcessingStrategyFactory implements ProcessingStrategyFactory {

    /**
     * Scheduler name suffix. Not referenced inside this class; presumably appended by subclasses when naming
     * the scheduler that backs the ring buffer (confirm against subclasses).
     */
    protected static final String RING_BUFFER_SCHEDULER_NAME_SUFFIX = ".ring-buffer";

    /** Buffer size used when {@link #setBufferSize(int)} is never called. */
    public static final int DEFAULT_BUFFER_SIZE = QueueSupplier.SMALL_BUFFER_SIZE;

    /** Subscriber count used when {@link #setSubscriberCount(int)} is never called. */
    public static final int DEFAULT_SUBSCRIBER_COUNT = 1;

    /** Name of the {@link WaitStrategy} constant used when no wait strategy name is supplied. */
    public static final String DEFAULT_WAIT_STRATEGY = WaitStrategy.LITE_BLOCKING.name();

    private int bufferSize = DEFAULT_BUFFER_SIZE;
    private int subscriberCount = DEFAULT_SUBSCRIBER_COUNT;
    // Stays null until configured; RingBufferProcessingStrategy falls back to DEFAULT_WAIT_STRATEGY for null.
    private String waitStrategy;

    /**
     * Sets the size of the ring buffer.
     *
     * @param bufferSize the buffer size; must be a positive power of two
     * @throws IllegalArgumentException if {@code bufferSize} is not a positive power of two
     */
    public void setBufferSize(int bufferSize) {
        // Reject non-positive sizes explicitly instead of relying on the library check to do so.
        if (bufferSize <= 0 || !QueueSupplier.isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }
        this.bufferSize = bufferSize;
    }

    /**
     * Sets how many subscribers are attached to the ring buffer when a sink is created.
     *
     * @param subscriberCount the number of subscribers; must be at least 1
     * @throws IllegalArgumentException if {@code subscriberCount} is less than 1
     */
    public void setSubscriberCount(int subscriberCount) {
        // With zero (or fewer) subscribers createSink would subscribe nothing and the sink would have no
        // consumer, so fail fast here rather than silently building a sink that never processes events.
        if (subscriberCount < 1) {
            throw new IllegalArgumentException("subscriberCount must be at least 1 : " + subscriberCount);
        }
        this.subscriberCount = subscriberCount;
    }

    /**
     * Sets the name of the wait strategy to use.
     * <p>
     * The name is stored as given; it is resolved with {@code WaitStrategy.valueOf} only when a
     * {@link RingBufferProcessingStrategy} is constructed with it.
     *
     * @param waitStrategy the name of a {@link WaitStrategy} constant, or {@code null} to use the default
     */
    public void setWaitStrategy(String waitStrategy) {
        this.waitStrategy = waitStrategy;
    }

    /**
     * @return the configured buffer size, {@link #DEFAULT_BUFFER_SIZE} unless changed
     */
    protected int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * @return the configured subscriber count, {@link #DEFAULT_SUBSCRIBER_COUNT} unless changed
     */
    protected int getSubscriberCount() {
        return this.subscriberCount;
    }

    /**
     * @return the configured wait strategy name, or {@code null} if none was set
     */
    protected String getWaitStrategy() {
        return this.waitStrategy;
    }

    /**
     * Named wait strategies, each wrapping the Reactor wait strategy obtained from the factory method of the
     * corresponding name on {@code reactor.util.concurrent.WaitStrategy}.
     */
    protected static enum WaitStrategy {
        BLOCKING(reactor.util.concurrent.WaitStrategy.blocking()),
        LITE_BLOCKING(reactor.util.concurrent.WaitStrategy.liteBlocking()),
        SLEEPING(reactor.util.concurrent.WaitStrategy.sleeping()),
        BUSY_SPIN(reactor.util.concurrent.WaitStrategy.busySpin()),
        YIELDING(reactor.util.concurrent.WaitStrategy.yielding()),
        PARKING(reactor.util.concurrent.WaitStrategy.parking()),
        // Built with the values 200 and 100, both in milliseconds.
        PHASED(reactor.util.concurrent.WaitStrategy.phasedOffLiteLock(200L, 100L, TimeUnit.MILLISECONDS));

        private final reactor.util.concurrent.WaitStrategy reactorWaitStrategy;

        private WaitStrategy(reactor.util.concurrent.WaitStrategy reactorWaitStrategy) {
            this.reactorWaitStrategy = reactorWaitStrategy;
        }

        /**
         * @return the Reactor wait strategy wrapped by this constant
         */
        reactor.util.concurrent.WaitStrategy getReactorWaitStrategy() {
            return this.reactorWaitStrategy;
        }
    }

    /**
     * Processing strategy whose sinks are backed by a shared {@link WorkQueueProcessor}.
     */
    protected static class RingBufferProcessingStrategy extends AbstractProcessingStrategy {

        private final Supplier<Scheduler> ringBufferSchedulerSupplier;
        private final int bufferSize;
        private final int subscribers;
        private final WaitStrategy waitStrategy;
        private final MuleContext muleContext;

        /**
         * Creates the strategy.
         *
         * @param ringBufferSchedulerSupplier supplies the scheduler handed to the ring buffer as its executor;
         *        it is invoked once per {@link #createSink} call
         * @param bufferSize the size passed to the ring buffer
         * @param subscribers how many times the processing function is subscribed to the ring buffer
         * @param waitStrategy the name of a {@link WaitStrategy} constant, or {@code null} to use
         *        {@link AbstractRingBufferProcessingStrategyFactory#DEFAULT_WAIT_STRATEGY}
         * @param muleContext the context returned by {@link #getMuleContext()}
         * @throws IllegalArgumentException if {@code waitStrategy} is non-null and names no {@link WaitStrategy}
         *         constant
         */
        public RingBufferProcessingStrategy(Supplier<Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscribers, String waitStrategy, MuleContext muleContext) {
            this.ringBufferSchedulerSupplier = ringBufferSchedulerSupplier;
            this.bufferSize = bufferSize;
            this.subscribers = subscribers;
            this.waitStrategy = WaitStrategy.valueOf(waitStrategy != null ? waitStrategy : DEFAULT_WAIT_STRATEGY);
            this.muleContext = muleContext;
        }

        /**
         * Creates a sink that feeds a new shared {@link WorkQueueProcessor}.
         * <p>
         * The processor is built from the supplied scheduler, the configured buffer size and the configured
         * wait strategy. {@code function} is applied to the processor and subscribed once per configured
         * subscriber; disposing the returned sink's resources disposes every one of those subscriptions.
         *
         * @param flowConstruct not used by this implementation
         * @param function the processing pipeline applied to the events emitted by the ring buffer
         * @return a sink connected to the ring buffer
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, Function<Publisher<Event>, Publisher<Event>> function) {
            WorkQueueProcessor<Event> processor = WorkQueueProcessor.share(
                    this.ringBufferSchedulerSupplier.get(),
                    this.bufferSize,
                    this.waitStrategy.getReactorWaitStrategy(),
                    false);

            ArrayList<Disposable> disposables = new ArrayList<>();
            for (int i = 0; i < this.subscribers; ++i) {
                // retry() re-subscribes after an error instead of leaving this subscriber terminated.
                disposables.add(processor.transform(function).retry().subscribe());
            }

            return new AbstractProcessingStrategy.ReactorSink(
                    processor.connectSink(),
                    () -> disposables.forEach(Disposable::dispose),
                    this.createOnEventConsumer());
        }

        /**
         * @return the Mule context this strategy was created with
         */
        protected MuleContext getMuleContext() {
            return this.muleContext;
        }
    }
}

