/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.construct;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.core.AbstractAnnotatedObject;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.EventContext;
import org.mule.runtime.core.api.GlobalNameableObject;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.connector.ConnectException;
import org.mule.runtime.core.api.construct.FlowConstructInvalidException;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.processor.DefaultMessageProcessorPathElement;
import org.mule.runtime.core.api.processor.InternalMessageProcessor;
import org.mule.runtime.core.api.processor.MessageProcessorBuilder;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.MessageProcessorContainer;
import org.mule.runtime.core.api.processor.MessageProcessorPathElement;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.source.ClusterizableMessageSource;
import org.mule.runtime.core.api.source.CompositeMessageSource;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.source.NonBlockingMessageSource;
import org.mule.runtime.core.api.transport.LegacyInboundEndpoint;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.context.notification.PipelineMessageNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.internal.construct.AbstractFlowConstruct;
import org.mule.runtime.core.internal.streaming.StreamingManagerAdapter;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.processor.AbstractRequestResponseMessageProcessor;
import org.mule.runtime.core.processor.IdempotentRedeliveryPolicy;
import org.mule.runtime.core.processor.chain.DefaultMessageProcessorChainBuilder;
import org.mule.runtime.core.processor.strategy.DefaultFlowProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.LegacyAsynchronousProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.LegacyDefaultFlowProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.LegacyNonBlockingProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.LegacySynchronousProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.SynchronousProcessingStrategyFactory;
import org.mule.runtime.core.source.ClusterizableMessageSourceWrapper;
import org.mule.runtime.core.source.polling.PollingMessageSource;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.runtime.core.util.NotificationUtils;
import org.mule.runtime.core.util.concurrent.ThreadNameHelper;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public abstract class AbstractPipeline
extends AbstractFlowConstruct
implements Pipeline {
    /** Source that delivers events to this pipeline; {@code null} when no source is configured. */
    protected MessageSource messageSource;
    /** Processor chain assembled by {@code createPipeline()} during initialisation. */
    protected MessageProcessorChain pipeline;
    /** Scheduler service obtained from the Mule context at construction time. */
    protected final SchedulerService schedulerService;
    /** Streaming manager looked up from the registry during initialisation. */
    protected StreamingManagerAdapter streamingManager;
    /** Processors configured on this pipeline; empty until {@code setMessageProcessors} is called. */
    protected List<Processor> messageProcessors = Collections.emptyList();
    /** Resolver built by {@code createFlowMap()} and used to answer {@code getProcessorPath}. */
    private NotificationUtils.PathResolver flowMap;
    /** Factory used to create {@link #processingStrategy}; may be set explicitly or defaulted. */
    protected ProcessingStrategyFactory processingStrategyFactory;
    /** Strategy currently in use; reset to {@code null} whenever a new factory is set. */
    protected ProcessingStrategy processingStrategy;
    /**
     * Set to {@code true} at the end of start and back to {@code false} on stop. It is written by the
     * lifecycle methods and read inside the reactive consumer returned by {@code assertStarted()}, so
     * it is declared {@code volatile} to guarantee the reader sees the latest value.
     */
    private volatile boolean canProcessMessage = false;
    /** Weak-valued cache of event contexts, exposed as a map for serialization support. */
    private final Cache<String, EventContext> eventContextCache = CacheBuilder.newBuilder().weakValues().build();
    /** Sink created from the processing strategy on start and disposed on stop. */
    protected Sink sink;
    /**
     * Decides whether a message source can be used with an asynchronous processing strategy.
     * Legacy inbound endpoints answer for themselves, composite sources are compatible only when
     * none of their nested sources is rejected, and any other source (including {@code null}) is
     * considered compatible.
     */
    private static final Predicate sourceCompatibleWithAsync = new Predicate(){

        public boolean evaluate(Object messageSource) {
            if (messageSource instanceof LegacyInboundEndpoint) {
                return ((LegacyInboundEndpoint)messageSource).isCompatibleWithAsync();
            }
            if (messageSource instanceof CompositeMessageSource) {
                // Recurse with this same predicate: compatible only if no nested source is rejected.
                return CollectionUtils.selectRejected(((CompositeMessageSource)messageSource).getSources(), (Predicate)this).isEmpty();
            }
            return true;
        }
    };

    /**
     * Creates the pipeline, captures the scheduler service from the given context and sets up an
     * initial processing strategy.
     *
     * @param name the name of this flow construct
     * @param muleContext the Mule context this pipeline belongs to
     */
    public AbstractPipeline(String name, MuleContext muleContext) {
        super(name, muleContext);
        schedulerService = muleContext.getSchedulerService();
        // Runs again in doInitialise(); by then a different factory may have been set.
        initialiseProcessingStrategy();
    }

    /**
     * Builds the processor chain for this pipeline: pre-processors first, then the configured
     * message processors, then post-processors.
     *
     * @return the assembled chain, named after this pipeline
     * @throws MuleException if any of the configuration steps or the build fails
     */
    protected MessageProcessorChain createPipeline() throws MuleException {
        DefaultMessageProcessorChainBuilder chainBuilder = new DefaultMessageProcessorChainBuilder();
        String chainName = "'" + getName() + "' processor chain";
        chainBuilder.setName(chainName);
        configurePreProcessors(chainBuilder);
        configureMessageProcessors(chainBuilder);
        configurePostProcessors(chainBuilder);
        return chainBuilder.build();
    }

    /**
     * Supplies the factory used when neither this pipeline nor the Mule configuration provides one.
     *
     * @return a new synchronous processing strategy factory
     */
    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        ProcessingStrategyFactory fallbackFactory = new SynchronousProcessingStrategyFactory();
        return fallbackFactory;
    }

    /**
     * Ensures a processing strategy exists, creating it from the configured factory, the Mule
     * configuration default, or {@link #createDefaultProcessingStrategyFactory()} in that order.
     * When the factory in use is the default flow factory and a redelivery policy is configured,
     * the strategy is replaced by a synchronous one and a warning is logged.
     */
    private void initialiseProcessingStrategy() {
        if (processingStrategy == null) {
            if (processingStrategyFactory == null) {
                ProcessingStrategyFactory configuredDefault = muleContext.getConfiguration().getDefaultProcessingStrategyFactory();
                if (configuredDefault != null) {
                    processingStrategyFactory = configuredDefault;
                } else {
                    processingStrategyFactory = createDefaultProcessingStrategyFactory();
                }
            }
            processingStrategy = processingStrategyFactory.create(muleContext, ThreadNameHelper.getPrefix(muleContext) + getName());
        }

        // Both checks are always evaluated, exactly as before; only their combination is tested.
        boolean usingDefaultFlowFactory = getProcessingStrategyFactory() instanceof DefaultFlowProcessingStrategyFactory;
        boolean redeliveryConfigured = isRedeliveryPolicyConfigured();
        if (usingDefaultFlowFactory && redeliveryConfigured) {
            processingStrategy = new SynchronousProcessingStrategyFactory().create(muleContext, ThreadNameHelper.getPrefix(muleContext) + getName());
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Using message redelivery and on-error-propagate requires synchronous processing strategy. Processing strategy re-configured to synchronous");
            }
        }
    }

    /**
     * Adds the processors that must run before the configured message processors.
     *
     * @param builder the chain builder being populated
     * @throws MuleException declared for overriding implementations
     */
    protected void configurePreProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        ProcessorStartCompleteProcessor startCompleteNotifier = new ProcessorStartCompleteProcessor();
        builder.chain(startCompleteNotifier);
    }

    /**
     * Adds the processors that must run after the configured message processors.
     *
     * @param builder the chain builder being populated
     * @throws MuleException declared for overriding implementations
     */
    protected void configurePostProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        ProcessEndProcessor endNotifier = new ProcessEndProcessor();
        builder.chain(endNotifier);
    }

    /**
     * Replaces the list of processors configured on this pipeline. The list is stored as given,
     * without copying.
     *
     * @param messageProcessors the processors to use
     */
    @Override
    public void setMessageProcessors(List<Processor> messageProcessors) {
        // The parameter shadows the field, so the qualifier is required here.
        this.messageProcessors = messageProcessors;
    }

    /**
     * @return the processors configured on this pipeline (the stored list itself, not a copy)
     */
    @Override
    public List<Processor> getMessageProcessors() {
        return messageProcessors;
    }

    /**
     * @return the message source of this pipeline, or {@code null} if none has been set
     */
    @Override
    public MessageSource getMessageSource() {
        return messageSource;
    }

    /**
     * Sets the message source. A {@link ClusterizableMessageSource} is wrapped in a
     * {@link ClusterizableMessageSourceWrapper}; any other source is stored as given.
     *
     * @param messageSource the source to use
     */
    @Override
    public void setMessageSource(MessageSource messageSource) {
        if (messageSource instanceof ClusterizableMessageSource) {
            this.messageSource = new ClusterizableMessageSourceWrapper(muleContext, (ClusterizableMessageSource) messageSource, this);
        } else {
            this.messageSource = messageSource;
        }
    }

    /**
     * @return whether the current processing strategy reports itself as synchronous
     */
    @Override
    public boolean isSynchronous() {
        return processingStrategy.isSynchronous();
    }

    /**
     * @return the factory used to create this pipeline's processing strategy
     */
    @Override
    public ProcessingStrategyFactory getProcessingStrategyFactory() {
        return processingStrategyFactory;
    }

    /**
     * Sets the processing strategy factory and discards the current strategy so that the next
     * strategy initialisation creates one from the new factory.
     *
     * @param processingStrategyFactory the factory to use
     */
    @Override
    public void setProcessingStrategyFactory(ProcessingStrategyFactory processingStrategyFactory) {
        // Clear the stale strategy first; the two assignments are independent.
        processingStrategy = null;
        this.processingStrategyFactory = processingStrategyFactory;
    }

    /**
     * @return the processing strategy currently in use, or {@code null} if it has been reset
     */
    @Override
    public ProcessingStrategy getProcessingStrategy() {
        return processingStrategy;
    }

    /**
     * Initialises the pipeline: ensures a processing strategy exists, looks up the streaming
     * manager, builds the processor chain, registers a listener on the message source (if any),
     * injects context and exception handler into source and chain, initialises both, and finally
     * builds the processor path map.
     *
     * @throws MuleException if the parent initialisation, chain creation or any nested
     *         initialisation fails
     */
    @Override
    protected void doInitialise() throws MuleException {
        super.doInitialise();
        this.initialiseProcessingStrategy();
        this.streamingManager = this.muleContext.getRegistry().lookupObject(StreamingManagerAdapter.class);
        this.pipeline = this.createPipeline();
        if (this.messageSource != null) {
            // The listener is the entry point through which the source hands events to this pipeline.
            this.messageSource.setListener(new Processor(){

                @Override
                public Event process(Event event) throws MuleException {
                    // Blocking path: run the chain directly on the calling thread.
                    if (AbstractPipeline.this.useBlockingCodePath()) {
                        return AbstractPipeline.this.pipeline.process(event);
                    }
                    try {
                        // Otherwise route the single event through apply(...) below and wait for the result.
                        return (Event)Mono.just((Object)event).transform((Function)this).block();
                    }
                    catch (Exception e) {
                        // Convert whatever the reactive chain threw into a MuleException for the caller.
                        throw org.mule.runtime.core.api.rx.Exceptions.rxExceptionToMuleException(e);
                    }
                }

                @Override
                public Publisher<Event> apply(Publisher<Event> publisher) {
                    // The legacy synchronous strategy bypasses the sink and invokes the chain per event.
                    if (AbstractPipeline.this.processingStrategy == LegacySynchronousProcessingStrategyFactory.LEGACY_SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE) {
                        return Flux.from(publisher).handle(Operators.nullSafeMap(org.mule.runtime.core.api.rx.Exceptions.checkedFunction(request -> AbstractPipeline.this.pipeline.process((Event)request))));
                    }
                    // Reject events while stopped, push each event into the sink, then continue with
                    // whatever the event's context publishes.
                    // NOTE(review): the context is cast to Publisher; presumably it emits the response
                    // once the flow completes the context -- confirm against EventContext.
                    return Flux.from(publisher).doOnNext(AbstractPipeline.this.assertStarted()).doOnNext(event -> AbstractPipeline.this.sink.accept((Event)event)).flatMap(event -> Mono.from((Publisher)event.getContext()));
                }
            });
        }
        // Injection happens for both source and chain before either of them is initialised.
        this.injectFlowConstructMuleContext(this.messageSource);
        this.injectExceptionHandler(this.messageSource);
        this.injectFlowConstructMuleContext(this.pipeline);
        this.injectExceptionHandler(this.pipeline);
        this.initialiseIfInitialisable(this.messageSource);
        this.initialiseIfInitialisable(this.pipeline);
        this.createFlowMap();
    }

    /**
     * Builds the function that processes a stream of events through this pipeline using the
     * current processing strategy.
     * <ul>
     * <li>On each response the event context and the streaming manager are notified of success.</li>
     * <li>A {@link MessagingException} is delegated to {@link #handleError()}.</li>
     * <li>A dropped event completes its context with an empty success and notifies the streaming
     * manager of an error for that event.</li>
     * <li>Errors matching the unexpected-exception predicate are logged together with their stack
     * trace.</li>
     * </ul>
     * The Mule {@code Exceptions} class is referenced by its fully qualified name because this file
     * also imports Reactor's class of the same simple name.
     *
     * @return the function to hand to the processing strategy when creating the sink
     */
    protected Function<Publisher<Event>, Publisher<Event>> processFlowFunction() {
        return stream -> Flux.from(stream)
            .transform(this.processingStrategy.onPipeline(this, this.pipeline))
            .doOnNext(response -> {
                response.getContext().success(response);
                this.streamingManager.success(response);
            })
            .doOnError(MessagingException.class, this.handleError())
            .doOnError(org.mule.runtime.core.api.rx.Exceptions.EventDroppedException.class, ede -> {
                ede.getEvent().getContext().success();
                this.streamingManager.error(ede.getEvent());
            })
            // Pass the throwable as the cause as well, so the stack trace is not lost.
            .doOnError(org.mule.runtime.core.api.rx.Exceptions.UNEXPECTED_EXCEPTION_PREDICATE,
                throwable -> LOGGER.error("Unhandled exception in async processing " + throwable, throwable));
    }

    /**
     * Chooses how a {@link MessagingException} raised by the flow is handled.
     * <p>
     * For legacy inbound endpoints and polling sources the failed event's context is completed with
     * the error and the streaming manager is notified. For any other source the exception listener
     * is applied: an event emitted by the listener completes its context successfully, while a
     * failure of the listener completes the original event's context with that failure.
     *
     * @return the consumer to register for messaging errors
     */
    private Consumer<MessagingException> handleError() {
        boolean legacyOrPollingSource = messageSource instanceof LegacyInboundEndpoint || messageSource instanceof PollingMessageSource;
        if (!legacyOrPollingSource) {
            return failure -> Mono.defer(() -> Mono.from(getExceptionListener().apply(failure)))
                .doOnNext(handled -> {
                    handled.getContext().success(handled);
                    streamingManager.success(handled);
                })
                .doOnError(listenerFailure -> {
                    failure.getEvent().getContext().error(listenerFailure);
                    streamingManager.error(failure.getEvent());
                })
                .subscribe();
        }
        return failure -> {
            failure.getEvent().getContext().error(failure);
            streamingManager.error(failure.getEvent());
        };
    }

    /**
     * Adds every configured element to the chain builder. An element may be a {@link Processor} or
     * a {@link MessageProcessorBuilder}.
     * <p>
     * Elements are iterated as {@code Object}. With the loop variable typed as {@code Processor},
     * the {@code instanceof Processor} test could never fail for a non-null element, the builder
     * branch was unreachable, and a builder placed in the list would have failed with a
     * {@link ClassCastException} instead of being chained.
     *
     * @param builder the chain builder being populated
     * @throws MuleException declared for overriding implementations
     * @throws IllegalArgumentException if an element is neither a processor nor a processor builder
     *         (this includes {@code null} elements)
     */
    protected void configureMessageProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        for (Object processor : this.getMessageProcessors()) {
            if (processor instanceof Processor) {
                builder.chain((Processor) processor);
            } else if (processor instanceof MessageProcessorBuilder) {
                builder.chain((MessageProcessorBuilder) processor);
            } else {
                throw new IllegalArgumentException("MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
            }
        }
    }

    /**
     * Validates that the configured message source is usable with the configured processing
     * strategy factory.
     *
     * @throws FlowConstructInvalidException if a legacy asynchronous factory is combined with a
     *         source that is not compatible with async processing, or if the legacy non-blocking
     *         factory is combined with a source that is not a {@link NonBlockingMessageSource}
     */
    @Override
    public void validateConstruct() throws FlowConstructInvalidException {
        super.validateConstruct();

        boolean defaultFlowFactory = getProcessingStrategyFactory() instanceof DefaultFlowProcessingStrategyFactory;
        boolean explicitAsyncFactory = getProcessingStrategyFactory() instanceof LegacyAsynchronousProcessingStrategyFactory && !defaultFlowFactory;
        boolean sourceSupportsAsync = sourceCompatibleWithAsync.evaluate(messageSource);
        if (explicitAsyncFactory && messageSource != null && !sourceSupportsAsync) {
            throw new FlowConstructInvalidException(CoreMessages.createStaticMessage("One of the message sources configured on this Flow is not compatible with an asynchronous processing strategy.  Either because it is request-response, has a transaction defined, or messaging redelivered is configured."), this);
        }

        boolean nonBlockingFactory = getProcessingStrategyFactory() instanceof LegacyNonBlockingProcessingStrategyFactory;
        if (nonBlockingFactory && messageSource != null && !(messageSource instanceof NonBlockingMessageSource)) {
            String description = String.format("The non-blocking processing strategy (%s) currently only supports non-blocking messages sources (source is %s).", getProcessingStrategyFactory().toString(), messageSource.toString());
            throw new FlowConstructInvalidException(CoreMessages.createStaticMessage(description), this);
        }
    }

    /**
     * Reports whether a redelivery policy is configured, which is the case when the first
     * configured processor is an {@link IdempotentRedeliveryPolicy}.
     *
     * @return {@code true} if the processor list is non-empty and starts with a redelivery policy
     */
    protected boolean isRedeliveryPolicyConfigured() {
        return !getMessageProcessors().isEmpty()
            && getMessageProcessors().get(0) instanceof IdempotentRedeliveryPolicy;
    }

    /**
     * Starts the processing strategy, creates the sink, starts the processor chain and marks the
     * pipeline as able to process messages. The message source is started only if the Mule context
     * is already started.
     * <p>
     * If starting the source fails with a {@link ConnectException} the exception is rethrown as is;
     * for any other {@link MuleException} the pipeline is stopped again before rethrowing.
     *
     * @throws MuleException if any component fails to start
     */
    @Override
    protected void doStart() throws MuleException {
        super.doStart();
        startIfStartable(processingStrategy);
        sink = processingStrategy.createSink(this, processFlowFunction());
        startIfStartable(pipeline);
        canProcessMessage = true;

        if (!muleContext.isStarted()) {
            return;
        }
        try {
            startIfStartable(messageSource);
        } catch (MuleException startFailure) {
            // Connection failures leave the pipeline started; everything else rolls it back.
            if (!(startFailure instanceof ConnectException)) {
                doStop();
            }
            throw startFailure;
        }
    }

    /**
     * Builds the path resolver for this pipeline from a root path element named after the pipeline.
     */
    private void createFlowMap() {
        DefaultMessageProcessorPathElement rootElement = new DefaultMessageProcessorPathElement(null, getName());
        addMessageProcessorPathElements(rootElement);
        flowMap = NotificationUtils.buildPathResolver(rootElement);
    }

    /**
     * Registers the path elements of this pipeline under the given element: the processor chain
     * under a {@code "processors"} child and, when the exception listener is a
     * {@link MessageProcessorContainer}, its processors under an {@code "es"} child. That child is
     * nested below the listener's global name when it has one.
     *
     * @param pathElement the parent element to add children to
     */
    @Override
    public void addMessageProcessorPathElements(MessageProcessorPathElement pathElement) {
        NotificationUtils.addMessageProcessorPathElements(pipeline, pathElement.addChild("processors"));

        if (!(exceptionListener instanceof MessageProcessorContainer)) {
            return;
        }
        String listenerGlobalName = getExceptionStrategyGlobalName();
        MessageProcessorPathElement listenerParent = listenerGlobalName != null
            ? pathElement.addChild(listenerGlobalName)
            : pathElement;
        MessageProcessorPathElement listenerElement = listenerParent.addChild("es");
        ((MessageProcessorContainer) exceptionListener).addMessageProcessorPathElements(listenerElement);
    }

    /**
     * @return the global name of the exception listener when it is a {@link GlobalNameableObject},
     *         otherwise {@code null}
     */
    private String getExceptionStrategyGlobalName() {
        if (!(exceptionListener instanceof GlobalNameableObject)) {
            return null;
        }
        return ((GlobalNameableObject) exceptionListener).getGlobalName();
    }

    /**
     * Resolves the path of a processor using the resolver built during initialisation.
     *
     * @param processor the processor to locate
     * @return the path reported by the resolver for that processor
     */
    @Override
    public String getProcessorPath(Processor processor) {
        String resolvedPath = flowMap.resolvePath(processor);
        return resolvedPath;
    }

    /**
     * Returns a consumer that rejects events while this pipeline cannot process messages.
     * <p>
     * When the pipeline is not started the consumer throws a {@link MessagingException} wrapping a
     * {@link LifecycleException}, propagated through Reactor's {@code Exceptions.propagate}. Reactor's
     * class is referenced by its fully qualified name because this file also imports the Mule class
     * with the same simple name, which makes the bare name ambiguous.
     *
     * @return the consumer to apply to each incoming event
     */
    public Consumer<Event> assertStarted() {
        return event -> {
            if (!this.canProcessMessage) {
                throw reactor.core.Exceptions.propagate(new MessagingException(event, new LifecycleException(CoreMessages.isStopped(this.getName()), (Object) event.getMessage())));
            }
        };
    }

    /**
     * Stops the pipeline: stops the message source, marks the pipeline as unable to process
     * messages, disposes and clears the sink, then stops the processing strategy and the processor
     * chain before delegating to the parent.
     *
     * @throws MuleException if stopping any component fails
     */
    @Override
    protected void doStop() throws MuleException {
        try {
            this.stopIfStoppable(this.messageSource);
        }
        finally {
            // The flag is cleared even if stopping the source throws.
            this.canProcessMessage = false;
        }
        // Not reached if stopping the source threw; the exception propagates after the finally block.
        this.disposeIfDisposable(this.sink);
        this.sink = null;
        this.stopIfStoppable(this.processingStrategy);
        this.stopIfStoppable(this.pipeline);
        super.doStop();
    }

    /**
     * Disposes the processor chain and then the message source before delegating to the parent.
     */
    @Override
    protected void doDispose() {
        disposeIfDisposable(pipeline);
        disposeIfDisposable(messageSource);
        super.doDispose();
    }

    /**
     * Decides whether an event coming from the source is processed by calling the chain directly
     * instead of going through the reactive path.
     *
     * @return {@code true} when a transaction is active and the factory is one of the default flow,
     *         legacy default flow or synchronous factories, or when the processing strategy is the
     *         legacy synchronous instance
     */
    protected boolean useBlockingCodePath() {
        boolean blockingWithinTransaction = false;
        if (TransactionCoordination.isTransactionActive()) {
            blockingWithinTransaction = processingStrategyFactory instanceof DefaultFlowProcessingStrategyFactory
                || processingStrategyFactory instanceof LegacyDefaultFlowProcessingStrategyFactory
                || processingStrategyFactory instanceof SynchronousProcessingStrategyFactory;
        }
        return blockingWithinTransaction
            || processingStrategy == LegacySynchronousProcessingStrategyFactory.LEGACY_SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE;
    }

    /**
     * @return a map view of the weak-valued event context cache held by this pipeline
     */
    @Override
    public Map<String, EventContext> getSerializationEventContextCache() {
        Map<String, EventContext> cacheView = eventContextCache.asMap();
        return cacheView;
    }

    /**
     * Internal processor placed at the head of the chain. It fires a pipeline notification before
     * the request is processed and another one in the finally phase, passing along any exception.
     */
    private class ProcessorStartCompleteProcessor
    extends AbstractRequestResponseMessageProcessor
    implements InternalMessageProcessor {
        private ProcessorStartCompleteProcessor() {
        }

        @Override
        protected Event processRequest(Event event) throws MuleException {
            // NOTE(review): 1801 is presumably the "process start" action code of
            // PipelineMessageNotification -- confirm against that class's constants.
            this.muleContext.getNotificationManager().fireNotification(new PipelineMessageNotification(AbstractPipeline.this, event, 1801));
            return super.processRequest(event);
        }

        @Override
        protected void processFinally(Event event, MessagingException exception) {
            // NOTE(review): 1804 is presumably the "process complete" action code -- confirm.
            // The exception argument is forwarded as received by this method.
            this.muleContext.getNotificationManager().fireNotification(new PipelineMessageNotification(AbstractPipeline.this, event, 1804, exception));
        }
    }

    /**
     * Internal processor placed at the tail of the chain. It fires a pipeline notification and
     * returns the event unchanged.
     */
    private class ProcessEndProcessor
    extends AbstractAnnotatedObject
    implements Processor,
    InternalMessageProcessor {
        private ProcessEndProcessor() {
        }

        @Override
        public Event process(Event event) throws MuleException {
            // NOTE(review): 1802 is presumably the "process end" action code of
            // PipelineMessageNotification -- confirm against that class's constants.
            AbstractPipeline.this.muleContext.getNotificationManager().fireNotification(new PipelineMessageNotification(AbstractPipeline.this, event, 1802));
            return event;
        }
    }
}

