/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import jakarta.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.DefaultEventProcessorSpanFactory;
import org.axonframework.eventhandling.ErrorContext;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingException;
import org.axonframework.eventhandling.EventProcessorSpanFactory;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.Segment;
import org.axonframework.messaging.DefaultInterceptorChain;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.tracing.NoOpSpanFactory;

/**
 * Bundles the operations an event processor performs on {@link EventMessage}s: checking whether an event can be
 * handled, processing batches of events inside a {@link UnitOfWork}, running registered
 * {@link MessageHandlerInterceptor}s, reporting to a {@link MessageMonitor} and wrapping the work in spans created
 * by an {@link EventProcessorSpanFactory}.
 * <p>
 * Instances are created through the {@link Builder}. The processor name and the {@link EventHandlerInvoker} are
 * required; all other collaborators have defaults.
 */
public final class EventProcessorOperations {
    /** Segment list used when the caller does not specify which segments to process. */
    private static final List<Segment> ROOT_SEGMENT = Collections.singletonList(Segment.ROOT_SEGMENT);
    private final String name;
    private final EventHandlerInvoker eventHandlerInvoker;
    private final ErrorHandler errorHandler;
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    /** Interceptors applied around every handled event; mutated via {@link #registerHandlerInterceptor}. */
    private final List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new CopyOnWriteArrayList<>();
    private final EventProcessorSpanFactory spanFactory;
    private final boolean streamingProcessor;

    /**
     * Creates the operations object from the given {@code builder}, after validating it.
     *
     * @param builder the builder carrying the configuration to copy
     * @throws AxonConfigurationException if the builder fails validation (no name or no
     *                                    {@link EventHandlerInvoker} configured), as declared by the builder's
     *                                    validation method
     */
    public EventProcessorOperations(Builder builder) {
        builder.validate();
        this.name = builder.name;
        this.eventHandlerInvoker = builder.eventHandlerInvoker;
        this.errorHandler = builder.errorHandler;
        this.messageMonitor = builder.messageMonitor;
        this.spanFactory = builder.spanFactory;
        this.streamingProcessor = builder.streamingProcessor;
    }

    /**
     * Returns the name of the event processor these operations belong to.
     *
     * @return the configured processor name
     */
    public String name() {
        return this.name;
    }

    /**
     * Adds the given {@code interceptor} to the list of interceptors invoked for each processed event.
     *
     * @param interceptor the interceptor to register
     * @return a {@link Registration} that removes the interceptor again when cancelled
     */
    public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super EventMessage<?>> interceptor) {
        this.interceptors.add(interceptor);
        return () -> this.interceptors.remove(interceptor);
    }

    /**
     * Returns the currently registered interceptors.
     *
     * @return an unmodifiable view of the interceptor list
     */
    public List<MessageHandlerInterceptor<? super EventMessage<?>>> handlerInterceptors() {
        return Collections.unmodifiableList(this.interceptors);
    }

    /**
     * Returns the processor name, identical to {@link #name()}.
     */
    @Override
    public String toString() {
        return this.name();
    }

    /**
     * Asks the {@link EventHandlerInvoker} whether it can handle the given event for the given segment.
     * <p>
     * If the invoker throws an exception, it is handed to the configured {@link ErrorHandler}; when the error
     * handler returns normally the event is reported as not handleable.
     *
     * @param eventMessage the event to check
     * @param context      the processing context in which the check takes place
     * @param segment      the segment for which the check is made
     * @return the invoker's answer, or {@code false} if the invoker failed and the error handler did not rethrow
     * @throws Exception whatever the {@link ErrorHandler} decides to throw for a failure of the invoker
     */
    public boolean canHandle(EventMessage<?> eventMessage, @Nonnull ProcessingContext context, Segment segment) throws Exception {
        try {
            return this.eventHandlerInvoker.canHandle(eventMessage, context, segment);
        }
        catch (Exception e) {
            this.errorHandler.handleError(new ErrorContext(this.name(), e, Collections.singletonList(eventMessage)));
            return false;
        }
    }

    /**
     * Asks the {@link EventHandlerInvoker} whether it can handle events with the given payload type.
     *
     * @param payloadType the payload type to check
     * @return the invoker's answer, or {@code false} if the invoker threw an exception
     */
    public boolean canHandleType(Class<?> payloadType) {
        try {
            return this.eventHandlerInvoker.canHandleType(payloadType);
        }
        catch (Exception ignored) {
            // NOTE(review): the failure is dropped without logging or consulting the ErrorHandler and is
            // reported as "cannot handle" - confirm this best-effort behaviour is intended.
            return false;
        }
    }

    /**
     * Processes the given events in the given unit of work for the root segment and blocks until processing has
     * completed.
     * <p>
     * Because the result is awaited with {@link CompletableFuture#join()}, a processing failure surfaces as an
     * unchecked {@link CompletionException} wrapping the actual cause.
     *
     * @param eventMessages the batch of events to process
     * @param unitOfWork    the unit of work in which to process the batch
     * @throws Exception as declared by {@link #processInUnitOfWork(List, UnitOfWork, Collection)}
     */
    public void processInUnitOfWork(List<? extends EventMessage<?>> eventMessages, UnitOfWork unitOfWork) throws Exception {
        this.processInUnitOfWork(eventMessages, unitOfWork, ROOT_SEGMENT).join();
    }

    /**
     * Processes the given events in the given unit of work for the given segments.
     * <p>
     * An invocation callback is registered on the unit of work that handles the events one after another: the
     * handling of each event is chained onto the completion of the previous one and wrapped in a process-event
     * span. The unit of work is then executed inside a batch span. If execution fails, the failure is passed to
     * the configured {@link ErrorHandler}; if the error handler returns normally the returned future completes
     * normally.
     *
     * @param eventMessages      the batch of events to process
     * @param unitOfWork         the unit of work in which to process the batch
     * @param processingSegments the segments for which each event is handled
     * @return a future completing when the batch has been processed; it completes exceptionally when the error
     *         handler (re)throws
     * @throws Exception declared by the signature; processing failures are expected to surface through the
     *                   returned future
     */
    public CompletableFuture<Void> processInUnitOfWork(List<? extends EventMessage<?>> eventMessages, UnitOfWork unitOfWork, Collection<Segment> processingSegments) throws Exception {
        unitOfWork.onInvocation(processingContext -> {
            // Start from an already completed stage and append one stage per event, so events are handled in order.
            CompletionStage<Object> result = CompletableFuture.completedFuture(null);
            for (EventMessage message : eventMessages) {
                result = result.thenCompose(v -> this.spanFactory.createProcessEventSpan(this.streamingProcessor, message)
                        .runSupplierAsync(() -> this.processMessage(processingSegments, (ProcessingContext)processingContext, message)));
            }
            return result;
        });
        return this.spanFactory.createBatchSpan(this.streamingProcessor, eventMessages).runSupplierAsync(() -> unitOfWork.execute().exceptionally(e -> {
            try {
                // Unwrap the CompletionException added by the future machinery, but only when it actually
                // carries a cause; otherwise the error handler would receive null and the failure would be lost.
                Throwable cause = e instanceof CompletionException && e.getCause() != null ? e.getCause() : e;
                this.errorHandler.handleError(new ErrorContext(this.name(), cause, eventMessages));
            }
            catch (RuntimeException ex) {
                throw ex;
            }
            catch (Exception ex) {
                // Checked exceptions cannot leave the lambda, so wrap them in an unchecked exception.
                throw new EventProcessingException("Exception occurred while processing events", ex);
            }
            // The error handler accepted the failure: complete the batch normally.
            return null;
        }));
    }

    /**
     * Hands the event to the {@link EventHandlerInvoker} once per processing segment and reports the outcome to
     * the given monitor callback.
     *
     * @param processingSegments the segments for which the event is handled
     * @param message            the event to handle
     * @param processingContext  the processing context in which the event is handled
     * @param monitorCallback    the callback to report success or failure to
     * @return an empty message stream once all segments have been handled
     * @throws Exception the exception thrown by the invoker, after it has been reported as a failure
     */
    private MessageStream.Empty<?> processMessageInUnitOfWork(Collection<Segment> processingSegments, EventMessage<?> message, ProcessingContext processingContext, MessageMonitor.MonitorCallback monitorCallback) throws Exception {
        try {
            for (Segment processingSegment : processingSegments) {
                this.eventHandlerInvoker.handle(message, processingContext, processingSegment);
            }
            monitorCallback.reportSuccess();
            return MessageStream.empty();
        }
        catch (Exception exception) {
            monitorCallback.reportFailure(exception);
            throw exception;
        }
    }

    /**
     * Runs a single event through the registered interceptors and, at the end of the chain, through
     * {@link #processMessageInUnitOfWork}.
     *
     * @param processingSegments the segments for which the event is handled
     * @param processingContext  the processing context in which the event is handled
     * @param message            the event to handle
     * @return a future completing with {@code null} when handling is done; an exception thrown while setting up
     *         or starting the chain is returned as a failed future instead of being thrown
     */
    private CompletableFuture<Void> processMessage(Collection<Segment> processingSegments, ProcessingContext processingContext, EventMessage<?> message) {
        try {
            MessageMonitor.MonitorCallback monitorCallback = this.messageMonitor.onMessageIngested(message);
            // The chain is used as a raw type here, which is why the message needs an explicit cast in the
            // terminal handler.
            DefaultInterceptorChain chain = new DefaultInterceptorChain(null, this.interceptors, (msg, ctx) -> this.processMessageInUnitOfWork(processingSegments, (EventMessage<?>)msg, ctx, monitorCallback));
            return chain.proceed(message, processingContext).ignoreEntries().asCompletableFuture().thenApply(e -> null);
        }
        catch (Exception e2) {
            return CompletableFuture.failedFuture(e2);
        }
    }

    /**
     * Returns the {@link EventHandlerInvoker} events are delegated to.
     *
     * @return the configured invoker
     */
    public EventHandlerInvoker eventHandlerInvoker() {
        return this.eventHandlerInvoker;
    }

    /**
     * Reports to the {@link MessageMonitor} that the given event was ingested and then ignored.
     *
     * @param eventMessage the ignored event
     */
    public void reportIgnored(EventMessage<?> eventMessage) {
        this.messageMonitor.onMessageIngested(eventMessage).reportIgnored();
    }

    /**
     * Builder for {@link EventProcessorOperations}.
     * <p>
     * Defaults: the {@link ErrorHandler} is {@link PropagatingErrorHandler#INSTANCE}, the {@link MessageMonitor}
     * is {@link NoOpMessageMonitor#INSTANCE}, the {@link EventProcessorSpanFactory} is a
     * {@link DefaultEventProcessorSpanFactory} backed by {@link NoOpSpanFactory#INSTANCE}, and the processor is
     * not flagged as streaming. The name and the {@link EventHandlerInvoker} must be provided.
     */
    public static final class Builder {
        private String name;
        private EventHandlerInvoker eventHandlerInvoker;
        private ErrorHandler errorHandler = PropagatingErrorHandler.INSTANCE;
        private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.INSTANCE;
        private EventProcessorSpanFactory spanFactory = DefaultEventProcessorSpanFactory.builder().spanFactory(NoOpSpanFactory.INSTANCE).build();
        private boolean streamingProcessor = false;

        /**
         * Sets the name of the event processor.
         *
         * @param name the processor name; may not be {@code null} or empty
         * @return this builder, for fluent chaining
         */
        public Builder name(@Nonnull String name) {
            this.assertEventProcessorName(name, "The EventProcessor name may not be null or empty");
            this.name = name;
            return this;
        }

        /**
         * Sets the {@link EventHandlerInvoker} that events are delegated to.
         *
         * @param eventHandlerInvoker the invoker; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder eventHandlerInvoker(@Nonnull EventHandlerInvoker eventHandlerInvoker) {
            BuilderUtils.assertNonNull(eventHandlerInvoker, "EventHandlerInvoker may not be null");
            this.eventHandlerInvoker = eventHandlerInvoker;
            return this;
        }

        /**
         * Sets the {@link ErrorHandler} that receives failures raised while checking or processing events.
         *
         * @param errorHandler the error handler; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder errorHandler(@Nonnull ErrorHandler errorHandler) {
            BuilderUtils.assertNonNull(errorHandler, "ErrorHandler may not be null");
            this.errorHandler = errorHandler;
            return this;
        }

        /**
         * Sets the {@link MessageMonitor} that is notified of ingested events and their outcome.
         *
         * @param messageMonitor the monitor; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        /**
         * Sets the {@link EventProcessorSpanFactory} used to create the batch and per-event spans.
         *
         * @param spanFactory the span factory; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder spanFactory(@Nonnull EventProcessorSpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        /**
         * Sets the flag that is passed to the span factory when creating spans.
         *
         * @param streamingProcessor whether the owning processor is a streaming processor
         * @return this builder, for fluent chaining
         */
        public Builder streamingProcessor(boolean streamingProcessor) {
            this.streamingProcessor = streamingProcessor;
            return this;
        }

        /**
         * Verifies that the mandatory settings (name and {@link EventHandlerInvoker}) have been provided.
         *
         * @throws AxonConfigurationException if a mandatory setting is missing
         */
        private void validate() throws AxonConfigurationException {
            this.assertEventProcessorName(this.name, "The EventProcessor name is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.eventHandlerInvoker, "The EventHandlerInvoker is a hard requirement and should be provided");
        }

        /**
         * Asserts that the given processor name is neither {@code null} nor the empty string.
         *
         * @param eventProcessorName the name to check
         * @param exceptionMessage   the message to use when the assertion fails
         */
        private void assertEventProcessorName(String eventProcessorName, String exceptionMessage) {
            BuilderUtils.assertThat(eventProcessorName, name -> Objects.nonNull(name) && !"".equals(name), exceptionMessage);
        }

        /**
         * Builds the {@link EventProcessorOperations} from the current state of this builder.
         *
         * @return a new {@link EventProcessorOperations} instance
         */
        public EventProcessorOperations build() {
            return new EventProcessorOperations(this);
        }
    }
}

