/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.scheduler.Scheduler;
import org.mule.runtime.core.processor.strategy.AbstractSchedulingProcessingStrategy;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.runtime.core.util.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class WorkQueueProcessingStrategyFactory
implements ProcessingStrategyFactory {
    public static final String TRANSACTIONAL_ERROR_MESSAGE = "Unable to process a transactional flow asynchronously";
    private int maxThreads;

    @Override
    public ProcessingStrategy create(MuleContext muleContext) {
        return new WorkQueueProcessingStrategy(() -> muleContext.getSchedulerService().ioScheduler(), this.maxThreads, scheduler -> scheduler.stop(muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS), muleContext);
    }

    static class WorkQueueProcessingStrategy
    extends AbstractSchedulingProcessingStrategy {
        private Supplier<Scheduler> schedulerSupplier;
        private Scheduler scheduler;

        public WorkQueueProcessingStrategy(Supplier<Scheduler> schedulerSupplier, int maxThreads, Consumer<Scheduler> schedulerStopper, MuleContext muleContext) {
            super(schedulerStopper, muleContext);
            this.schedulerSupplier = schedulerSupplier;
        }

        @Override
        public Function<Publisher<Event>, Publisher<Event>> onPipeline(FlowConstruct flowConstruct, Function<Publisher<Event>, Publisher<Event>> pipelineFunction, MessagingExceptionHandler messagingExceptionHandler) {
            return publisher -> Flux.from((Publisher)publisher).doOnNext(this.assertCanProcessAsync()).publishOn(Schedulers.fromExecutorService((ExecutorService)this.scheduler)).transform(pipelineFunction);
        }

        public void start() throws MuleException {
            this.scheduler = this.schedulerSupplier.get();
        }

        public void stop() throws MuleException {
            if (this.scheduler != null) {
                this.getSchedulerStopper().accept(this.scheduler);
            }
        }

        private Consumer<Event> assertCanProcessAsync() {
            return event -> {
                if (TransactionCoordination.isTransactionActive()) {
                    throw Exceptions.propagate((Throwable)((Object)new DefaultMuleException(I18nMessageFactory.createStaticMessage((String)WorkQueueProcessingStrategyFactory.TRANSACTIONAL_ERROR_MESSAGE))));
                }
            };
        }

        @Override
        protected Predicate<Scheduler> scheduleOverridePredicate() {
            return scheduler -> false;
        }
    }
}

