/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.ResultStreamPublisher;
import io.axoniq.axonserver.connector.impl.CloseAwareReplyChannel;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.connector.query.SubscriptionQueryResult;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.AxonServerRegistration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.TargetContextResolver;
import org.axonframework.axonserver.connector.query.AxonServerQueryDispatchException;
import org.axonframework.axonserver.connector.query.QueryProcessingTask;
import org.axonframework.axonserver.connector.query.QuerySerializer;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.PriorityTaskSchedulers;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.StringUtils;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.messaging.DefaultMessageDispatchInterceptorChain;
import org.axonframework.messaging.Distributed;
import org.axonframework.messaging.GenericMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.responsetypes.ConvertingResponseMessage;
import org.axonframework.messaging.responsetypes.InstanceResponseType;
import org.axonframework.messaging.responsetypes.MultipleInstancesResponseType;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.queryhandling.GenericQueryResponseMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryHandlerName;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryPriorityCalculator;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.StreamingQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResponseMessages;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.UpdateHandler;
import org.axonframework.queryhandling.tracing.DefaultQueryBusSpanFactory;
import org.axonframework.queryhandling.tracing.QueryBusSpanFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanFactory;
import org.axonframework.tracing.SpanScope;
import org.axonframework.util.PriorityRunnable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;

public class AxonServerQueryBus
implements QueryBus,
Distributed<QueryBus> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final AtomicLong TASK_SEQUENCE = new AtomicLong(Long.MIN_VALUE);
    private static final int DIRECT_QUERY_NUMBER_OF_RESULTS = 1;
    private static final long DIRECT_QUERY_TIMEOUT_MS = TimeUnit.HOURS.toMillis(1L);
    private static final int SCATTER_GATHER_NUMBER_OF_RESULTS = -1;
    private static final int QUERY_QUEUE_CAPACITY = 1000;
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final AxonServerConfiguration configuration;
    private final QueryUpdateEmitter updateEmitter;
    private final QueryBus localSegment;
    private final QuerySerializer serializer;
    private final SubscriptionMessageSerializer subscriptionSerializer;
    private final QueryPriorityCalculator priorityCalculator;
    private final List<MessageDispatchInterceptor<? super QueryMessage>> dispatchInterceptors;
    private final TargetContextResolver<? super QueryMessage> targetContextResolver;
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();
    private final ExecutorService queryExecutor;
    private final ExecutorService queryResponseExecutor;
    private final LocalSegmentAdapter localSegmentAdapter;
    private final String context;
    private final QueryBusSpanFactory spanFactory;
    private final boolean localSegmentShortCut;
    private final Duration queryInProgressAwait;
    private final Set<String> queryHandlerNames = new CopyOnWriteArraySet<String>();

    public AxonServerQueryBus(Builder builder) {
        builder.validate();
        this.axonServerConnectionManager = builder.axonServerConnectionManager;
        this.configuration = builder.configuration;
        this.updateEmitter = builder.updateEmitter;
        this.localSegment = builder.localSegment;
        this.serializer = builder.buildQuerySerializer();
        this.subscriptionSerializer = builder.buildSubscriptionMessageSerializer();
        this.priorityCalculator = builder.priorityCalculator;
        this.context = StringUtils.nonEmptyOrNull((String)builder.defaultContext) ? builder.defaultContext : this.configuration.getContext();
        this.targetContextResolver = builder.targetContextResolver.orElse(m -> this.context);
        this.spanFactory = builder.spanFactory;
        this.queryInProgressAwait = builder.queryInProgressAwait;
        this.dispatchInterceptors = new CopyOnWriteArrayList<MessageDispatchInterceptor<? super QueryMessage>>();
        PriorityBlockingQueue queryProcessQueue = new PriorityBlockingQueue(1000);
        this.queryExecutor = builder.queryExecutorServiceBuilder.apply(this.configuration, queryProcessQueue);
        PriorityBlockingQueue queryResponseProcessQueue = new PriorityBlockingQueue(1000);
        this.queryResponseExecutor = builder.queryResponseExecutorServiceBuilder.apply(this.configuration, queryResponseProcessQueue);
        this.localSegmentAdapter = new LocalSegmentAdapter();
        this.localSegmentShortCut = builder.localSegmentShortCut;
    }

    public QueryBus subscribe(@Nonnull QueryHandlerName handlerName, @Nonnull QueryHandler queryHandler) {
        return null;
    }

    @Nonnull
    public Publisher<QueryResponseMessage> streamingQuery(@Nonnull StreamingQueryMessage query, @Nullable ProcessingContext context) {
        Span span = this.spanFactory.createStreamingQuerySpan((QueryMessage)query, true).start();
        try (SpanScope unused = span.makeCurrent();){
            StreamingQueryMessage queryWithContext = (StreamingQueryMessage)this.spanFactory.propagateContext((QueryMessage)query);
            int priority = this.priorityCalculator.determinePriority((QueryMessage)queryWithContext);
            AtomicReference<Scheduler> scheduler = new AtomicReference<Scheduler>(PriorityTaskSchedulers.forPriority(this.queryResponseExecutor, priority, TASK_SEQUENCE));
            Flux flux = Mono.fromSupplier(this::registerStreamingQueryActivity).flatMapMany(activity -> new DefaultMessageDispatchInterceptorChain(this.dispatchInterceptors).proceed((Message)queryWithContext, null).first().cast().asMono().map(MessageStream.Entry::message).flatMapMany(intercepted -> {
                if (this.shouldRunQueryLocally(intercepted.type().name())) {
                    return this.localSegment.streamingQuery(intercepted, context);
                }
                return Mono.just((Object)this.serializeStreaming((QueryMessage)intercepted, priority)).flatMapMany(queryRequest -> new ResultStreamPublisher(() -> this.sendRequest((QueryMessage)intercepted, (QueryRequest)queryRequest))).concatMap(queryResponse -> this.deserialize((StreamingQueryMessage)intercepted, (QueryResponse)queryResponse));
            }).publishOn((Scheduler)scheduler.get()).doOnError(arg_0 -> ((Span)span).recordException(arg_0)).doFinally((Consumer)new ActivityFinisher((ShutdownLatch.ActivityHandle)activity, span)).subscribeOn((Scheduler)scheduler.get()));
            return flux;
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    public void start() {
        this.shutdownLatch.initialize();
    }

    public Registration subscribe(@Nonnull String queryName, @Nonnull Type responseType, @Nonnull MessageHandler<? super QueryMessage, ? extends QueryResponseMessage> handler) {
        Registration localRegistration = () -> true;
        QueryDefinition queryDefinition = new QueryDefinition(queryName, responseType);
        io.axoniq.axonserver.connector.Registration serverRegistration = this.axonServerConnectionManager.getConnection(this.context).queryChannel().registerQueryHandler((io.axoniq.axonserver.connector.query.QueryHandler)this.localSegmentAdapter, new QueryDefinition[]{queryDefinition});
        this.queryHandlerNames.add(queryName);
        return new AxonServerRegistration(() -> this.unsubscribe(queryName, localRegistration), () -> ((io.axoniq.axonserver.connector.Registration)serverRegistration).cancel());
    }

    private boolean unsubscribe(String queryName, Registration localSegmentRegistration) {
        boolean result = localSegmentRegistration.cancel();
        if (result) {
            this.queryHandlerNames.remove(queryName);
        }
        return result;
    }

    @Nonnull
    public MessageStream<QueryResponseMessage> query(@Nonnull QueryMessage queryMessage, @Nullable ProcessingContext context) {
        Span span = this.spanFactory.createQuerySpan(queryMessage, true).start();
        try (SpanScope unused = span.makeCurrent();){
            QueryMessage queryWithContext = this.spanFactory.propagateContext(queryMessage);
            Assert.isFalse((boolean)Publisher.class.isAssignableFrom(queryMessage.responseType().getExpectedResponseType()), () -> "The direct query does not support Flux as a return type.");
            this.shutdownLatch.ifShuttingDown("Cannot dispatch new queries as this bus is being shut down");
            QueryMessage interceptedQuery = (QueryMessage)new DefaultMessageDispatchInterceptorChain(this.dispatchInterceptors).proceed((Message)queryWithContext, null).first().cast().asMono().map(MessageStream.Entry::message).block();
            ShutdownLatch.ActivityHandle queryInTransit = this.shutdownLatch.registerActivity();
            CompletableFuture<QueryResponseMessage> queryTransaction = new CompletableFuture<QueryResponseMessage>();
            try {
                if (!this.shouldRunQueryLocally(interceptedQuery.type().name())) {
                    int priority = this.priorityCalculator.determinePriority(interceptedQuery);
                    QueryRequest queryRequest = this.serialize(interceptedQuery, false, priority);
                    ResultStream<QueryResponse> result = this.sendRequest(interceptedQuery, queryRequest);
                    queryTransaction.whenComplete((r, e) -> result.close());
                    Span responseTaskSpan = this.spanFactory.createResponseProcessingSpan(interceptedQuery);
                    ResponseProcessingTask responseProcessingTask = new ResponseProcessingTask(result, this.serializer, queryTransaction, queryMessage.responseType(), responseTaskSpan);
                    result.onAvailable(() -> this.queryResponseExecutor.execute((Runnable)new PriorityRunnable(responseProcessingTask, (long)priority, TASK_SEQUENCE.incrementAndGet())));
                }
            }
            catch (Exception e2) {
                logger.debug("There was a problem issuing a query {}.", (Object)interceptedQuery, (Object)e2);
                AxonException exception = ErrorCode.QUERY_DISPATCH_ERROR.convert(this.configuration.getClientId(), e2);
                queryTransaction.completeExceptionally((Throwable)exception);
                span.recordException((Throwable)e2).end();
            }
            queryTransaction.whenComplete((r, e) -> {
                queryInTransit.end();
                if (e != null) {
                    span.recordException(e);
                }
                if (r != null && r.isExceptional()) {
                    span.recordException(r.exceptionResult());
                }
                span.end();
            });
        }
        return MessageStream.empty().cast();
    }

    private boolean shouldRunQueryLocally(String queryName) {
        return this.localSegmentShortCut && this.queryHandlerNames.contains(queryName);
    }

    private QueryRequest serializeStreaming(QueryMessage query, int priority) {
        return this.serialize(query, true, priority);
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
    }

    private ShutdownLatch.ActivityHandle registerStreamingQueryActivity() {
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new queries as this bus is being shut down");
        return this.shutdownLatch.registerActivity();
    }

    private QueryRequest serialize(QueryMessage query, boolean stream, int priority) {
        return this.serializer.serializeRequest(query, 1, DIRECT_QUERY_TIMEOUT_MS, priority, stream);
    }

    private ResultStream<QueryResponse> sendRequest(QueryMessage queryMessage, QueryRequest queryRequest) {
        return this.axonServerConnectionManager.getConnection(this.targetContextResolver.resolveContext((QueryMessage)queryMessage)).queryChannel().query(queryRequest);
    }

    private <R> Publisher<QueryResponseMessage> deserialize(StreamingQueryMessage queryMessage, QueryResponse queryResponse) {
        Class expectedResponseType = queryMessage.responseType().getExpectedResponseType();
        QueryResponseMessage responseMessage = this.serializer.deserializeResponse(queryResponse);
        if (responseMessage.isExceptional()) {
            return Flux.error((Throwable)responseMessage.exceptionResult());
        }
        if (expectedResponseType.isAssignableFrom(responseMessage.payloadType())) {
            InstanceResponseType instanceResponseType = new InstanceResponseType(expectedResponseType);
            return Flux.just((Object)new ConvertingResponseMessage((ResponseType)instanceResponseType, responseMessage));
        }
        MultipleInstancesResponseType multiResponseType = new MultipleInstancesResponseType(expectedResponseType);
        ConvertingResponseMessage convertingMessage = new ConvertingResponseMessage((ResponseType)multiResponseType, responseMessage);
        return Flux.fromStream(((List)convertingMessage.payload()).stream().map(payload -> this.singleMessage(responseMessage, payload, expectedResponseType)));
    }

    private <R> QueryResponseMessage singleMessage(QueryResponseMessage original, R newPayload, Class<R> expectedPayloadType) {
        GenericMessage delegate = new GenericMessage(original.identifier(), original.type(), newPayload, expectedPayloadType, (Map)original.metadata());
        return new GenericQueryResponseMessage((Message)delegate);
    }

    @Nonnull
    public SubscriptionQueryResponseMessages subscriptionQuery(@Nonnull SubscriptionQueryMessage query, @Nullable ProcessingContext context, int updateBufferSize) {
        Assert.isFalse((boolean)Publisher.class.isAssignableFrom(query.responseType().getExpectedResponseType()), () -> "The subscription Query query does not support Flux as a return type.");
        Assert.isFalse((boolean)Publisher.class.isAssignableFrom(query.updatesResponseType().getExpectedResponseType()), () -> "The subscription Query query does not support Flux as an update type.");
        this.shutdownLatch.ifShuttingDown(String.format("Cannot dispatch new %s as this bus is being shut down", "subscription queries"));
        Span span = this.spanFactory.createSubscriptionQuerySpan(query, true).start();
        try (SpanScope unused = span.makeCurrent();){
            SubscriptionQueryMessage interceptedQuery = (SubscriptionQueryMessage)new DefaultMessageDispatchInterceptorChain(this.dispatchInterceptors).proceed((Message)this.spanFactory.propagateContext((QueryMessage)((SubscriptionQueryMessage)this.spanFactory.propagateContext((QueryMessage)query))), null).first().cast().asMono().map(MessageStream.Entry::message).block();
            String subscriptionId = interceptedQuery.identifier();
            String targetContext = this.targetContextResolver.resolveContext((QueryMessage)interceptedQuery);
            logger.debug("Subscription Query requested with subscription Id [{}]", (Object)subscriptionId);
            SubscriptionQueryResult result = this.axonServerConnectionManager.getConnection(targetContext).queryChannel().subscriptionQuery(this.subscriptionSerializer.serializeQuery(interceptedQuery), this.subscriptionSerializer.serializeUpdateType(interceptedQuery), Math.max(32, updateBufferSize), Math.max(4, updateBufferSize >> 3));
            SubscriptionQueryResponseMessages subscriptionQueryResponseMessages = null;
            return subscriptionQueryResponseMessages;
        }
    }

    @Nonnull
    public UpdateHandler subscribeToUpdates(@Nonnull SubscriptionQueryMessage query, int updateBufferSize) {
        return null;
    }

    @Nonnull
    public CompletableFuture<Void> emitUpdate(@Nonnull Predicate<SubscriptionQueryMessage> filter, @Nonnull Supplier<SubscriptionQueryUpdateMessage> updateSupplier, @Nullable ProcessingContext context) {
        return null;
    }

    @Nonnull
    public CompletableFuture<Void> completeSubscriptions(@Nonnull Predicate<SubscriptionQueryMessage> filter, @Nullable ProcessingContext context) {
        return null;
    }

    @Nonnull
    public CompletableFuture<Void> completeSubscriptionsExceptionally(@Nonnull Predicate<SubscriptionQueryMessage> filter, @Nonnull Throwable cause, @Nullable ProcessingContext context) {
        return null;
    }

    public QueryBus localSegment() {
        return this.localSegment;
    }

    public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<QueryMessage> interceptor) {
        return null;
    }

    @Nonnull
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super QueryMessage> dispatchInterceptor) {
        this.dispatchInterceptors.add(dispatchInterceptor);
        return () -> this.dispatchInterceptors.remove(dispatchInterceptor);
    }

    public void disconnect() {
        if (this.axonServerConnectionManager.isConnected(this.context)) {
            this.axonServerConnectionManager.getConnection(this.context).queryChannel().prepareDisconnect();
        }
        if (!this.localSegmentAdapter.awaitTermination(this.queryInProgressAwait)) {
            logger.info("Awaited termination of queries in progress without success. Going to cancel remaining queries in progress.");
            this.localSegmentAdapter.cancel();
        }
    }

    public CompletableFuture<Void> shutdownDispatching() {
        return this.shutdownLatch.initiateShutdown();
    }

    public static class Builder {
        private AxonServerConnectionManager axonServerConnectionManager;
        private AxonServerConfiguration configuration;
        private QueryBus localSegment;
        private QueryUpdateEmitter updateEmitter;
        private Serializer messageSerializer;
        private Serializer genericSerializer;
        private QueryPriorityCalculator priorityCalculator = QueryPriorityCalculator.defaultCalculator();
        private TargetContextResolver<? super QueryMessage> targetContextResolver = q -> this.configuration.getContext();
        private BiFunction<AxonServerConfiguration, BlockingQueue<Runnable>, ExecutorService> queryExecutorServiceBuilder = (axonServerConfiguration, queue) -> new ThreadPoolExecutor(axonServerConfiguration.getQueryThreads(), axonServerConfiguration.getQueryThreads(), 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)queue, (ThreadFactory)new AxonThreadFactory("QueryProcessor"));
        private BiFunction<AxonServerConfiguration, BlockingQueue<Runnable>, ExecutorService> queryResponseExecutorServiceBuilder = (axonServerConfiguration, queue) -> new ThreadPoolExecutor(axonServerConfiguration.getQueryResponseThreads(), axonServerConfiguration.getQueryResponseThreads(), 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)queue, (ThreadFactory)new AxonThreadFactory("QueryResponseProcessor"));
        private String defaultContext;
        private QueryBusSpanFactory spanFactory = DefaultQueryBusSpanFactory.builder().spanFactory((SpanFactory)NoOpSpanFactory.INSTANCE).build();
        private boolean localSegmentShortCut;
        private Duration queryInProgressAwait = Duration.ofSeconds(5L);

        public Builder axonServerConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull((Object)axonServerConnectionManager, (String)"AxonServerConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        public Builder configuration(AxonServerConfiguration configuration) {
            BuilderUtils.assertNonNull((Object)configuration, (String)"AxonServerConfiguration may not be null");
            this.configuration = configuration;
            return this;
        }

        public Builder localSegment(QueryBus localSegment) {
            BuilderUtils.assertNonNull((Object)localSegment, (String)"Local QueryBus may not be null");
            this.localSegment = localSegment;
            return this;
        }

        public Builder updateEmitter(QueryUpdateEmitter updateEmitter) {
            BuilderUtils.assertNonNull((Object)updateEmitter, (String)"QueryUpdateEmitter may not be null");
            this.updateEmitter = updateEmitter;
            return this;
        }

        public Builder messageSerializer(Serializer messageSerializer) {
            BuilderUtils.assertNonNull((Object)messageSerializer, (String)"Message Serializer may not be null");
            this.messageSerializer = messageSerializer;
            return this;
        }

        public Builder genericSerializer(Serializer genericSerializer) {
            BuilderUtils.assertNonNull((Object)genericSerializer, (String)"Generic Serializer may not be null");
            this.genericSerializer = genericSerializer;
            return this;
        }

        public Builder priorityCalculator(QueryPriorityCalculator priorityCalculator) {
            BuilderUtils.assertNonNull(this.targetContextResolver, (String)"QueryPriorityCalculator may not be null");
            this.priorityCalculator = priorityCalculator;
            return this;
        }

        public Builder targetContextResolver(TargetContextResolver<? super QueryMessage> targetContextResolver) {
            BuilderUtils.assertNonNull(targetContextResolver, (String)"TargetContextResolver may not be null");
            this.targetContextResolver = targetContextResolver;
            return this;
        }

        @Deprecated
        public Builder executorServiceBuilder(BiFunction<AxonServerConfiguration, BlockingQueue<Runnable>, ExecutorService> executorServiceBuilder) {
            return this.queryExecutorServiceBuilder(executorServiceBuilder);
        }

        public Builder queryExecutorServiceBuilder(BiFunction<AxonServerConfiguration, BlockingQueue<Runnable>, ExecutorService> executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, (String)"ExecutorServiceBuilder may not be null");
            this.queryExecutorServiceBuilder = executorServiceBuilder;
            return this;
        }

        public Builder queryResponseExecutorServiceBuilder(BiFunction<AxonServerConfiguration, BlockingQueue<Runnable>, ExecutorService> executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, (String)"ExecutorServiceBuilder may not be null");
            this.queryResponseExecutorServiceBuilder = executorServiceBuilder;
            return this;
        }

        public Builder defaultContext(String defaultContext) {
            BuilderUtils.assertNonEmpty((String)defaultContext, (String)"The context may not be null or empty");
            this.defaultContext = defaultContext;
            return this;
        }

        public Builder spanFactory(@Nonnull QueryBusSpanFactory spanFactory) {
            BuilderUtils.assertNonNull((Object)spanFactory, (String)"SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        public Builder enabledLocalSegmentShortCut() {
            this.localSegmentShortCut = true;
            return this;
        }

        public Builder queryInProgressAwait(@Nonnull Duration queryInProgressAwait) {
            BuilderUtils.assertNonNull((Object)queryInProgressAwait, (String)"Query in progress await timeout may not be null");
            this.queryInProgressAwait = queryInProgressAwait;
            return this;
        }

        public AxonServerQueryBus build() {
            return new AxonServerQueryBus(this);
        }

        protected QuerySerializer buildQuerySerializer() {
            return new QuerySerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        protected SubscriptionMessageSerializer buildSubscriptionMessageSerializer() {
            return new SubscriptionMessageSerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull((Object)this.axonServerConnectionManager, (String)"The AxonServerConnectionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.configuration, (String)"The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.localSegment, (String)"The Local QueryBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.updateEmitter, (String)"The QueryUpdateEmitter is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.messageSerializer, (String)"The Message Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.genericSerializer, (String)"The Generic Serializer is a hard requirement and should be provided");
        }
    }

    private class LocalSegmentAdapter
    implements io.axoniq.axonserver.connector.query.QueryHandler {
        private final Map<String, QueryProcessingTask> queriesInProgress = new ConcurrentHashMap<String, QueryProcessingTask>();

        private LocalSegmentAdapter() {
        }

        public void handle(QueryRequest query, ReplyChannel<QueryResponse> responseHandler) {
            this.stream(query, responseHandler).request(Long.MAX_VALUE);
        }

        public FlowControl stream(QueryRequest query, ReplyChannel<QueryResponse> responseHandler) {
            Runnable onClose = () -> this.queriesInProgress.remove(query.getMessageIdentifier());
            CloseAwareReplyChannel closeAwareReplyChannel = new CloseAwareReplyChannel(responseHandler, onClose);
            long priority = ProcessingInstructionHelper.priority(query.getProcessingInstructionsList());
            final QueryProcessingTask processingTask = new QueryProcessingTask(AxonServerQueryBus.this.localSegment, query, (ReplyChannel<QueryResponse>)closeAwareReplyChannel, AxonServerQueryBus.this.serializer, AxonServerQueryBus.this.configuration.getClientId(), AxonServerQueryBus.this.spanFactory);
            final PriorityRunnable priorityTask = new PriorityRunnable((Runnable)processingTask, priority, TASK_SEQUENCE.incrementAndGet());
            this.queriesInProgress.put(query.getMessageIdentifier(), processingTask);
            AxonServerQueryBus.this.queryExecutor.execute((Runnable)priorityTask);
            return new FlowControl(){

                public void request(long requested) {
                    AxonServerQueryBus.this.queryExecutor.execute((Runnable)new PriorityRunnable(() -> processingTask.request(requested), priorityTask.priority(), TASK_SEQUENCE.incrementAndGet()));
                }

                public void cancel() {
                    AxonServerQueryBus.this.queryExecutor.execute((Runnable)new PriorityRunnable(processingTask::cancel, priorityTask.priority(), TASK_SEQUENCE.incrementAndGet()));
                }
            };
        }

        public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(SubscriptionQuery query, QueryHandler.UpdateHandler sendUpdate) {
            UpdateHandler updateHandler = AxonServerQueryBus.this.localSegment.subscribeToUpdates(AxonServerQueryBus.this.subscriptionSerializer.deserialize(query), 1024);
            updateHandler.updates().doOnError(e -> {
                ErrorMessage error = ExceptionSerializer.serialize(AxonServerQueryBus.this.configuration.getClientId(), e);
                String errorCode = ErrorCode.getQueryExecutionErrorCode(e).errorCode();
                QueryUpdate queryUpdate = QueryUpdate.newBuilder().setErrorMessage(error).setErrorCode(errorCode).build();
                sendUpdate.sendUpdate(queryUpdate);
                sendUpdate.complete();
            }).doOnComplete(() -> ((QueryHandler.UpdateHandler)sendUpdate).complete()).map(AxonServerQueryBus.this.subscriptionSerializer::serialize).subscribe(arg_0 -> ((QueryHandler.UpdateHandler)sendUpdate).sendUpdate(arg_0));
            return () -> {
                updateHandler.cancel();
                return FutureUtils.emptyCompletedFuture();
            };
        }

        private boolean awaitTermination(Duration timeout) {
            Instant startAwait = Instant.now();
            Instant endAwait = startAwait.plusSeconds(timeout.getSeconds());
            while (Instant.now().isBefore(endAwait) && !this.queriesInProgress.isEmpty()) {
                this.queriesInProgress.values().stream().findFirst().ifPresent(queryInProgress -> {
                    while (Instant.now().isBefore(endAwait) && queryInProgress.resultPending()) {
                        LockSupport.parkNanos(10000000L);
                    }
                });
            }
            return this.queriesInProgress.isEmpty();
        }

        private void cancel() {
            this.queriesInProgress.values().iterator().forEachRemaining(QueryProcessingTask::cancel);
        }
    }

    private static class ResponseProcessingTask<R>
    implements Runnable {
        private final AtomicBoolean singleExecutionCheck = new AtomicBoolean();
        private final ResultStream<QueryResponse> result;
        private final QuerySerializer serializer;
        private final CompletableFuture<QueryResponseMessage> queryTransaction;
        private final ResponseType<R> expectedResponseType;
        private final Span span;

        public ResponseProcessingTask(ResultStream<QueryResponse> result, QuerySerializer serializer, CompletableFuture<QueryResponseMessage> queryTransaction, ResponseType<R> expectedResponseType, Span responseTaskSpan) {
            this.result = result;
            this.serializer = serializer;
            this.queryTransaction = queryTransaction;
            this.expectedResponseType = expectedResponseType;
            this.span = responseTaskSpan;
        }

        @Override
        public void run() {
            if (this.singleExecutionCheck.compareAndSet(false, true)) {
                QueryResponse nextAvailable = (QueryResponse)this.result.nextIfAvailable();
                if (nextAvailable != null) {
                    this.span.run(() -> this.queryTransaction.complete(this.serializer.deserializeResponse(nextAvailable, this.expectedResponseType)));
                } else if (this.result.isClosed() && !this.queryTransaction.isDone()) {
                    Exception exception = (Exception)this.result.getError().map(ErrorCode.QUERY_DISPATCH_ERROR::convert).orElse(new AxonServerQueryDispatchException(ErrorCode.QUERY_DISPATCH_ERROR.errorCode(), "Query did not yield the expected number of results."));
                    this.queryTransaction.completeExceptionally(exception);
                }
            }
        }
    }

    private static class ActivityFinisher
    implements Consumer<SignalType> {
        private final ShutdownLatch.ActivityHandle activity;
        private final Span span;

        private ActivityFinisher(ShutdownLatch.ActivityHandle activity, Span span) {
            this.activity = activity;
            this.span = span;
        }

        @Override
        public void accept(SignalType signalType) {
            this.span.end();
            this.activity.end();
        }
    }

    private static class QueryResponseSpliterator
    implements Spliterator<QueryResponseMessage> {
        private final QueryMessage queryMessage;
        private final ResultStream<QueryResponse> queryResult;
        private final long deadline;
        private final QuerySerializer serializer;
        private final Runnable closeHandler;

        public QueryResponseSpliterator(QueryMessage queryMessage, ResultStream<QueryResponse> queryResult, long deadline, QuerySerializer serializer, Runnable closeHandler) {
            this.queryMessage = queryMessage;
            this.queryResult = queryResult;
            this.deadline = deadline;
            this.serializer = serializer;
            this.closeHandler = closeHandler;
        }

        @Override
        public boolean tryAdvance(Consumer<? super QueryResponseMessage> action) {
            QueryResponse next;
            long remaining = this.deadline - System.currentTimeMillis();
            if (remaining > 0L) {
                try {
                    next = (QueryResponse)this.queryResult.nextIfAvailable(remaining, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.closeHandler.run();
                    return false;
                }
            } else {
                next = (QueryResponse)this.queryResult.nextIfAvailable();
            }
            if (next != null) {
                action.accept((QueryResponseMessage)this.serializer.deserializeResponse(next, this.queryMessage.responseType()));
                return true;
            }
            this.queryResult.close();
            this.closeHandler.run();
            return false;
        }

        @Override
        public Spliterator<QueryResponseMessage> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}

