/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.chain;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.interception.ProcessorInterceptor;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.core.AbstractAnnotatedObject;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.MessageProcessorPathResolver;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.exception.MessagingExceptionHandlerAware;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessorPathElement;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.context.notification.MessageProcessorNotification;
import org.mule.runtime.core.context.notification.ServerNotificationManager;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.execution.MessageProcessorExecutionTemplate;
import org.mule.runtime.core.processor.interceptor.ReactiveInterceptorAdapter;
import org.mule.runtime.core.util.ExceptionUtils;
import org.mule.runtime.core.util.NotificationUtils;
import org.mule.runtime.core.util.StringUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/**
 * Base {@link MessageProcessorChain} that runs an ordered list of {@link Processor}s.
 *
 * <p>The chain can be executed in blocking form through {@link #process(Event)} or in reactive
 * form through {@link #apply(Publisher)}. It also forwards the {@link MuleContext}, the
 * {@link FlowConstruct}, the exception handler and the lifecycle phases to the processors it
 * holds.
 */
public abstract class AbstractMessageProcessorChain
extends AbstractAnnotatedObject
implements MessageProcessorChain {
    private static final Logger log = LoggerFactory.getLogger(AbstractMessageProcessorChain.class);

    /**
     * Action code used for the notification fired before a processor receives an event.
     * NOTE(review): presumably mirrors the pre-invoke constant of MessageProcessorNotification - confirm.
     */
    private static final int PRE_INVOKE_ACTION = 1601;

    /**
     * Action code used for the notification fired after a processor emitted an event or failed.
     * NOTE(review): presumably mirrors the post-invoke constant of MessageProcessorNotification - confirm.
     */
    private static final int POST_INVOKE_ACTION = 1602;

    protected String name;
    protected List<Processor> processors;
    protected MuleContext muleContext;
    protected FlowConstruct flowConstruct;
    protected MessageProcessorExecutionTemplate messageProcessorExecutionTemplate = MessageProcessorExecutionTemplate.createExecutionTemplate();

    /**
     * Creates an unnamed chain.
     *
     * @param processors the processors that make up the chain, in execution order
     */
    public AbstractMessageProcessorChain(List<Processor> processors) {
        this(null, processors);
    }

    /**
     * Creates a chain.
     *
     * @param name       optional chain name, only used by {@link #toString()}
     * @param processors the processors that make up the chain, in execution order
     */
    public AbstractMessageProcessorChain(String name, List<Processor> processors) {
        this.name = name;
        this.processors = processors;
    }

    /**
     * Runs the event through the chain in blocking mode.
     *
     * @param event the event to process; may be {@code null}
     * @return the resulting event, or {@code null} when {@code event} is {@code null} or a
     *         processor produced {@code null}
     * @throws MuleException if a processor fails
     */
    @Override
    public Event process(Event event) throws MuleException {
        // Parameterised logging only renders the chain and the event when debug is enabled.
        log.debug("Invoking {} with event {}", this, event);
        if (event == null) {
            return null;
        }
        return doProcess(event);
    }

    /**
     * Executes every processor returned by {@link #getProcessorsToExecute()} in order, feeding
     * each one the result of the previous one.
     *
     * @param event the non-null event to start with
     * @return the event produced by the last processor, or {@code null} as soon as any processor
     *         returns {@code null}
     * @throws MuleException if a processor fails
     */
    protected Event doProcess(Event event) throws MuleException {
        Event current = event;
        for (Processor processor : getProcessorsToExecute()) {
            Event.setCurrentEvent(current);
            current = messageProcessorExecutionTemplate.execute(processor, current);
            if (current == null) {
                // A processor that yields no event ends the chain.
                return null;
            }
        }
        return current;
    }

    /**
     * Builds the reactive pipeline for this chain: each processor is wrapped by every resolved
     * interceptor (in list order, so the first interceptor is the innermost wrapper) and then
     * appended to the stream.
     *
     * @param publisher the source of events
     * @return the publisher resulting from chaining all wrapped processors
     */
    @Override
    public Publisher<Event> apply(Publisher<Event> publisher) {
        List<BiFunction<Processor, Function<Publisher<Event>, Publisher<Event>>, Function<Publisher<Event>, Publisher<Event>>>> interceptors = resolveInterceptors();
        Flux<Event> stream = Flux.from(publisher);
        for (Processor processor : getProcessorsToExecute()) {
            Function<Publisher<Event>, Publisher<Event>> processorFunction = processor;
            for (BiFunction<Processor, Function<Publisher<Event>, Publisher<Event>>, Function<Publisher<Event>, Publisher<Event>>> interceptor : interceptors) {
                processorFunction = interceptor.apply(processor, processorFunction);
            }
            stream = stream.transform(processorFunction);
        }
        return stream;
    }

    /**
     * Assembles the wrappers applied around every processor by {@link #apply(Publisher)}.
     *
     * <p>The returned list holds, in this order: the processing strategy hook (only when the flow
     * construct is a {@link Pipeline}), exception mapping, notifications, current-event tracking,
     * and finally one {@link ReactiveInterceptorAdapter} per registered
     * {@link ProcessorInterceptor}, in reverse registration order.
     *
     * @return the wrappers in the order they must be applied
     */
    private List<BiFunction<Processor, Function<Publisher<Event>, Publisher<Event>>, Function<Publisher<Event>, Publisher<Event>>>> resolveInterceptors() {
        List<BiFunction<Processor, Function<Publisher<Event>, Publisher<Event>>, Function<Publisher<Event>, Publisher<Event>>>> builtIn = new ArrayList<>();
        if (flowConstruct instanceof Pipeline) {
            builtIn.add((processor, next) -> ((Pipeline) flowConstruct).getProcessingStrategy().onProcessor(processor, next));
        }
        // Complete MessagingExceptions raised by the processor with error and context information.
        builtIn.add((processor, next) -> stream -> Flux.from(stream)
            .transform(next)
            .mapError(MessagingException.class, handleMessagingException(processor)));
        // Fire notifications around the processor.
        builtIn.add((processor, next) -> stream -> Flux.from(stream)
            .doOnNext(preNotification(processor))
            .transform(next)
            .doOnNext(postNotification(processor))
            .doOnError(MessagingException.class, errorNotification(processor)));
        // Keep the current event up to date before and after the processor.
        builtIn.add((processor, next) -> stream -> Flux.from(stream)
            .doOnNext(Event::setCurrentEvent)
            .transform(next)
            .doOnNext(Event::setCurrentEvent));

        LinkedList<BiFunction<Processor, Function<Publisher<Event>, Publisher<Event>>, Function<Publisher<Event>, Publisher<Event>>>> resolved = new LinkedList<>();
        for (ProcessorInterceptor handler : muleContext.getProcessorInterceptorManager().getInterceptors()) {
            ReactiveInterceptorAdapter adapter = new ReactiveInterceptorAdapter(handler);
            adapter.setFlowConstruct(flowConstruct);
            // Prepending reverses the registration order of the interceptors.
            resolved.add(0, adapter);
        }
        resolved.addAll(0, builtIn);
        return resolved;
    }

    /**
     * Creates the function that completes a {@link MessagingException} raised while running
     * {@code processor}.
     *
     * <p>When the exception carries no failing processor it is recreated with {@code processor}
     * as the failing one. The exception then gets its processed event set to the error event
     * built by {@link ExceptionUtils#createErrorEvent} and is passed through
     * {@link ExceptionUtils#putContext}.
     *
     * @param processor the processor being wrapped
     * @return the mapping function
     */
    private Function<MessagingException, MessagingException> handleMessagingException(Processor processor) {
        return exception -> {
            MessagingException resolved = exception;
            Processor failing = exception.getFailingMessageProcessor();
            if (failing == null) {
                failing = processor;
                resolved = new MessagingException(exception.getI18nMessage(), exception.getEvent(), exception.getCause(), processor);
            }
            resolved.setProcessedEvent(ExceptionUtils.createErrorEvent(resolved.getEvent(), processor, resolved, muleContext.getErrorTypeLocator()));
            return ExceptionUtils.putContext(resolved, failing, resolved.getEvent(), flowConstruct, muleContext);
        };
    }

    /**
     * @param processor the processor about to be invoked
     * @return a consumer firing the pre-invoke notification for events with notifications enabled
     */
    private Consumer<Event> preNotification(Processor processor) {
        return eventNotification(processor, PRE_INVOKE_ACTION);
    }

    /**
     * @param processor the processor that was invoked
     * @return a consumer firing the post-invoke notification for events with notifications enabled
     */
    private Consumer<Event> postNotification(Processor processor) {
        return eventNotification(processor, POST_INVOKE_ACTION);
    }

    /**
     * @param processor the processor that failed
     * @return a consumer firing the post-invoke notification, carrying the exception, when the
     *         exception's event has notifications enabled
     */
    private Consumer<MessagingException> errorNotification(Processor processor) {
        return exception -> {
            if (exception.getEvent().isNotificationsEnabled()) {
                fireNotification(muleContext.getNotificationManager(), flowConstruct, exception.getEvent(), processor, exception, POST_INVOKE_ACTION);
            }
        };
    }

    /**
     * Shared implementation of the pre/post notification consumers.
     *
     * @param processor the processor the notification refers to
     * @param action    the notification action code
     * @return a consumer firing the notification without exception for events with notifications enabled
     */
    private Consumer<Event> eventNotification(Processor processor, int action) {
        return event -> {
            if (event.isNotificationsEnabled()) {
                fireNotification(muleContext.getNotificationManager(), flowConstruct, event, processor, null, action);
            }
        };
    }

    /**
     * Fires a {@link MessageProcessorNotification}, but only when a notification manager is
     * present, that notification type is enabled, and the flow construct can resolve a path for
     * the processor.
     */
    private void fireNotification(ServerNotificationManager serverNotificationManager, FlowConstruct flowConstruct, Event event, Processor processor, MessagingException exceptionThrown, int action) {
        if (serverNotificationManager == null || !serverNotificationManager.isNotificationEnabled(MessageProcessorNotification.class)) {
            return;
        }
        if (!(flowConstruct instanceof MessageProcessorPathResolver)) {
            return;
        }
        if (((MessageProcessorPathResolver) flowConstruct).getProcessorPath(processor) == null) {
            return;
        }
        serverNotificationManager.fireNotification(new MessageProcessorNotification(flowConstruct, event, processor, exceptionThrown, action));
    }

    /**
     * @return the processors run by {@link #process(Event)} and {@link #apply(Publisher)};
     *         subclasses may override to execute a different list
     */
    protected List<Processor> getProcessorsToExecute() {
        return processors;
    }

    /**
     * Renders the simple class name, the quoted chain name when it is not blank, and the
     * processors as an indented, comma-separated, bracketed list.
     */
    @Override
    public String toString() {
        StringBuilder description = new StringBuilder(getClass().getSimpleName());
        if (StringUtils.isNotBlank(name)) {
            description.append(String.format(" '%s' ", name));
        }
        Iterator<Processor> remaining = processors.iterator();
        if (remaining.hasNext()) {
            String lineBreak = String.format("%n");
            String nestedLineBreak = String.format("%n  ");
            description.append(lineBreak).append("[ ");
            while (remaining.hasNext()) {
                // Indent nested multi-line descriptions by two extra spaces.
                String indented = StringUtils.replace(remaining.next().toString(), lineBreak, nestedLineBreak);
                description.append(nestedLineBreak).append(indented);
                if (remaining.hasNext()) {
                    description.append(", ");
                }
            }
            description.append(lineBreak).append("]");
        }
        return description.toString();
    }

    /**
     * @return the list of processors held by this chain (the internal list, not a copy)
     */
    @Override
    public List<Processor> getMessageProcessors() {
        return processors;
    }

    /**
     * @return the processors that receive context injection and lifecycle calls; subclasses may
     *         override to use a different list
     */
    protected List<Processor> getMessageProcessorsForLifecycle() {
        return processors;
    }

    /**
     * Registers the chain's processors under the given path element.
     *
     * @param pathElement the parent path element
     */
    @Override
    public void addMessageProcessorPathElements(MessageProcessorPathElement pathElement) {
        NotificationUtils.addMessageProcessorPathElements(getMessageProcessors(), pathElement);
    }

    /**
     * Passes the exception handler to every processor implementing
     * {@link MessagingExceptionHandlerAware}.
     *
     * @param messagingExceptionHandler the handler to propagate
     */
    @Override
    public void setMessagingExceptionHandler(MessagingExceptionHandler messagingExceptionHandler) {
        for (Processor processor : processors) {
            if (processor instanceof MessagingExceptionHandlerAware) {
                ((MessagingExceptionHandlerAware) processor).setMessagingExceptionHandler(messagingExceptionHandler);
            }
        }
    }

    /**
     * Stores the context and forwards it to the execution template and the lifecycle processors.
     *
     * @param muleContext the context to use
     */
    @Override
    public void setMuleContext(MuleContext muleContext) {
        this.muleContext = muleContext;
        messageProcessorExecutionTemplate.setMuleContext(muleContext);
        LifecycleUtils.setMuleContextIfNeeded(getMessageProcessorsForLifecycle(), muleContext);
    }

    /**
     * Stores the flow construct and forwards it to the execution template and the lifecycle
     * processors.
     *
     * @param flowConstruct the owning flow construct
     */
    @Override
    public void setFlowConstruct(FlowConstruct flowConstruct) {
        this.flowConstruct = flowConstruct;
        messageProcessorExecutionTemplate.setFlowConstruct(flowConstruct);
        LifecycleUtils.setFlowConstructIfNeeded(getMessageProcessorsForLifecycle(), flowConstruct);
    }

    /**
     * Initialises the lifecycle processors.
     *
     * @throws InitialisationException if a processor fails to initialise
     */
    public void initialise() throws InitialisationException {
        LifecycleUtils.initialiseIfNeeded(getMessageProcessorsForLifecycle());
    }

    /**
     * Starts every {@link Startable} lifecycle processor in order. If one of them fails, the
     * processors that had already been started are stopped again before the failure is
     * rethrown; processors that never started are left untouched.
     *
     * @throws MuleException the start failure, or a failure raised while rolling back
     */
    public void start() throws MuleException {
        List<Processor> startedProcessors = new ArrayList<>();
        try {
            for (Processor processor : getMessageProcessorsForLifecycle()) {
                if (processor instanceof Startable) {
                    ((Startable) processor).start();
                    startedProcessors.add(processor);
                }
            }
        } catch (MuleException startFailure) {
            // Roll back only what was actually started.
            LifecycleUtils.stopIfNeeded(startedProcessors);
            throw startFailure;
        }
    }

    /**
     * Stops the lifecycle processors.
     *
     * @throws MuleException if a processor fails to stop
     */
    public void stop() throws MuleException {
        LifecycleUtils.stopIfNeeded(getMessageProcessorsForLifecycle());
    }

    /**
     * Disposes the lifecycle processors, handing this class' logger to the disposal helper.
     */
    public void dispose() {
        LifecycleUtils.disposeIfNeeded(getMessageProcessorsForLifecycle(), log);
    }
}

