/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import jakarta.annotation.Nonnull;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.messaging.ClassBasedMessageTypeResolver;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.MessageTypeResolver;
import org.axonframework.messaging.ResultMessage;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.unitofwork.LegacyDefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.LegacyUnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.queryhandling.DefaultQueryBusSpanFactory;
import org.axonframework.queryhandling.DefaultQueryUpdateEmitterSpanFactory;
import org.axonframework.queryhandling.DefaultSubscriptionQueryResult;
import org.axonframework.queryhandling.GenericQueryResponseMessage;
import org.axonframework.queryhandling.LoggingQueryInvocationErrorHandler;
import org.axonframework.queryhandling.NoHandlerForQueryException;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryBusSpanFactory;
import org.axonframework.queryhandling.QueryExecutionException;
import org.axonframework.queryhandling.QueryInvocationErrorHandler;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QuerySubscription;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SimpleQueryUpdateEmitter;
import org.axonframework.queryhandling.StreamingQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.UpdateHandlerRegistration;
import org.axonframework.queryhandling.registration.DuplicateQueryHandlerResolution;
import org.axonframework.queryhandling.registration.DuplicateQueryHandlerResolver;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanScope;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;

public class SimpleQueryBus
implements QueryBus {
    /** Logger of this bus; also handed to the default error handler created in {@link Builder}. */
    private static final Logger logger = LoggerFactory.getLogger(SimpleQueryBus.class);
    /** Registered query handlers, keyed by the query name passed to {@code subscribe}. */
    private final ConcurrentMap<String, List<QuerySubscription<?>>> subscriptions = new ConcurrentHashMap();
    /** Monitor notified for every ingested query; its callback receives success/failure/ignored reports. */
    private final MessageMonitor<? super QueryMessage> messageMonitor;
    /** Decides which handlers remain when a handler is subscribed for an already-used name and response type. */
    private final DuplicateQueryHandlerResolver duplicateQueryHandlerResolver;
    /** Invoked for every failing handler during a scatter-gather query. */
    private final QueryInvocationErrorHandler errorHandler;
    /** Registered handler interceptors. NOTE(review): only added to and removed from in this file, never applied. */
    private final List<MessageHandlerInterceptor<QueryMessage>> handlerInterceptors = new CopyOnWriteArrayList<MessageHandlerInterceptor<QueryMessage>>();
    /** Registered dispatch interceptors. NOTE(review): only added to and removed from in this file, never applied. */
    private final List<MessageDispatchInterceptor<? super QueryMessage>> dispatchInterceptors = new CopyOnWriteArrayList<MessageDispatchInterceptor<? super QueryMessage>>();
    /** Creates the spans wrapped around direct, streaming and scatter-gather queries. */
    private final QueryBusSpanFactory spanFactory;
    /** Resolves the {@link MessageType} of response payloads and exceptions. */
    private final MessageTypeResolver messageTypeResolver;
    /** Emitter used to register and look up update handlers of subscription queries. */
    private final QueryUpdateEmitter queryUpdateEmitter;

    /**
     * Instantiates a {@link SimpleQueryBus} from the given {@code builder}.
     * <p>
     * {@link Builder#validate()} runs before any field is read, so an invalid configuration
     * aborts construction. The builder's transaction manager is not copied by this constructor.
     *
     * @param builder the {@link Builder} holding the components this bus should use
     */
    protected SimpleQueryBus(Builder builder) {
        builder.validate();
        spanFactory = builder.spanFactory;
        messageTypeResolver = builder.messageTypeResolver;
        messageMonitor = builder.messageMonitor;
        errorHandler = builder.errorHandler;
        duplicateQueryHandlerResolver = builder.duplicateQueryHandlerResolver;
        queryUpdateEmitter = builder.queryUpdateEmitter;
    }

    /**
     * Creates a new {@link Builder} for a {@link SimpleQueryBus}, pre-populated with the
     * defaults declared on the builder's fields.
     *
     * @return a fresh {@link Builder} instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Subscribes the given {@code handler} to queries named {@code queryName} that expect the
     * given {@code responseType}.
     * <p>
     * If an equal subscription already exists nothing changes. If other handlers are already
     * registered for the same name <em>and</em> response type, the configured
     * {@link DuplicateQueryHandlerResolver} decides which of them (and/or the new handler) remain.
     * Handlers registered under the same name for a different response type are always preserved.
     *
     * @param queryName    the name of the queries the handler should receive
     * @param responseType the response type the handler produces
     * @param handler      the handler to subscribe
     * @return a {@link Registration} that removes this subscription again when cancelled
     */
    @Override
    public Registration subscribe(@Nonnull String queryName, @Nonnull Type responseType, @Nonnull MessageHandler<? super QueryMessage, ? extends QueryResponseMessage> handler) {
        QuerySubscription<?> querySubscription = new QuerySubscription<>(responseType, handler);
        Registration registration = () -> this.unsubscribe(queryName, querySubscription);
        List<QuerySubscription<?>> handlers = this.subscriptions.computeIfAbsent(queryName, k -> new CopyOnWriteArrayList<>());
        if (handlers.contains(querySubscription)) {
            return registration;
        }
        List<QuerySubscription<?>> existingHandlers = handlers.stream().filter(q -> q.getResponseType().equals(responseType)).collect(Collectors.toList());
        if (existingHandlers.isEmpty()) {
            handlers.add(querySubscription);
            return registration;
        }
        // Snapshot the duplicates first: the resolver is free to mutate and return the list it is handed.
        List<QuerySubscription<?>> duplicates = new CopyOnWriteArrayList<>(existingHandlers);
        List<QuerySubscription<?>> resolvedHandlers = this.duplicateQueryHandlerResolver.resolve(queryName, responseType, existingHandlers, querySubscription);
        // Only swap out the handlers that took part in the resolution. Storing the resolver's
        // result directly would drop handlers registered for other response types and would
        // replace the CopyOnWriteArrayList with a list of unknown mutability and thread-safety.
        List<QuerySubscription<?>> merged = new CopyOnWriteArrayList<>(handlers);
        merged.removeAll(duplicates);
        merged.addAll(resolvedHandlers);
        this.subscriptions.put(queryName, merged);
        return registration;
    }

    /**
     * Removes the given {@code querySubscription} from the handlers registered under
     * {@code queryName}. When that leaves no handler for the name, the map entry is dropped.
     *
     * @param queryName         the query name the subscription was registered under
     * @param querySubscription the subscription to remove
     * @param <R>               the subscription's response type parameter
     * @return always {@code true}, whether or not a subscription was actually removed
     */
    private <R> boolean unsubscribe(String queryName, QuerySubscription<R> querySubscription) {
        this.subscriptions.computeIfPresent(queryName, (name, registered) -> {
            registered.remove(querySubscription);
            // Returning null from the remapping function removes the key from the map.
            return registered.isEmpty() ? null : registered;
        });
        return true;
    }

    /**
     * Dispatches the given {@code query} to a single handler, wrapped in a query span.
     * A failure of the resulting future is recorded on that span.
     *
     * @param query the query to dispatch
     * @return a future that completes with the handler's response message
     */
    @Override
    public CompletableFuture<QueryResponseMessage> query(@Nonnull QueryMessage query) {
        Span querySpan = this.spanFactory.createQuerySpan(query, false);
        return querySpan.runSupplier(() -> this.doQuery(query).whenComplete((response, failure) -> {
            if (failure != null) {
                querySpan.recordException(failure);
            }
        }));
    }

    /**
     * Performs a direct (point-to-point) query.
     * <p>
     * Matching handlers are tried in the order returned by {@code getHandlersForMessage}, each in
     * its own unit of work, until one produces a non-exceptional result:
     * <ul>
     *   <li>a handler failing with a {@link NoHandlerForQueryException} is skipped;</li>
     *   <li>any other handler failure ends the loop: the returned future completes
     *       <em>normally</em> with an exceptional response message and the failure is reported
     *       to the monitor;</li>
     *   <li>no handlers, no suitable handler, or any other exception thrown while dispatching
     *       completes the returned future exceptionally.</li>
     * </ul>
     * Success is reported to the monitor as soon as a handler returned its future, not when that
     * future completes.
     *
     * @param query the query to dispatch
     * @return the future holding the response of the first successfully invoked handler
     * @throws IllegalArgumentException presumably thrown by {@code Assert.isFalse} when the expected
     *                                  response type is a {@link Publisher} — verify against {@code Assert}
     */
    @Nonnull
    private CompletableFuture<QueryResponseMessage> doQuery(@Nonnull QueryMessage query) {
        Assert.isFalse(Publisher.class.isAssignableFrom(query.responseType().getExpectedResponseType()), () -> "Direct query does not support Flux as a return type.");
        MessageMonitor.MonitorCallback monitorCallback = this.messageMonitor.onMessageIngested(query);
        QueryMessage interceptedQuery = this.intercept(query);
        List<MessageHandler<? super QueryMessage, ? extends QueryResponseMessage>> handlers = this.getHandlersForMessage(interceptedQuery);
        CompletableFuture<QueryResponseMessage> result = new CompletableFuture<>();
        try {
            ResponseType<?> responseType = interceptedQuery.responseType();
            if (handlers.isEmpty()) {
                throw this.noHandlerException(interceptedQuery);
            }
            Iterator<MessageHandler<? super QueryMessage, ? extends QueryResponseMessage>> handlerIterator = handlers.iterator();
            boolean invocationSuccess = false;
            while (!invocationSuccess && handlerIterator.hasNext()) {
                LegacyDefaultUnitOfWork<QueryMessage> uow = LegacyDefaultUnitOfWork.startAndGet(interceptedQuery);
                ResultMessage resultMessage = this.interceptAndInvoke(uow, handlerIterator.next());
                if (resultMessage.isExceptional()) {
                    Throwable cause = resultMessage.exceptionResult();
                    if (cause instanceof NoHandlerForQueryException) {
                        // This handler declined the query; give the next candidate a chance.
                        continue;
                    }
                    // orElseGet: only build (and resolve the type of) the fallback response when the
                    // response type could not convert the exception itself.
                    GenericQueryResponseMessage queryResponseMessage = responseType.convertExceptional(cause)
                            .map(exceptionalResult -> new GenericQueryResponseMessage(this.messageTypeResolver.resolveOrThrow(exceptionalResult), exceptionalResult))
                            .orElseGet(() -> new GenericQueryResponseMessage(this.messageTypeResolver.resolveOrThrow(cause), cause, responseType.responseMessagePayloadType()));
                    result.complete(queryResponseMessage);
                    monitorCallback.reportFailure(cause);
                    return result;
                }
                // interceptAndInvoke always wraps the handler outcome in a CompletableFuture.
                @SuppressWarnings("unchecked")
                CompletableFuture<QueryResponseMessage> handlerResult = (CompletableFuture<QueryResponseMessage>) resultMessage.payload();
                result = handlerResult;
                invocationSuccess = true;
            }
            if (!invocationSuccess) {
                throw SimpleQueryBus.noSuitableHandlerException(interceptedQuery);
            }
            monitorCallback.reportSuccess();
        }
        catch (Exception e) {
            result.completeExceptionally(e);
            monitorCallback.reportFailure(e);
        }
        return result;
    }

    /**
     * Dispatches a streaming query and returns the response stream of the first handler that
     * could be invoked without an exceptional result.
     * <p>
     * The returned pipeline, in order:
     * <ol>
     *   <li>looks up the handlers for the query and signals a {@link NoHandlerForQueryException}
     *       when there are none;</li>
     *   <li>invokes each handler via {@code interceptAndInvokeStreaming};</li>
     *   <li>drops exceptional results while remembering the last error ({@code CatchLastError});</li>
     *   <li>throws from {@code ErrorIfComplete} when the handler sequence signals completion,
     *       i.e. before {@code next()} picked a result;</li>
     *   <li>takes the first remaining result, reports it to the monitor callback stored in the
     *       Reactor context ({@code SuccessReporter}), and flattens its payload publisher;</li>
     *   <li>ends the span when the resulting flux terminates.</li>
     * </ol>
     *
     * @param query the streaming query to dispatch
     * @return a publisher of the selected handler's response messages
     */
    @Override
    public Publisher<QueryResponseMessage> streamingQuery(StreamingQueryMessage query) {
        Span span = this.spanFactory.createStreamingQuerySpan(query, false).start();
        // The span is current only while the pipeline is assembled; it is ended by doOnTerminate below.
        try (SpanScope unused = span.makeCurrent();){
            // Shared between CatchLastError (writer) and ErrorIfComplete (reader).
            AtomicReference lastError = new AtomicReference();
            Flux flux = Mono.just((Object)this.intercept(query)).flatMapMany(interceptedQuery -> Mono.just((Object)interceptedQuery).flatMapMany(this::getStreamingHandlersForMessage).switchIfEmpty((Publisher)Flux.error((Throwable)this.noHandlerException((QueryMessage)interceptedQuery))).map(handler -> this.interceptAndInvokeStreaming((StreamingQueryMessage)interceptedQuery, (MessageHandler<? super StreamingQueryMessage, ? extends QueryResponseMessage>)handler, span)).flatMap((Function)new CatchLastError(lastError)).doOnEach((Consumer)new ErrorIfComplete(lastError, (StreamingQueryMessage)interceptedQuery)).next().doOnEach((Consumer)new SuccessReporter()).flatMapMany(m -> (Publisher)m.payload())).contextWrite((Function)new MonitorCallbackContextWriter(this.messageMonitor, query)).doOnTerminate(span::end);
            return flux;
        }
    }

    /**
     * Builds the exception signalled when no handler at all matches the given query.
     *
     * @param intercepted the query for which no handler was found
     * @return a {@link NoHandlerForQueryException} naming the query type and response type
     */
    private NoHandlerForQueryException noHandlerException(QueryMessage intercepted) {
        String message = String.format("No handler found for [%s] with response type [%s]", intercepted.type(), intercepted.responseType());
        return new NoHandlerForQueryException(message);
    }

    /**
     * Builds the exception signalled when handlers exist but none of them produced a result.
     *
     * @param intercepted the query that no handler could answer
     * @return a {@link NoHandlerForQueryException} naming the query type and response type
     */
    private static NoHandlerForQueryException noSuitableHandlerException(QueryMessage intercepted) {
        String message = String.format("No suitable handler was found for [%s] with response type [%s]", intercepted.type(), intercepted.responseType());
        return new NoHandlerForQueryException(message);
    }

    /**
     * Dispatches the given {@code query} to every matching handler and returns their responses.
     * <p>
     * When no handler matches, the query is reported as ignored and an empty stream is returned.
     * Otherwise one span per handler is created up front, numbered by the handler's position, and
     * the returned stream invokes the handlers lazily, in order, as it is consumed. Handlers that
     * fail or time out contribute no element.
     *
     * @param query   the query to dispatch
     * @param timeout the time all handlers together are given, counted from the moment this method is called
     * @param unit    the unit of {@code timeout}
     * @return a lazy stream of the responses of the handlers that answered successfully
     */
    @Override
    public Stream<QueryResponseMessage> scatterGather(@Nonnull QueryMessage query, long timeout, @Nonnull TimeUnit unit) {
        Assert.isFalse(Publisher.class.isAssignableFrom(query.responseType().getExpectedResponseType()), () -> "Scatter-Gather query does not support Flux as a return type.");
        MessageMonitor.MonitorCallback monitorCallback = this.messageMonitor.onMessageIngested(query);
        QueryMessage interceptedQuery = this.intercept(query);
        List<MessageHandler<? super QueryMessage, ? extends QueryResponseMessage>> handlers = this.getHandlersForMessage(interceptedQuery);
        if (handlers.isEmpty()) {
            monitorCallback.reportIgnored();
            return Stream.empty();
        }
        return this.spanFactory.createScatterGatherSpan(query, false).runSupplier(() -> {
            long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
            int handlerCount = handlers.size();
            // Walk the handlers by position rather than via indexOf(handler): indexOf costs O(n) per
            // handler and returns the first match, so a handler present twice would get the wrong span.
            List<Span> spans = Stream.iterate(0, i -> i + 1)
                    .limit(handlerCount)
                    .map(i -> this.spanFactory.createScatterGatherHandlerSpan(query, i))
                    .collect(Collectors.toList());
            return Stream.iterate(0, i -> i + 1)
                    .limit(handlerCount)
                    .map(i -> {
                        Span span = spans.get(i);
                        MessageHandler<? super QueryMessage, ? extends QueryResponseMessage> handler = handlers.get(i);
                        return span.runSupplier(() -> this.scatterGatherHandler(span, monitorCallback, interceptedQuery, deadline, handler));
                    })
                    .filter(Objects::nonNull);
        });
    }

    /**
     * Invokes a single handler as part of a scatter-gather query and waits for its response until
     * the shared {@code deadline}.
     * <p>
     * Any failure (exceptional handler result, timeout, interruption, failed future) is recorded
     * on the span, reported to the monitor callback and passed to the error handler; the method
     * then returns {@code null} so the caller can filter the handler out.
     *
     * @param span             the span covering this handler invocation
     * @param monitorCallback  the callback to report success or failure to
     * @param interceptedQuery the query to hand to the handler
     * @param deadline         the absolute deadline, in epoch milliseconds
     * @param handler          the handler to invoke
     * @return the handler's response, or {@code null} when the handler failed or did not answer in time
     */
    private QueryResponseMessage scatterGatherHandler(Span span, MessageMonitor.MonitorCallback monitorCallback, QueryMessage interceptedQuery, long deadline, MessageHandler<? super QueryMessage, ? extends QueryResponseMessage> handler) {
        ResultMessage resultMessage = this.interceptAndInvoke(LegacyDefaultUnitOfWork.startAndGet(interceptedQuery), handler);
        if (resultMessage.isExceptional()) {
            Throwable cause = resultMessage.exceptionResult();
            monitorCallback.reportFailure(cause);
            span.recordException(cause);
            this.errorHandler.onError(cause, interceptedQuery, handler);
            return null;
        }
        try {
            // Compute the remaining time only now, so time spent inside the synchronous handler
            // invocation above counts against the deadline as well.
            long leftTimeout = ObjectUtils.getRemainingOfDeadline(deadline);
            QueryResponseMessage response = (QueryResponseMessage) ((CompletableFuture<?>) resultMessage.payload()).get(leftTimeout, TimeUnit.MILLISECONDS);
            monitorCallback.reportSuccess();
            return response;
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            span.recordException(e);
            monitorCallback.reportFailure(e);
            this.errorHandler.onError(e, interceptedQuery, handler);
            return null;
        }
    }

    /**
     * Opens a subscription query: a lazily executed initial query plus a stream of updates.
     * <p>
     * The initial result is only requested (via {@code query}) once the returned initial-result
     * publisher is subscribed to; errors of that query are logged. The update handler is
     * registered with the {@link QueryUpdateEmitter} immediately.
     *
     * @param query            the subscription query to dispatch
     * @param updateBufferSize the buffer size handed to the update handler registration
     * @return the result giving access to the initial response and the updates
     * @throws IllegalArgumentException if an update handler is already registered for the query
     */
    @Override
    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage, SubscriptionQueryUpdateMessage> subscriptionQuery(@Nonnull SubscriptionQueryMessage<Q, I, U> query, int updateBufferSize) {
        this.assertSubQueryResponseTypes(query);
        boolean alreadyRegistered = this.queryUpdateEmitter.queryUpdateHandlerRegistered(query);
        if (alreadyRegistered) {
            throw new IllegalArgumentException("There is already a subscription with the given message identifier");
        }
        Mono<QueryResponseMessage> initial = Mono.fromFuture(() -> this.query(query))
                .doOnError(failure -> logger.error("An error happened while trying to report an initial result. Query: {}", (Object) query, failure));
        UpdateHandlerRegistration registration = this.queryUpdateEmitter.registerUpdateHandler(query, updateBufferSize);
        return this.getSubscriptionQueryResult(initial, registration);
    }

    /**
     * Rejects subscription queries whose initial or update response type is a {@link Publisher}.
     *
     * @param query the subscription query to validate
     */
    private <Q, I, U> void assertSubQueryResponseTypes(SubscriptionQueryMessage<Q, I, U> query) {
        Class<?> initialType = query.responseType().getExpectedResponseType();
        Assert.isFalse(Publisher.class.isAssignableFrom(initialType), () -> "Subscription Query query does not support Flux as a return type.");
        Class<?> updateType = query.updatesResponseType().getExpectedResponseType();
        Assert.isFalse(Publisher.class.isAssignableFrom(updateType), () -> "Subscription Query query does not support Flux as an update type.");
    }

    /**
     * Bundles the initial result and the update stream into a subscription query result whose
     * cancellation completes the update handler registration.
     *
     * @param initialResult             publisher of the initial response
     * @param updateHandlerRegistration registration providing the updates and the completion hook
     * @return the combined subscription query result
     */
    private DefaultSubscriptionQueryResult<QueryResponseMessage, SubscriptionQueryUpdateMessage> getSubscriptionQueryResult(Publisher<QueryResponseMessage> initialResult, UpdateHandlerRegistration updateHandlerRegistration) {
        Mono<QueryResponseMessage> initial = Mono.from(initialResult);
        return new DefaultSubscriptionQueryResult<QueryResponseMessage, SubscriptionQueryUpdateMessage>(
                initial,
                updateHandlerRegistration.getUpdates(),
                () -> {
                    updateHandlerRegistration.complete();
                    return true;
                });
    }

    /**
     * Returns the emitter this bus uses to register update handlers for subscription queries.
     *
     * @return the configured {@link QueryUpdateEmitter}
     */
    @Override
    public QueryUpdateEmitter queryUpdateEmitter() {
        return queryUpdateEmitter;
    }

    /**
     * Invokes the given {@code handler} with the unit of work's message and normalizes whatever
     * it returns into a {@code CompletableFuture} of a response message.
     * <ul>
     *   <li>a {@link CompletableFuture} is composed with the conversion of its eventual value;</li>
     *   <li>any other {@link Future} is awaited on an asynchronous task and then converted;</li>
     *   <li>every other value is converted immediately into an already-completed future.</li>
     * </ul>
     * The future is the payload of the returned {@link ResultMessage}.
     *
     * @param uow     the unit of work carrying the query
     * @param handler the handler to invoke
     * @return the result of executing the handler inside the unit of work
     */
    private ResultMessage interceptAndInvoke(LegacyUnitOfWork<QueryMessage> uow, MessageHandler<? super QueryMessage, ? extends QueryResponseMessage> handler) {
        return uow.executeWithResult(ctx -> {
            ResponseType<?> responseType = ((QueryMessage) uow.getMessage()).responseType();
            Object queryResponse = handler.handleSync((QueryMessage) uow.getMessage(), ctx);
            if (queryResponse instanceof CompletableFuture) {
                return ((CompletableFuture<?>) queryResponse).thenCompose(result -> this.buildCompletableFuture(responseType, result));
            }
            if (queryResponse instanceof Future) {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return this.asNullableResponseMessage(responseType.responseMessagePayloadType(), responseType.convert(((Future<?>) queryResponse).get()));
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt flag before translating the exception.
                        Thread.currentThread().interrupt();
                        throw new QueryExecutionException("Error happened while trying to execute query handler", e);
                    }
                    catch (ExecutionException e) {
                        throw new QueryExecutionException("Error happened while trying to execute query handler", e);
                    }
                });
            }
            return this.buildCompletableFuture(responseType, queryResponse);
        });
    }

    /**
     * Wraps a handler result in a {@link QueryResponseMessage}.
     * <p>
     * Existing response messages are returned as-is. Result messages and other messages are
     * re-wrapped, keeping their payload (or exception) and metadata. Anything else becomes the
     * payload of a new response message declared as {@code declaredType}.
     *
     * @param declaredType the payload type declared for the response
     * @param result       the handler result to wrap
     * @param <R>          the declared payload type
     * @return the response message for {@code result}
     */
    private <R> QueryResponseMessage asNullableResponseMessage(Class<R> declaredType, Object result) {
        if (result instanceof QueryResponseMessage) {
            return (QueryResponseMessage) result;
        }
        if (result instanceof ResultMessage) {
            ResultMessage wrapped = (ResultMessage) result;
            if (!wrapped.isExceptional()) {
                return new GenericQueryResponseMessage(this.messageTypeResolver.resolveOrThrow(wrapped.payload()), wrapped.payload(), (Map<String, String>) wrapped.metaData());
            }
            Throwable failure = wrapped.exceptionResult();
            return new GenericQueryResponseMessage(this.messageTypeResolver.resolveOrThrow(failure), failure, (Class<?>) declaredType, (Map<String, ?>) wrapped.metaData());
        }
        if (result instanceof Message) {
            Message plain = (Message) result;
            return new GenericQueryResponseMessage(this.messageTypeResolver.resolveOrThrow(plain.payload()), plain.payload(), (Map<String, String>) plain.metaData());
        }
        // Plain value: resolve the message type from the value's type via ObjectUtils.nullSafeTypeOf.
        MessageType resolvedType = this.messageTypeResolver.resolveOrThrow(ObjectUtils.nullSafeTypeOf(result));
        return new GenericQueryResponseMessage(resolvedType, result, declaredType);
    }

    /**
     * Invokes a streaming query handler inside a new unit of work, with the given span made
     * current for the duration of the call.
     *
     * @param query   the streaming query to handle
     * @param handler the handler to invoke
     * @param span    the span to make current while invoking the handler
     * @return the unit of work's result; on success its payload is a flux of response messages
     */
    private ResultMessage interceptAndInvokeStreaming(StreamingQueryMessage query, MessageHandler<? super StreamingQueryMessage, ? extends QueryResponseMessage> handler, Span span) {
        try (SpanScope ignored = span.makeCurrent()) {
            LegacyDefaultUnitOfWork<StreamingQueryMessage> unitOfWork = LegacyDefaultUnitOfWork.startAndGet(query);
            return unitOfWork.executeWithResult(context -> {
                Object handlerResult = handler.handleSync((StreamingQueryMessage) unitOfWork.getMessage(), context);
                return Flux.from(query.responseType().convert(handlerResult)).map(this::asResponseMessage);
            });
        }
    }

    /**
     * Wraps a single streamed element in a {@link QueryResponseMessage}.
     * <p>
     * Response messages pass through untouched; result messages and other messages are re-wrapped
     * with their payload and metadata; any other value becomes the payload of a new response message.
     *
     * @param result the streamed element to wrap
     * @return the response message for {@code result}
     */
    @Deprecated
    private QueryResponseMessage asResponseMessage(Object result) {
        if (result instanceof QueryResponseMessage) {
            return (QueryResponseMessage) result;
        }
        if (result instanceof ResultMessage) {
            ResultMessage wrapped = (ResultMessage) result;
            MessageType wrappedType = this.messageTypeResolver.resolveOrThrow(wrapped.payload());
            return new GenericQueryResponseMessage(wrappedType, wrapped.payload(), (Map<String, String>) wrapped.metaData());
        }
        if (result instanceof Message) {
            Message plain = (Message) result;
            MessageType plainType = this.messageTypeResolver.resolveOrThrow(plain.payload());
            return new GenericQueryResponseMessage(plainType, plain.payload(), (Map<String, String>) plain.metaData());
        }
        return new GenericQueryResponseMessage(this.messageTypeResolver.resolveOrThrow(result), result);
    }

    /**
     * Converts the raw handler response with the given response type and returns it as an
     * already-completed future of a response message.
     *
     * @param responseType  the response type used to convert and declare the payload
     * @param queryResponse the raw handler response
     * @param <R>           the response type's payload type
     * @return a completed future holding the response message
     */
    private <R> CompletableFuture<QueryResponseMessage> buildCompletableFuture(ResponseType<R> responseType, Object queryResponse) {
        Class<R> payloadType = responseType.responseMessagePayloadType();
        Object converted = responseType.convert(queryResponse);
        QueryResponseMessage response = this.asNullableResponseMessage(payloadType, converted);
        return CompletableFuture.completedFuture(response);
    }

    /**
     * Dispatch-interception hook called for every query before handlers are looked up.
     * <p>
     * NOTE(review): currently a pass-through — the query is returned unchanged and the registered
     * dispatch interceptors are not consulted. The type parameters {@code Q} and {@code R} are unused.
     *
     * @param query the query about to be dispatched
     * @return the same {@code query} instance
     */
    private <Q, R, T extends QueryMessage> T intercept(T query) {
        return query;
    }

    /**
     * Returns a read-only view of the current subscriptions, keyed by query name.
     * The view is backed by the live map; the handler collections themselves are not wrapped.
     *
     * @return an unmodifiable view of the subscriptions of this bus
     */
    protected Map<String, Collection<QuerySubscription<?>>> getSubscriptions() {
        return Collections.unmodifiableMap(subscriptions);
    }

    /**
     * Registers the given handler {@code interceptor} with this bus.
     *
     * @param interceptor the interceptor to register
     * @return a {@link Registration} that removes the interceptor again when cancelled
     */
    public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<QueryMessage> interceptor) {
        handlerInterceptors.add(interceptor);
        return () -> handlerInterceptors.remove(interceptor);
    }

    /**
     * Registers the given dispatch {@code interceptor} with this bus.
     *
     * @param interceptor the interceptor to register
     * @return a {@link Registration} that removes the interceptor again when cancelled
     */
    @Nonnull
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super QueryMessage> interceptor) {
        dispatchInterceptors.add(interceptor);
        return () -> dispatchInterceptors.remove(interceptor);
    }

    /**
     * Collects the handlers subscribed under the query's name whose response type matches the
     * query's expected response type, best match first.
     * <p>
     * Subscriptions are grouped by {@code ResponseType#matchRank}; rank {@code 0} is discarded
     * (presumably the "no match" rank — confirm against {@code ResponseType}) and the remaining
     * groups are ordered by descending rank. Within one rank the subscription order is kept.
     * The lookup never modifies the subscription registry.
     *
     * @param queryMessage the query to find handlers for
     * @return the matching handlers, possibly empty, ordered by descending match rank
     */
    private List<MessageHandler<? super QueryMessage, ? extends QueryResponseMessage>> getHandlersForMessage(QueryMessage queryMessage) {
        ResponseType<?> responseType = queryMessage.responseType();
        // getOrDefault rather than computeIfAbsent: a lookup must not leave an empty list behind
        // for every query name that was ever asked for.
        List<QuerySubscription<?>> candidates = this.subscriptions.getOrDefault(queryMessage.type().name(), Collections.emptyList());
        Map<Integer, List<QuerySubscription<?>>> byRank = candidates.stream()
                .collect(Collectors.groupingBy(querySubscription -> responseType.matchRank(querySubscription.getResponseType())));
        return byRank.entrySet().stream()
                .filter(entry -> entry.getKey() != 0)
                // Integer.compare instead of subtraction, which can overflow for extreme ranks.
                .sorted((entry1, entry2) -> Integer.compare(entry2.getKey(), entry1.getKey()))
                .map(Map.Entry::getValue)
                .flatMap(Collection::stream)
                .<MessageHandler<? super QueryMessage, ? extends QueryResponseMessage>>map(QuerySubscription::getQueryHandler)
                .collect(Collectors.toList());
    }

    /**
     * Reactive variant of {@code getHandlersForMessage}: publishes the matching handlers in the
     * same order.
     *
     * @param queryMessage the streaming query to find handlers for
     * @return a publisher emitting the matching handlers
     */
    private Publisher<MessageHandler<? super QueryMessage, ? extends QueryResponseMessage>> getStreamingHandlersForMessage(StreamingQueryMessage queryMessage) {
        List<MessageHandler<? super QueryMessage, ? extends QueryResponseMessage>> matching = this.getHandlersForMessage(queryMessage);
        return Flux.fromIterable(matching);
    }

    /**
     * Builder for a {@link SimpleQueryBus}. Every component has a default, declared on the
     * fields below, so {@link #build()} works without any further configuration.
     */
    public static class Builder {
        // Defaults: no-op monitor, no transaction manager, logging error handler (using the bus'
        // logger), log-and-accept duplicate resolution, and no-op span factories.
        private MessageMonitor<? super QueryMessage> messageMonitor = NoOpMessageMonitor.INSTANCE;
        // NOTE(review): stored by the builder but not read by the SimpleQueryBus constructor in this file.
        private TransactionManager transactionManager = NoTransactionManager.instance();
        private QueryInvocationErrorHandler errorHandler = LoggingQueryInvocationErrorHandler.builder().logger(logger).build();
        private DuplicateQueryHandlerResolver duplicateQueryHandlerResolver = DuplicateQueryHandlerResolution.logAndAccept();
        private QueryUpdateEmitter queryUpdateEmitter = SimpleQueryUpdateEmitter.builder().spanFactory(DefaultQueryUpdateEmitterSpanFactory.builder().spanFactory(NoOpSpanFactory.INSTANCE).build()).build();
        private QueryBusSpanFactory spanFactory = DefaultQueryBusSpanFactory.builder().spanFactory(NoOpSpanFactory.INSTANCE).build();
        private MessageTypeResolver messageTypeResolver = new ClassBasedMessageTypeResolver();

        /**
         * Sets the {@link MessageMonitor} notified of ingested queries.
         *
         * @param messageMonitor the monitor to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder messageMonitor(@Nonnull MessageMonitor<? super QueryMessage> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        /**
         * Sets the resolver consulted when a handler is subscribed for a query name and response
         * type that already have a handler.
         *
         * @param duplicateQueryHandlerResolver the resolver to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder duplicateQueryHandlerResolver(DuplicateQueryHandlerResolver duplicateQueryHandlerResolver) {
            BuilderUtils.assertNonNull(duplicateQueryHandlerResolver, "DuplicateQueryHandlerResolver may not be null");
            this.duplicateQueryHandlerResolver = duplicateQueryHandlerResolver;
            return this;
        }

        /**
         * Sets the {@link TransactionManager}.
         *
         * @param transactionManager the transaction manager to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Sets the handler invoked when a query handler fails during a scatter-gather query.
         *
         * @param errorHandler the error handler to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder errorHandler(@Nonnull QueryInvocationErrorHandler errorHandler) {
            BuilderUtils.assertNonNull(errorHandler, "QueryInvocationErrorHandler may not be null");
            this.errorHandler = errorHandler;
            return this;
        }

        /**
         * Sets the {@link QueryUpdateEmitter} used for subscription queries.
         *
         * @param queryUpdateEmitter the emitter to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder queryUpdateEmitter(@Nonnull QueryUpdateEmitter queryUpdateEmitter) {
            BuilderUtils.assertNonNull(queryUpdateEmitter, "QueryUpdateEmitter may not be null");
            this.queryUpdateEmitter = queryUpdateEmitter;
            return this;
        }

        /**
         * Sets the factory creating the spans around query dispatching.
         *
         * @param spanFactory the span factory to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder spanFactory(@Nonnull QueryBusSpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        /**
         * Sets the {@link MessageTypeResolver} used to resolve the type of response payloads.
         * The method name and assertion message still say "name resolver" although the
         * parameter is a {@link MessageTypeResolver}.
         *
         * @param messageTypeResolver the resolver to use; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder messageNameResolver(MessageTypeResolver messageTypeResolver) {
            BuilderUtils.assertNonNull(messageTypeResolver, "MessageNameResolver may not be null");
            this.messageTypeResolver = messageTypeResolver;
            return this;
        }

        /**
         * Builds the {@link SimpleQueryBus} from the current state of this builder.
         *
         * @return a new {@link SimpleQueryBus}
         */
        public SimpleQueryBus build() {
            return new SimpleQueryBus(this);
        }

        /**
         * Validation hook invoked by the {@link SimpleQueryBus} constructor. Empty by default,
         * since every component has a default value.
         *
         * @throws AxonConfigurationException declared for overriding builders that detect an invalid configuration
         */
        protected void validate() throws AxonConfigurationException {
        }
    }

    /**
     * Reactor context operator that ingests the streaming query into the message monitor and
     * stores the resulting callback in the context under {@code MessageMonitor.MonitorCallback.class}.
     */
    private static class MonitorCallbackContextWriter implements UnaryOperator<Context> {
        private final MessageMonitor<? super QueryMessage> messageMonitor;
        private final StreamingQueryMessage query;

        private MonitorCallbackContextWriter(MessageMonitor<? super QueryMessage> messageMonitor, StreamingQueryMessage query) {
            this.messageMonitor = messageMonitor;
            this.query = query;
        }

        /**
         * @param ctx the context to extend
         * @return the context returned by {@code ctx.put} for the monitor callback of the query
         */
        @Override
        public Context apply(Context ctx) {
            MessageMonitor.MonitorCallback callback = messageMonitor.onMessageIngested(query);
            return ctx.put(MessageMonitor.MonitorCallback.class, callback);
        }
    }

    /**
     * Filters exceptional results out of the handler sequence, remembering the most recent
     * exception in the shared {@code lastError} reference.
     */
    private static class CatchLastError implements Function<ResultMessage, Mono<ResultMessage>> {
        private final AtomicReference<Throwable> lastError;

        private CatchLastError(AtomicReference<Throwable> lastError) {
            this.lastError = lastError;
        }

        /**
         * @param resultMessage the result of one handler invocation
         * @return a mono of the result when it is not exceptional, otherwise an empty mono
         */
        @Override
        public Mono<ResultMessage> apply(ResultMessage resultMessage) {
            if (!resultMessage.isExceptional()) {
                return Mono.just(resultMessage);
            }
            lastError.set(resultMessage.exceptionResult());
            return Mono.empty();
        }
    }

    private static class ErrorIfComplete
    implements Consumer<Signal<?>> {
        private final AtomicReference<Throwable> lastError;
        private final StreamingQueryMessage interceptedQuery;

        private ErrorIfComplete(AtomicReference<Throwable> lastError, StreamingQueryMessage interceptedQuery) {
            this.lastError = lastError;
            this.interceptedQuery = interceptedQuery;
        }

        @Override
        public void accept(Signal signal) {
            if (signal.isOnComplete()) {
                Throwable throwable = this.lastError.get();
                if (Objects.isNull(throwable)) {
                    throw SimpleQueryBus.noSuitableHandlerException(this.interceptedQuery);
                }
                throw new QueryExecutionException("Error starting stream", throwable);
            }
        }
    }

    /**
     * Signal consumer that reports the outcome to the monitor callback stored in the Reactor
     * context: success on {@code onNext}, failure on {@code onError}.
     */
    private static class SuccessReporter implements Consumer<Signal<?>> {
        private SuccessReporter() {
        }

        @Override
        public void accept(Signal<?> signal) {
            // Looked up for every signal, exactly as before: a context without a callback fails here.
            MessageMonitor.MonitorCallback callback = signal.getContextView().get(MessageMonitor.MonitorCallback.class);
            if (signal.isOnNext()) {
                callback.reportSuccess();
                return;
            }
            if (signal.isOnError()) {
                callback.reportFailure(signal.getThrowable());
            }
        }
    }
}

