/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractEventBus
implements EventBus {
    private static final Logger logger = LoggerFactory.getLogger(AbstractEventBus.class);
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    private final String eventsKey = this + "_EVENTS";
    private final Set<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArraySet();
    private final Set<MessageDispatchInterceptor<? super EventMessage<?>>> dispatchInterceptors = new CopyOnWriteArraySet();
    private final SpanFactory spanFactory;

    protected AbstractEventBus(Builder builder) {
        builder.validate();
        this.messageMonitor = builder.messageMonitor;
        this.spanFactory = builder.spanFactory;
    }

    @Override
    public Registration subscribe(@Nonnull Consumer<List<? extends EventMessage<?>>> eventProcessor) {
        if (this.eventProcessors.add(eventProcessor)) {
            if (logger.isDebugEnabled()) {
                logger.debug("EventProcessor [{}] subscribed successfully", eventProcessor);
            }
        } else {
            logger.info("EventProcessor [{}] not added. It was already subscribed", eventProcessor);
        }
        return () -> {
            if (this.eventProcessors.remove(eventProcessor)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("EventListener {} unsubscribed successfully", (Object)eventProcessor);
                }
                return true;
            }
            logger.info("EventListener {} not removed. It was already unsubscribed", (Object)eventProcessor);
            return false;
        };
    }

    @Override
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super EventMessage<?>> dispatchInterceptor) {
        this.dispatchInterceptors.add(dispatchInterceptor);
        return () -> this.dispatchInterceptors.remove(dispatchInterceptor);
    }

    @Override
    public void publish(@Nonnull List<? extends EventMessage<?>> events) {
        List eventsWithContext = events.stream().map(e -> this.spanFactory.createInternalSpan(() -> this.getClass().getSimpleName() + ".publish", (Message<?>)e).runSupplier(() -> this.spanFactory.propagateContext(e))).collect(Collectors.toList());
        List<MessageMonitor.MonitorCallback> ingested = eventsWithContext.stream().map(this.messageMonitor::onMessageIngested).collect(Collectors.toList());
        if (CurrentUnitOfWork.isStarted()) {
            UnitOfWork<?> unitOfWork = CurrentUnitOfWork.get();
            Assert.state(!unitOfWork.phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT), () -> "It is not allowed to publish events when the current Unit of Work has already been committed. Please start a new Unit of Work before publishing events.");
            Assert.state(!unitOfWork.root().phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT), () -> "It is not allowed to publish events when the root Unit of Work has already been committed.");
            unitOfWork.afterCommit((UnitOfWork<T> u) -> ingested.forEach(MessageMonitor.MonitorCallback::reportSuccess));
            unitOfWork.onRollback(uow -> ingested.forEach(message -> message.reportFailure(uow.getExecutionResult().getExceptionResult())));
            this.eventsQueue(unitOfWork).addAll(eventsWithContext);
        } else {
            try {
                this.prepareCommit(this.intercept(eventsWithContext));
                this.commit(eventsWithContext);
                this.afterCommit(eventsWithContext);
                ingested.forEach(MessageMonitor.MonitorCallback::reportSuccess);
            }
            catch (Exception e2) {
                ingested.forEach(m -> m.reportFailure(e2));
                throw e2;
            }
        }
    }

    private List<EventMessage<?>> eventsQueue(UnitOfWork<?> unitOfWork) {
        return unitOfWork.getOrComputeResource(this.eventsKey, r -> {
            Span span = this.spanFactory.createInternalSpan(() -> this.getClass().getSimpleName() + ".commit");
            ArrayList eventQueue = new ArrayList();
            unitOfWork.onPrepareCommit(u -> {
                span.start();
                if (u.parent().isPresent() && !u.parent().get().phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT)) {
                    this.eventsQueue(u.parent().get()).addAll(eventQueue);
                } else {
                    int processedItems = eventQueue.size();
                    this.doWithEvents(this::prepareCommit, this.intercept(eventQueue));
                    while (processedItems < eventQueue.size()) {
                        List<? extends EventMessage<?>> newMessages = this.intercept(eventQueue.subList(processedItems, eventQueue.size()));
                        processedItems = eventQueue.size();
                        this.doWithEvents(this::prepareCommit, newMessages);
                    }
                }
            });
            unitOfWork.onCommit(u -> {
                if (u.parent().isPresent() && !u.root().phase().isAfter(UnitOfWork.Phase.COMMIT)) {
                    u.root().onCommit(w -> this.doWithEvents(this::commit, eventQueue));
                } else {
                    this.doWithEvents(this::commit, eventQueue);
                }
            });
            unitOfWork.afterCommit((UnitOfWork<T> u) -> {
                if (u.parent().isPresent() && !u.root().phase().isAfter(UnitOfWork.Phase.AFTER_COMMIT)) {
                    u.root().afterCommit((UnitOfWork<T> w) -> this.doWithEvents(this::afterCommit, eventQueue));
                } else {
                    this.doWithEvents(this::afterCommit, eventQueue);
                }
            });
            unitOfWork.onCleanup(u -> {
                u.resources().remove(this.eventsKey);
                span.end();
            });
            return eventQueue;
        });
    }

    protected List<EventMessage<?>> queuedMessages() {
        if (!CurrentUnitOfWork.isStarted()) {
            return Collections.emptyList();
        }
        ArrayList messages = new ArrayList();
        this.addStagedMessages(CurrentUnitOfWork.get(), messages);
        return messages;
    }

    private void addStagedMessages(UnitOfWork<?> unitOfWork, List<EventMessage<?>> messages) {
        unitOfWork.parent().ifPresent(parent -> this.addStagedMessages((UnitOfWork<?>)parent, messages));
        if (unitOfWork.isRolledBack()) {
            return;
        }
        List stagedEvents = unitOfWork.getOrDefaultResource(this.eventsKey, Collections.emptyList());
        for (EventMessage stagedEvent : stagedEvents) {
            if (messages.contains(stagedEvent)) continue;
            messages.add(stagedEvent);
        }
    }

    protected List<? extends EventMessage<?>> intercept(List<? extends EventMessage<?>> events) {
        ArrayList preprocessedEvents = new ArrayList(events);
        for (MessageDispatchInterceptor<EventMessage<?>> preprocessor : this.dispatchInterceptors) {
            BiFunction<Integer, EventMessage<?>, EventMessage<?>> function = preprocessor.handle(preprocessedEvents);
            for (int i = 0; i < preprocessedEvents.size(); ++i) {
                preprocessedEvents.set(i, function.apply(i, (EventMessage<?>)preprocessedEvents.get(i)));
            }
        }
        return preprocessedEvents;
    }

    private void doWithEvents(Consumer<List<? extends EventMessage<?>>> eventsConsumer, List<? extends EventMessage<?>> events) {
        eventsConsumer.accept(events);
    }

    protected void prepareCommit(List<? extends EventMessage<?>> events) {
        this.eventProcessors.forEach(eventProcessor -> eventProcessor.accept(events));
    }

    protected void commit(List<? extends EventMessage<?>> events) {
    }

    protected void afterCommit(List<? extends EventMessage<?>> events) {
    }

    public static abstract class Builder {
        private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.INSTANCE;
        private SpanFactory spanFactory = NoOpSpanFactory.INSTANCE;

        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        public Builder spanFactory(@Nonnull SpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        protected void validate() throws AxonConfigurationException {
        }
    }
}

