/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.scheduler.Scheduler;
import org.mule.runtime.core.processor.strategy.MultiReactorProcessingStrategyFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
 * Factory for {@link ProactorProcessingStrategy} instances.
 *
 * <p>The strategy it creates keeps processors on the CPU-light scheduler by default and
 * moves {@code BLOCKING} and {@code CPU_INTENSIVE} processors onto dedicated schedulers
 * for the duration of that processor only.
 */
public class ProactorProcessingStrategyFactory
implements ProcessingStrategyFactory {

    /**
     * Builds a strategy whose schedulers are obtained lazily from the scheduler service of
     * the given context and are stopped using the context's configured shutdown timeout
     * (interpreted as milliseconds).
     *
     * @param muleContext context that provides the scheduler service and the configuration
     * @return a new, not yet started, {@link ProactorProcessingStrategy}
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext) {
        return new ProactorProcessingStrategy(
                () -> muleContext.getSchedulerService().cpuLightScheduler(),
                () -> muleContext.getSchedulerService().ioScheduler(),
                () -> muleContext.getSchedulerService().cpuIntensiveScheduler(),
                scheduler -> scheduler.stop(muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS),
                muleContext);
    }

    /**
     * Processing strategy that extends the multi-reactor strategy with two extra schedulers:
     * one used for processors reporting {@code BLOCKING} and one for processors reporting
     * {@code CPU_INTENSIVE}. Both are acquired in {@link #start()} and released in
     * {@link #stop()}.
     */
    static class ProactorProcessingStrategy
    extends MultiReactorProcessingStrategyFactory.MultiReactorProcessingStrategy {
        private final Supplier<Scheduler> blockingSchedulerSupplier;
        private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
        // Populated by start(); remain null until then.
        private Scheduler blockingScheduler;
        private Scheduler cpuIntensiveScheduler;

        /**
         * @param cpuLightSchedulerSupplier     passed through to the superclass
         * @param blockingSchedulerSupplier     source of the scheduler used for {@code BLOCKING} processors
         * @param cpuIntensiveSchedulerSupplier source of the scheduler used for {@code CPU_INTENSIVE} processors
         * @param schedulerStopper              passed through to the superclass; also used here to stop the
         *                                      two schedulers owned by this class
         * @param muleContext                   passed through to the superclass
         */
        public ProactorProcessingStrategy(Supplier<Scheduler> cpuLightSchedulerSupplier, Supplier<Scheduler> blockingSchedulerSupplier, Supplier<Scheduler> cpuIntensiveSchedulerSupplier, Consumer<Scheduler> schedulerStopper, MuleContext muleContext) {
            super(cpuLightSchedulerSupplier, schedulerStopper, muleContext);
            this.blockingSchedulerSupplier = blockingSchedulerSupplier;
            this.cpuIntensiveSchedulerSupplier = cpuIntensiveSchedulerSupplier;
        }

        /**
         * Starts the superclass first and then obtains the blocking and CPU-intensive
         * schedulers from their suppliers.
         *
         * @throws MuleException if the superclass fails to start
         */
        @Override
        public void start() throws MuleException {
            super.start();
            this.blockingScheduler = this.blockingSchedulerSupplier.get();
            this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
        }

        /**
         * Stops the blocking scheduler, then the CPU-intensive scheduler, then the superclass.
         *
         * <p>Every step is attempted even when an earlier one throws, so a failure while
         * stopping one scheduler cannot leave the remaining ones running. If more than one
         * step fails, the exception from the last failing step is the one that propagates.
         *
         * @throws MuleException if the superclass fails to stop
         */
        @Override
        public void stop() throws MuleException {
            try {
                this.stopIfStarted(this.blockingScheduler);
            } finally {
                try {
                    this.stopIfStarted(this.cpuIntensiveScheduler);
                } finally {
                    super.stop();
                }
            }
        }

        /** Hands the scheduler to the configured stopper, skipping schedulers that were never obtained. */
        private void stopIfStarted(Scheduler scheduler) {
            if (scheduler != null) {
                this.getSchedulerStopper().accept(scheduler);
            }
        }

        /**
         * Chooses how a processor is attached to the stream based on its processing type.
         *
         * @param messageProcessor  processor whose processing type selects the scheduler
         * @param processorFunction reactive function that performs the processor's work
         * @return for {@code BLOCKING} or {@code CPU_INTENSIVE} processors, a function that runs
         *         {@code processorFunction} on the matching dedicated scheduler; otherwise a function
         *         that applies {@code processorFunction} without any scheduler switch
         */
        @Override
        public Function<Publisher<Event>, Publisher<Event>> onProcessor(Processor messageProcessor, Function<Publisher<Event>, Publisher<Event>> processorFunction) {
            ReactiveProcessor.ProcessingType processingType = messageProcessor.getProccesingType();
            if (processingType == ReactiveProcessor.ProcessingType.BLOCKING) {
                return this.proactor(processorFunction, this.blockingScheduler);
            }
            if (processingType == ReactiveProcessor.ProcessingType.CPU_INTENSIVE) {
                return this.proactor(processorFunction, this.cpuIntensiveScheduler);
            }
            return publisher -> Flux.from(publisher).transform(processorFunction);
        }

        /**
         * Wraps {@code processorFunction} so that it is preceded by a {@code publishOn} to the given
         * scheduler and followed by a {@code publishOn} to the inherited CPU-light scheduler.
         *
         * <p>The Reactor schedulers are created inside the returned function, i.e. each time it is
         * applied to a publisher, not when this method is called.
         */
        private Function<Publisher<Event>, Publisher<Event>> proactor(Function<Publisher<Event>, Publisher<Event>> processorFunction, Scheduler scheduler) {
            return publisher -> Flux.from(publisher)
                    .publishOn(this.createReactorScheduler(scheduler))
                    .transform(processorFunction)
                    .publishOn(this.createReactorScheduler(this.cpuLightScheduler));
        }
    }
}

