/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.construct;

import java.util.Optional;
import java.util.function.Function;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.connector.ReplyToHandler;
import org.mule.runtime.core.api.processor.DynamicPipeline;
import org.mule.runtime.core.api.processor.DynamicPipelineBuilder;
import org.mule.runtime.core.api.processor.DynamicPipelineException;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.construct.AbstractPipeline;
import org.mule.runtime.core.construct.DynamicPipelineMessageProcessor;
import org.mule.runtime.core.construct.processor.FlowConstructStatisticsMessageProcessor;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.execution.ErrorHandlingExecutionTemplate;
import org.mule.runtime.core.interceptor.ProcessingTimeInterceptor;
import org.mule.runtime.core.management.stats.FlowConstructStatistics;
import org.mule.runtime.core.processor.strategy.DefaultFlowProcessingStrategyFactory;
import org.mule.runtime.core.routing.requestreply.AsyncReplyToPropertyRequestReplyReplier;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.runtime.core.util.rx.Exceptions;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Flow
extends AbstractPipeline
implements Processor,
DynamicPipeline {
    private DynamicPipelineMessageProcessor dynamicPipelineMessageProcessor;

    public Flow(String name, MuleContext muleContext) {
        super(name, muleContext);
    }

    @Override
    public Event process(Event event) throws MuleException {
        if (TransactionCoordination.isTransactionActive()) {
            Event newEvent = this.createMuleEventForCurrentFlow(event, event.getReplyToDestination(), event.getReplyToHandler());
            try {
                ErrorHandlingExecutionTemplate executionTemplate = ErrorHandlingExecutionTemplate.createErrorHandlingExecutionTemplate(this.muleContext, this, this.getExceptionListener());
                Event result = executionTemplate.execute(() -> this.pipeline.process(newEvent));
                return this.createReturnEventForParentFlowConstruct(result, event);
            }
            catch (MessagingException e) {
                e.setProcessedEvent(this.createReturnEventForParentFlowConstruct(e.getEvent(), event));
                throw e;
            }
            catch (Exception e) {
                this.resetRequestContextEvent(event);
                throw new DefaultMuleException(CoreMessages.createStaticMessage((String)"Flow execution exception"), (Throwable)e);
            }
        }
        try {
            return (Event)Mono.just((Object)event).transform((Function)this).block();
        }
        catch (Exception e) {
            throw Exceptions.rxExceptionToMuleException(e);
        }
    }

    @Override
    public Publisher<Event> apply(Publisher<Event> publisher) {
        if (TransactionCoordination.isTransactionActive()) {
            return Processor.super.apply(publisher);
        }
        return Flux.from(publisher).concatMap(event -> Flux.just((Object)event).map(request -> this.createMuleEventForCurrentFlow((Event)request, request.getReplyToDestination(), request.getReplyToHandler())).transform((Function)this.pipeline).onErrorResumeWith(MessagingException.class, (Function)this.getExceptionListener()).doOnError(exception -> {
            if (!(exception instanceof MessagingException)) {
                LOGGER.error("Unhandled exception in async processing " + exception);
            }
        }).map(respone -> this.createReturnEventForParentFlowConstruct((Event)respone, (Event)event)));
    }

    private Event createMuleEventForCurrentFlow(Event event, Object replyToDestination, ReplyToHandler replyToHandler) {
        replyToHandler = null;
        event = Event.builder(event).flow(this).replyToHandler(replyToHandler).replyToDestination(replyToDestination).synchronous(event.isSynchronous() || this.isSynchronous()).build();
        this.resetRequestContextEvent(event);
        return event;
    }

    private Event createReturnEventForParentFlowConstruct(Event result, Event original) {
        if (result != null) {
            Optional errorOptional = result.getError();
            result = Event.builder(result).flow(original.getFlowConstruct()).replyToHandler(original.getReplyToHandler()).replyToDestination(original.getReplyToDestination()).synchronous(original.isSynchronous()).error(errorOptional.orElse(null)).build();
        }
        this.resetRequestContextEvent(result);
        return result;
    }

    private void resetRequestContextEvent(Event event) {
        Event.setCurrentEvent(event);
    }

    @Override
    protected void configurePreProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        super.configurePreProcessors(builder);
        builder.chain(new AbstractPipeline.ProcessIfPipelineStartedMessageProcessor());
        builder.chain(new ProcessingTimeInterceptor());
        builder.chain(new FlowConstructStatisticsMessageProcessor());
        this.dynamicPipelineMessageProcessor = new DynamicPipelineMessageProcessor(this);
        builder.chain(this.dynamicPipelineMessageProcessor);
    }

    @Override
    protected void configurePostProcessors(MessageProcessorChainBuilder builder) throws MuleException {
        builder.chain(new AsyncReplyToPropertyRequestReplyReplier());
        super.configurePostProcessors(builder);
    }

    @Override
    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        return new DefaultFlowProcessingStrategyFactory();
    }

    @Override
    public String getConstructType() {
        return "Flow";
    }

    @Override
    protected void configureStatistics() {
        this.statistics = new FlowConstructStatistics(this.getConstructType(), this.name);
        this.statistics.setEnabled(this.muleContext.getStatistics().isEnabled());
        this.muleContext.getStatistics().add(this.statistics);
    }

    @Override
    public DynamicPipelineBuilder dynamicPipeline(String id) throws DynamicPipelineException {
        return this.dynamicPipelineMessageProcessor.dynamicPipeline(id);
    }

    @Override
    public boolean isSynchronous() {
        return this.getProcessingStrategy() != null ? this.getProcessingStrategy().isSynchronous() : true;
    }
}

