/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.scheduler.Scheduler;
import org.mule.runtime.core.processor.strategy.AbstractSchedulingProcessingStrategy;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.runtime.core.util.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

/**
 * {@link ProcessingStrategyFactory} that builds {@link MultiReactorProcessingStrategy} instances.
 *
 * <p>Each created strategy obtains its scheduler lazily (on {@code start()}) by calling
 * {@code customScheduler("event-loop", 1)} on the context's scheduler service, and stops it using
 * the shutdown timeout read from the context's configuration at stop time.
 * NOTE(review): the arguments are presumably the scheduler name and its thread count - confirm
 * against the scheduler service API.
 */
public class MultiReactorProcessingStrategyFactory implements ProcessingStrategyFactory {

    /**
     * Creates a processing strategy bound to the given context.
     *
     * @param muleContext context used to look up the scheduler service and the shutdown timeout
     * @return a new, not yet started, {@link MultiReactorProcessingStrategy}
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext) {
        // Both lambdas are evaluated lazily: the scheduler is only requested on start() and the
        // shutdown timeout is only read when the scheduler is actually stopped.
        Supplier<Scheduler> eventLoopSupplier =
                () -> muleContext.getSchedulerService().customScheduler("event-loop", 1);
        Consumer<Scheduler> eventLoopStopper =
                scheduler -> scheduler.stop(muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS);
        return new MultiReactorProcessingStrategy(eventLoopSupplier, eventLoopStopper, muleContext);
    }

    /**
     * Processing strategy that moves every event of a pipeline onto a single scheduler obtained
     * from a supplier, rejecting events that arrive while a transaction is active.
     */
    static class MultiReactorProcessingStrategy extends AbstractSchedulingProcessingStrategy {

        /** Source of the scheduler; consulted once per call to {@link #start()}. */
        private final Supplier<Scheduler> cpuLightSchedulerSupplier;

        /** Scheduler obtained in {@link #start()}; {@code null} until the strategy is started. */
        protected Scheduler cpuLightScheduler;

        /**
         * @param cpuLightSchedulerSupplier supplies the scheduler used to process events
         * @param schedulerStopper          callback handed to the superclass and later used by
         *                                  {@link #stop()} to stop the scheduler
         * @param muleContext               context handed to the superclass
         */
        public MultiReactorProcessingStrategy(Supplier<Scheduler> cpuLightSchedulerSupplier, Consumer<Scheduler> schedulerStopper, MuleContext muleContext) {
            super(schedulerStopper, muleContext);
            this.cpuLightSchedulerSupplier = cpuLightSchedulerSupplier;
        }

        /**
         * Obtains the scheduler from the supplier and stores it for use by the pipeline.
         *
         * @throws MuleException declared by the signature; nothing in this body throws it directly
         */
        public void start() throws MuleException {
            this.cpuLightScheduler = this.cpuLightSchedulerSupplier.get();
        }

        /**
         * Passes the scheduler to the configured stopper, if a scheduler was ever obtained.
         * The field is intentionally left untouched afterwards.
         *
         * @throws MuleException declared by the signature; nothing in this body throws it directly
         */
        public void stop() throws MuleException {
            if (this.cpuLightScheduler != null) {
                this.getSchedulerStopper().accept(this.cpuLightScheduler);
            }
        }

        /**
         * Decorates the pipeline so that each event is first checked by {@link #assertCanProcess()},
         * then published on the reactor scheduler created from {@link #cpuLightScheduler}, and
         * finally handed to {@code pipelineFunction}.
         *
         * @param flowConstruct             not used by this implementation
         * @param pipelineFunction          the pipeline to run after the thread hand-off
         * @param messagingExceptionHandler not used by this implementation
         * @return a function that applies the steps above to a publisher of events
         */
        @Override
        public Function<Publisher<Event>, Publisher<Event>> onPipeline(FlowConstruct flowConstruct, Function<Publisher<Event>, Publisher<Event>> pipelineFunction, MessagingExceptionHandler messagingExceptionHandler) {
            // No raw-type cast here: keeping the Flux typed as Flux<Event> lets the compiler check
            // every operator in the chain instead of falling back to unchecked conversions.
            return publisher -> Flux.from(publisher)
                    .doOnNext(this.assertCanProcess())
                    .publishOn(this.createReactorScheduler(this.cpuLightScheduler))
                    .transform(pipelineFunction);
        }

        /**
         * Returns a consumer that fails when {@link TransactionCoordination#isTransactionActive()}
         * reports an active transaction.
         *
         * @return consumer that throws the result of {@link Exceptions#propagate(Throwable)} wrapping
         *         a {@link DefaultMuleException} when a transaction is active, and does nothing otherwise
         */
        protected Consumer<Event> assertCanProcess() {
            return event -> {
                if (TransactionCoordination.isTransactionActive()) {
                    throw Exceptions.propagate(new DefaultMuleException(I18nMessageFactory.createStaticMessage("Unable to process a transactional flow asynchronously")));
                }
            };
        }

        /**
         * Returns a predicate that evaluates to {@code false} for every scheduler.
         * NOTE(review): presumably this means the scheduler chosen by this strategy is never
         * overridden - confirm against {@code AbstractSchedulingProcessingStrategy}.
         */
        @Override
        protected Predicate<Scheduler> scheduleOverridePredicate() {
            return scheduler -> false;
        }
    }
}

