/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.routing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.processor.MessageProcessorPathElement;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.MessageRouter;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.routing.AggregationContext;
import org.mule.runtime.core.api.routing.RoutePathNotFoundException;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.processor.AbstractMessageProcessorOwner;
import org.mule.runtime.core.routing.AbstractRoutingStrategy;
import org.mule.runtime.core.routing.AggregationStrategy;
import org.mule.runtime.core.routing.CollectAllAggregationStrategy;
import org.mule.runtime.core.util.NotificationUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Message router that sends each incoming {@link Event} through every configured route and
 * combines the per-route results into a single event using an {@link AggregationStrategy}.
 *
 * <p>Routes may only be modified before {@link #initialise()} is called; at initialisation each
 * route is wrapped into a processor chain and at least two routes must be present. If no
 * aggregation strategy has been set by then, a {@link CollectAllAggregationStrategy} is used.
 *
 * <p>NOTE(review): the {@code parallel} and {@code timeout} settings are stored but are not read
 * by the processing pipeline in this class; routes are chained with {@code concatMap}.
 */
public class ScatterGatherRouter extends AbstractMessageProcessorOwner implements MessageRouter {

    private static final Logger logger = LoggerFactory.getLogger(ScatterGatherRouter.class);

    /** Stored by {@link #setParallel(boolean)}; not consulted anywhere else in this class. */
    private boolean parallel = true;

    /** Stored by {@link #setTimeout(long)}; non-positive values become {@code Long.MAX_VALUE} on initialise. */
    private long timeout = 0L;

    /** Routes as configured by the user; always a list owned by this router. */
    private List<Processor> routes = new ArrayList<>();

    /** Set once {@link #initialise()} has completed; from then on routes are frozen. */
    private boolean initialised = false;

    /** One chain per route, built during {@link #initialise()}. */
    private List<Processor> routeChains = Collections.emptyList();

    private AggregationStrategy aggregationStrategy;

    /**
     * Blocking entry point: runs the event through the reactive pipeline of
     * {@link #apply(Publisher)} and waits for the aggregated result.
     *
     * @param event the event to route
     * @return the event produced by the aggregation strategy
     * @throws MuleException the failure raised by the pipeline, converted with
     *         {@link Exceptions#rxExceptionToMuleException(Throwable)}
     */
    @Override
    public Event process(Event event) throws MuleException {
        try {
            return Mono.just(event).transform(this).block();
        }
        catch (Throwable throwable) {
            throw Exceptions.rxExceptionToMuleException(throwable);
        }
    }

    /**
     * Fails when no routes are configured. Despite the name, this only checks that the route
     * list is not empty; the "at least two" rule is enforced in {@link #buildRouteChains()}.
     *
     * @throws RoutePathNotFoundException if the route list is null or empty
     */
    private void assertMorethanOneRoute() throws RoutePathNotFoundException {
        if (CollectionUtils.isEmpty(this.routes)) {
            throw new RoutePathNotFoundException(CoreMessages.noEndpointsForRouter(), null);
        }
    }

    /**
     * Reactive entry point. Each event is validated (routes configured, message not consumable)
     * and then routed and aggregated; events are handled one after another.
     *
     * @param publisher the stream of incoming events
     * @return a stream emitting one aggregated event per incoming event
     */
    @Override
    public Publisher<Event> apply(Publisher<Event> publisher) {
        return Flux.from(publisher)
            .doOnNext(Exceptions.checkedConsumer(event -> {
                this.assertMorethanOneRoute();
                AbstractRoutingStrategy.validateMessageIsNotConsumable(event, event.getMessage());
            }))
            .concatMap(this::routeAndAggregate);
    }

    /**
     * Sends {@code event} through every route chain in order, collects the resulting events into
     * a list and hands the original event plus that list to the aggregation strategy.
     */
    private Publisher<Event> routeAndAggregate(Event event) {
        return Flux.fromIterable(this.routeChains)
            .concatMap(route -> Flux.just(event).transform(route))
            .collectList()
            .map(Exceptions.checkedFunction(
                results -> this.aggregationStrategy.aggregate(new AggregationContext(event, results))));
    }

    /**
     * Builds the route chains, applies defaults for the aggregation strategy and timeout, then
     * initialises the owned processors and freezes the route list.
     *
     * @throws InitialisationException if building the chains or applying defaults fails (for
     *         example when fewer than two routes are configured), or if the superclass
     *         initialisation fails
     */
    @Override
    public void initialise() throws InitialisationException {
        try {
            this.buildRouteChains();
            if (this.aggregationStrategy == null) {
                this.aggregationStrategy = new CollectAllAggregationStrategy();
            }
            if (this.timeout <= 0L) {
                // A non-positive timeout is normalised to the largest representable value.
                this.timeout = Long.MAX_VALUE;
            }
        }
        catch (Exception e) {
            throw new InitialisationException(e, this);
        }
        super.initialise();
        this.initialised = true;
    }

    /**
     * Appends a route. Only allowed before initialisation.
     *
     * @param processor the route to add
     */
    @Override
    public void addRoute(Processor processor) throws MuleException {
        this.checkNotInitialised();
        this.routes.add(processor);
    }

    /**
     * Removes a route. Only allowed before initialisation.
     *
     * @param processor the route to remove
     */
    @Override
    public void removeRoute(Processor processor) throws MuleException {
        this.checkNotInitialised();
        this.routes.remove(processor);
    }

    /** Verifies that at least two routes exist and wraps each one into its own chain. */
    private void buildRouteChains() {
        Preconditions.checkState(this.routes.size() > 1, "At least 2 routes are required for ScatterGather");
        this.routeChains = this.routes.stream()
            .map(route -> MessageProcessors.newChain(MessageProcessors.newExplicitChain(route)))
            .collect(Collectors.toList());
    }

    /** Rejects route modifications once {@link #initialise()} has completed. */
    private void checkNotInitialised() {
        Preconditions.checkState(!this.initialised, "<scatter-gather> router is not dynamic. Cannot modify routes after initialisation");
    }

    /**
     * @return the chains built from the configured routes (empty until initialisation)
     */
    @Override
    protected List<Processor> getOwnedMessageProcessors() {
        return this.routeChains;
    }

    /**
     * @param aggregationStrategy strategy used to combine the per-route results; when left
     *        {@code null}, {@link #initialise()} installs a {@link CollectAllAggregationStrategy}
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * @param parallel value to store; see the class-level note about this setting
     */
    public void setParallel(boolean parallel) {
        this.parallel = parallel;
    }

    /**
     * @param timeout value to store; non-positive values are replaced by {@code Long.MAX_VALUE}
     *        during {@link #initialise()}
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Replaces the configured routes. Like {@link #addRoute(Processor)} and
     * {@link #removeRoute(Processor)}, this is only allowed before initialisation, because the
     * route chains are built once in {@link #initialise()} and would otherwise go stale.
     *
     * <p>The given list is copied, so later changes to the caller's list do not affect this
     * router and an unmodifiable argument does not break subsequent {@code addRoute} /
     * {@code removeRoute} calls. A {@code null} argument is treated as an empty list.
     *
     * @param routes the routes to use, in order
     */
    public void setRoutes(List<Processor> routes) {
        this.checkNotInitialised();
        this.routes = routes == null ? new ArrayList<>() : new ArrayList<>(routes);
    }

    /**
     * Registers this router as a child of {@code pathElement} and then registers every route
     * chain underneath the newly created child element.
     *
     * @param pathElement the parent path element
     */
    @Override
    public void addMessageProcessorPathElements(MessageProcessorPathElement pathElement) {
        MessageProcessorPathElement routerElement = pathElement.addChild(this);
        for (Processor routeChain : this.routeChains) {
            NotificationUtils.addMessageProcessorPathElements(routeChain, routerElement);
        }
    }
}

