/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.query.QueryComplete;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryServiceGrpc;
import io.axoniq.axonserver.grpc.query.QuerySubscription;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.OutOfDirectMemoryError;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.TargetContextResolver;
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.query.QueryPriorityCalculator;
import org.axonframework.axonserver.connector.query.QuerySerializer;
import org.axonframework.axonserver.connector.query.subscription.AxonServerSubscriptionQueryResult;
import org.axonframework.axonserver.connector.query.subscription.DeserializedResult;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionQueryRequestTarget;
import org.axonframework.axonserver.connector.util.BufferingSpliterator;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.axonserver.connector.util.ResubscribableStreamObserver;
import org.axonframework.axonserver.connector.util.UpstreamAwareStreamObserver;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerQueryBus
implements QueryBus {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerQueryBus.class);
    private static final int DIRECT_QUERY_NUMBER_OF_RESULTS = 1;
    private static final long DIRECT_QUERY_TIMEOUT_MS = TimeUnit.HOURS.toMillis(1L);
    private static final int SCATTER_GATHER_NUMBER_OF_RESULTS = -1;
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final AxonServerConfiguration configuration;
    private final QueryUpdateEmitter updateEmitter;
    private final QueryBus localSegment;
    private final QuerySerializer serializer;
    private final SubscriptionMessageSerializer subscriptionSerializer;
    private final QueryPriorityCalculator priorityCalculator;
    private final QueryProcessor queryProcessor;
    private final DispatchInterceptors<QueryMessage<?, ?>> dispatchInterceptors;
    private final Map<String, Set<String>> subscriptions = new ConcurrentHashMap<String, Set<String>>();
    private final Map<QueryProviderInbound.RequestCase, Collection<Consumer<QueryProviderInbound>>> queryHandlers;
    private final TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver;

    /**
     * Creates a query bus whose target context is always the context of the given {@code configuration}.
     * Delegates to the eight-argument constructor with a resolver that answers
     * {@code configuration.getContext()} for every query.
     *
     * @param axonServerConnectionManager manager of the connections to Axon Server
     * @param configuration               the Axon Server configuration, also the source of the default context
     * @param updateEmitter               emitter returned from {@link #queryUpdateEmitter()}
     * @param localSegment                the local bus that actually handles incoming queries
     * @param messageSerializer           serializer handed to the query and subscription serializers
     * @param genericSerializer           serializer handed to the query and subscription serializers
     * @param priorityCalculator          calculates the priority attached to outgoing queries
     * @deprecated this constructor is marked deprecated; {@link #builder()} is the non-deprecated entry point
     */
    @Deprecated
    public AxonServerQueryBus(AxonServerConnectionManager axonServerConnectionManager,
                              AxonServerConfiguration configuration,
                              QueryUpdateEmitter updateEmitter,
                              QueryBus localSegment,
                              Serializer messageSerializer,
                              Serializer genericSerializer,
                              QueryPriorityCalculator priorityCalculator) {
        this(axonServerConnectionManager,
             configuration,
             updateEmitter,
             localSegment,
             messageSerializer,
             genericSerializer,
             priorityCalculator,
             anyQuery -> configuration.getContext());
    }

    /**
     * Creates a query bus from individually supplied components and wires it to the connection manager.
     * <p>
     * Besides storing the components, this constructor registers reconnect/disconnect callbacks for the
     * configured context and installs the handler for inbound subscription query requests.
     *
     * @param axonServerConnectionManager manager of the connections to Axon Server
     * @param configuration               the Axon Server configuration, also the source of the default context
     * @param updateEmitter               emitter returned from {@link #queryUpdateEmitter()}
     * @param localSegment                the local bus that actually handles incoming queries
     * @param messageSerializer           serializer handed to the query and subscription serializers
     * @param genericSerializer           serializer handed to the query and subscription serializers
     * @param priorityCalculator          calculates the priority attached to outgoing queries
     * @param targetContextResolver       resolves the context a query is sent to; combined via {@code orElse}
     *                                    with a fallback to the configured context
     * @deprecated this constructor is marked deprecated; {@link #builder()} is the non-deprecated entry point
     */
    @Deprecated
    public AxonServerQueryBus(AxonServerConnectionManager axonServerConnectionManager,
                              AxonServerConfiguration configuration,
                              QueryUpdateEmitter updateEmitter,
                              QueryBus localSegment,
                              Serializer messageSerializer,
                              Serializer genericSerializer,
                              QueryPriorityCalculator priorityCalculator,
                              TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver) {
        // Plain component wiring.
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.configuration = configuration;
        this.updateEmitter = updateEmitter;
        this.localSegment = localSegment;
        this.serializer = new QuerySerializer(messageSerializer, genericSerializer, configuration);
        this.subscriptionSerializer =
                new SubscriptionMessageSerializer(messageSerializer, genericSerializer, configuration);
        this.priorityCalculator = priorityCalculator;

        // Everything below is bound to the context configured at construction time.
        final String defaultContext = configuration.getContext();
        this.targetContextResolver = targetContextResolver.orElse(message -> defaultContext);
        this.queryProcessor = new QueryProcessor(
                defaultContext, configuration, ExecutorServiceBuilder.defaultQueryExecutorServiceBuilder()
        );
        this.dispatchInterceptors = new DispatchInterceptors<>();
        this.queryHandlers = new EnumMap<>(QueryProviderInbound.RequestCase.class);

        // Connection life-cycle callbacks.
        this.axonServerConnectionManager.addReconnectListener(defaultContext, this.queryProcessor::resubscribe);
        this.axonServerConnectionManager.addReconnectInterceptor(this::interceptReconnectRequest);
        this.axonServerConnectionManager.addDisconnectListener(defaultContext, this.queryProcessor::unsubscribeAll);
        this.axonServerConnectionManager.addDisconnectListener(this::onApplicationDisconnected);

        // Inbound subscription query requests are served by the local segment and answered on the query stream.
        SubscriptionQueryRequestTarget subscriptionTarget = new SubscriptionQueryRequestTarget(
                localSegment,
                outbound -> this.publish(defaultContext, (QueryProviderOutbound) outbound),
                this.subscriptionSerializer
        );
        this.on(QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST,
                subscriptionTarget::onSubscriptionQueryRequest);
        this.axonServerConnectionManager.addDisconnectListener(subscriptionTarget::onApplicationDisconnected);
    }

    /**
     * Creates a query bus from the given {@link Builder} and wires it to the connection manager.
     * <p>
     * The builder is validated first. After copying the components, this constructor registers
     * reconnect/disconnect callbacks for the configured context and installs the handler for inbound
     * subscription query requests.
     *
     * @param builder the builder holding all components of this query bus
     */
    public AxonServerQueryBus(Builder builder) {
        builder.validate();

        // Plain component wiring.
        this.axonServerConnectionManager = builder.axonServerConnectionManager;
        this.configuration = builder.configuration;
        this.updateEmitter = builder.updateEmitter;
        this.localSegment = builder.localSegment;
        this.serializer = builder.buildQuerySerializer();
        this.subscriptionSerializer = builder.buildSubscriptionMessageSerializer();
        this.priorityCalculator = builder.priorityCalculator;

        // Everything below is bound to the context configured at construction time.
        final String defaultContext = this.configuration.getContext();
        this.targetContextResolver = builder.targetContextResolver.orElse(message -> defaultContext);
        this.queryProcessor = new QueryProcessor(defaultContext, this.configuration, builder.executorServiceBuilder);
        this.dispatchInterceptors = new DispatchInterceptors<>();
        this.queryHandlers = new EnumMap<>(QueryProviderInbound.RequestCase.class);

        // Connection life-cycle callbacks.
        this.axonServerConnectionManager.addReconnectListener(defaultContext, this.queryProcessor::resubscribe);
        this.axonServerConnectionManager.addReconnectInterceptor(this::interceptReconnectRequest);
        this.axonServerConnectionManager.addDisconnectListener(defaultContext, this.queryProcessor::unsubscribeAll);
        this.axonServerConnectionManager.addDisconnectListener(this::onApplicationDisconnected);

        // Inbound subscription query requests are served by the local segment and answered on the query stream.
        SubscriptionQueryRequestTarget subscriptionTarget = new SubscriptionQueryRequestTarget(
                this.localSegment,
                outbound -> this.publish(defaultContext, (QueryProviderOutbound) outbound),
                this.subscriptionSerializer
        );
        this.on(QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST,
                subscriptionTarget::onSubscriptionQueryRequest);
        this.axonServerConnectionManager.addDisconnectListener(subscriptionTarget::onApplicationDisconnected);
    }

    /**
     * Starts the construction of an {@link AxonServerQueryBus}.
     *
     * @return a fresh {@link Builder} instance
     */
    public static Builder builder() {
        final Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Decides whether a reconnect request may proceed.
     * <p>
     * While at least one subscription query is registered, the reconnect is replaced by a consumer that
     * only logs the refusal; otherwise the given {@code reconnect} consumer is returned untouched.
     *
     * @param reconnect the action that performs the reconnect for a given context
     * @return {@code reconnect} itself when no subscription query is active, a logging-only consumer otherwise
     */
    private Consumer<String> interceptReconnectRequest(Consumer<String> reconnect) {
        // The per-context sets are created on first use and only emptied (not removed from the map) when a
        // subscription query closes, so the emptiness of the map itself says nothing about active queries.
        boolean noActiveSubscriptionQueries = this.subscriptions.values().stream().allMatch(Set::isEmpty);
        if (noActiveSubscriptionQueries) {
            return reconnect;
        }
        return c -> logger.info("Reconnect for context [{}] refused because there are active subscription queries.", c);
    }

    /**
     * Disconnect callback: forgets all subscription query identifiers tracked for the given context.
     *
     * @param context the context that got disconnected
     */
    private void onApplicationDisconnected(String context) {
        subscriptions.remove(context);
    }

    /**
     * Subscribes the given {@code handler} for queries with the given name and response type.
     * <p>
     * The subscription is made through the query processor using the configured component name. The
     * returned registration additionally asks the query processor to unsubscribe the query definition.
     *
     * @param queryName    the name of the query the handler answers
     * @param responseType the response type the handler produces
     * @param handler      the handler to subscribe
     * @param <R>          the response type of the query
     * @return a registration wrapping the processor's registration and the unsubscribe action
     */
    public <R> Registration subscribe(String queryName, Type responseType, MessageHandler<? super QueryMessage<?, R>> handler) {
        Registration processorRegistration =
                queryProcessor.subscribe(queryName, responseType, configuration.getComponentName(), handler);
        // The component name is looked up again at unsubscribe time, exactly as the subscribe path does now.
        return new AxonServerRegistration(
                processorRegistration,
                () -> queryProcessor.unsubscribe(queryName, responseType, configuration.getComponentName())
        );
    }

    /**
     * Dispatches a direct (single result) query to Axon Server.
     * <p>
     * The query first passes the dispatch interceptors. The returned future is completed with the first
     * response received, or exceptionally when the stream reports an error, completes without a result,
     * or the request could not be issued at all.
     *
     * @param queryMessage the query to dispatch
     * @param <Q>          the payload type of the query
     * @param <R>          the response type of the query
     * @return a future holding the query response
     */
    public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> queryMessage) {
        QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage);
        final CompletableFuture<QueryResponseMessage<R>> pendingResponse = new CompletableFuture<>();
        try {
            String targetContext = targetContextResolver.resolveContext(interceptedQuery);
            QueryRequest request = serializer.serializeRequest(
                    interceptedQuery,
                    DIRECT_QUERY_NUMBER_OF_RESULTS,
                    DIRECT_QUERY_TIMEOUT_MS,
                    priorityCalculator.determinePriority(interceptedQuery)
            );
            StreamObserver<QueryResponse> responseObserver = new StreamObserver<QueryResponse>() {

                public void onNext(QueryResponse queryResponse) {
                    logger.debug("Received query response [{}]", queryResponse);
                    pendingResponse.complete(serializer.deserializeResponse(queryResponse));
                }

                public void onError(Throwable throwable) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received error while waiting for first response", throwable);
                    }
                    pendingResponse.completeExceptionally(
                            ErrorCode.QUERY_DISPATCH_ERROR.convert(configuration.getClientId(), throwable)
                    );
                }

                public void onCompleted() {
                    // A stream that closes before delivering anything means nobody answered the query.
                    if (!pendingResponse.isDone()) {
                        ErrorMessage noResult =
                                ErrorMessage.newBuilder().setMessage("No result from query executor").build();
                        pendingResponse.completeExceptionally(ErrorCode.QUERY_DISPATCH_ERROR.convert(noResult));
                    }
                }
            };
            queryService(targetContext).query(request, responseObserver);
        } catch (Exception e) {
            logger.debug("There was a problem issuing a query {}.", interceptedQuery, e);
            pendingResponse.completeExceptionally(
                    ErrorCode.QUERY_DISPATCH_ERROR.convert(configuration.getClientId(), e)
            );
        }
        return pendingResponse;
    }

    /**
     * Dispatches a scatter-gather query to Axon Server and streams back the responses received in time.
     * <p>
     * The query first passes the dispatch interceptors. Responses are buffered in a spliterator whose
     * deadline is "now plus the timeout"; the gRPC call carries the same timeout as its deadline. Responses
     * carrying an error message are only logged. Closing the returned stream cancels the buffer.
     *
     * @param queryMessage the query to dispatch
     * @param timeout      how long to wait for responses
     * @param timeUnit     the unit of {@code timeout}
     * @param <Q>          the payload type of the query
     * @param <R>          the response type of the query
     * @return a stream of the responses received
     */
    public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> queryMessage, long timeout, TimeUnit timeUnit) {
        QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage);
        String targetContext = targetContextResolver.resolveContext(interceptedQuery);
        long timeoutMillis = timeUnit.toMillis(timeout);
        QueryRequest request = serializer.serializeRequest(
                interceptedQuery,
                SCATTER_GATHER_NUMBER_OF_RESULTS,
                timeoutMillis,
                priorityCalculator.determinePriority(interceptedQuery)
        );
        final BufferingSpliterator<QueryResponseMessage<R>> responseBuffer =
                new BufferingSpliterator<>(Instant.now().plusMillis(timeoutMillis));

        StreamObserver<QueryResponse> responseObserver = new UpstreamAwareStreamObserver<QueryResponse>() {

            public void onNext(QueryResponse queryResponse) {
                logger.debug("Received query response [{}]", queryResponse);
                if (queryResponse.hasErrorMessage()) {
                    logger.debug("The received query response has error message [{}]", queryResponse.getErrorMessage());
                    return;
                }
                boolean accepted = responseBuffer.put(serializer.deserializeResponse(queryResponse));
                if (!accepted) {
                    // The buffer no longer takes results, so stop the server from sending more.
                    getRequestStream().cancel("Cancellation requested by client", null);
                }
            }

            public void onError(Throwable throwable) {
                // Hitting the deadline is the regular way for a scatter-gather to end; do not report it.
                if (!isDeadlineExceeded(throwable)) {
                    logger.info("Received error while waiting for responses", throwable);
                }
                responseBuffer.cancel(throwable);
            }

            public void onCompleted() {
                responseBuffer.cancel(null);
            }
        };
        queryService(targetContext).withDeadlineAfter(timeout, timeUnit).query(request, responseObserver);

        return StreamSupport.stream(responseBuffer, false).onClose(() -> responseBuffer.cancel(null));
    }

    /**
     * Tells whether the given throwable is a gRPC status exception with code {@code DEADLINE_EXCEEDED}.
     *
     * @param throwable the throwable to inspect
     * @return {@code true} only for a {@link StatusRuntimeException} whose status code is
     * {@code DEADLINE_EXCEEDED}
     */
    private boolean isDeadlineExceeded(Throwable throwable) {
        if (!(throwable instanceof StatusRuntimeException)) {
            return false;
        }
        Status.Code statusCode = ((StatusRuntimeException) throwable).getStatus().getCode();
        return statusCode == Status.Code.DEADLINE_EXCEEDED;
    }

    /**
     * Disconnects this query bus by delegating to the query processor's {@code disconnect()}.
     */
    public void disconnect() {
        queryProcessor.disconnect();
    }

    /**
     * Gives access to the local {@link QueryBus} this instance was constructed with.
     *
     * @return the local segment
     */
    public QueryBus localSegment() {
        return localSegment;
    }

    /**
     * Sends the given outbound message on the query stream obtained for the given context.
     *
     * @param context          the context passed to the query processor when obtaining the stream
     * @param providerOutbound the message to send
     */
    private void publish(String context, QueryProviderOutbound providerOutbound) {
        StreamObserver<QueryProviderOutbound> outboundStream = queryProcessor.getSubscriberObserver(context);
        outboundStream.onNext(providerOutbound);
    }

    /**
     * Registers a handler for inbound messages of the given request case.
     * <p>
     * The handler is invoked with the configured context (read at invocation time) and the inbound message.
     *
     * @param requestCase the kind of inbound message to react to
     * @param consumer    the handler receiving the context and the inbound message
     */
    private void on(QueryProviderInbound.RequestCase requestCase, BiConsumer<String, QueryProviderInbound> consumer) {
        Collection<Consumer<QueryProviderInbound>> handlersForCase =
                queryHandlers.computeIfAbsent(requestCase, key -> new CopyOnWriteArraySet<>());
        Consumer<QueryProviderInbound> contextBoundHandler =
                inbound -> consumer.accept(configuration.getContext(), inbound);
        handlersForCase.add(contextBoundHandler);
    }

    /**
     * Opens a subscription query on Axon Server.
     * <p>
     * The query first passes the dispatch interceptors. Its identifier is used as subscription id and is
     * tracked per target context; the id is released again when the subscription result invokes its
     * close callback, or immediately when setting up the subscription fails.
     *
     * @param query            the subscription query to dispatch
     * @param backPressure     the back pressure mechanism handed to the subscription result
     * @param updateBufferSize the update buffer size handed to the subscription result
     * @param <Q>              the payload type of the query
     * @param <I>              the type of the initial response
     * @param <U>              the type of the incremental updates
     * @return the subscription query result, with responses deserialized
     * @throws IllegalArgumentException when a subscription with the same id is already tracked for the context
     */
    public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(SubscriptionQueryMessage<Q, I, U> query, SubscriptionQueryBackpressure backPressure, int updateBufferSize) {
        SubscriptionQueryMessage<Q, I, U> interceptedQuery = this.dispatchInterceptors.intercept(query);
        String subscriptionId = interceptedQuery.getIdentifier();
        String context = this.targetContextResolver.resolveContext((QueryMessage<?, ?>) interceptedQuery);
        Set<String> contextSubscriptions = this.subscriptions.computeIfAbsent(context, k -> new ConcurrentSkipListSet<>());
        if (!contextSubscriptions.add(subscriptionId)) {
            String errorMessage = "There already is a subscription query with subscription Id [" + subscriptionId + "] for context [" + context + "]";
            logger.warn(errorMessage);
            throw new IllegalArgumentException(errorMessage);
        }
        logger.debug("Subscription Query requested with subscription Id [{}]", subscriptionId);
        AxonServerSubscriptionQueryResult result;
        try {
            result = new AxonServerSubscriptionQueryResult(
                    this.subscriptionSerializer.serialize(interceptedQuery),
                    this.queryService(context)::subscription,
                    this.configuration,
                    backPressure,
                    updateBufferSize,
                    () -> contextSubscriptions.remove(subscriptionId)
            );
        } catch (RuntimeException | Error e) {
            // Without a result nobody will ever run the close callback, so release the id here. Otherwise
            // the id would stay registered, blocking a retry with the same id as well as reconnects.
            contextSubscriptions.remove(subscriptionId);
            throw e;
        }
        return new DeserializedResult<>(result.get(), this.subscriptionSerializer);
    }

    /**
     * Creates a new query service stub on the channel the connection manager provides for the given context.
     *
     * @param context the context to obtain the channel for
     * @return a new {@link QueryServiceGrpc.QueryServiceStub}
     */
    QueryServiceGrpc.QueryServiceStub queryService(String context) {
        return QueryServiceGrpc.newStub(
                axonServerConnectionManager.getChannel(context)
        );
    }

    /**
     * Gives access to the {@link QueryUpdateEmitter} this instance was constructed with.
     *
     * @return the update emitter
     */
    public QueryUpdateEmitter queryUpdateEmitter() {
        return updateEmitter;
    }

    /**
     * Registers the given handler interceptor on the local segment.
     *
     * @param interceptor the interceptor to register
     * @return the registration returned by the local segment
     */
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?, ?>> interceptor) {
        return localSegment.registerHandlerInterceptor(interceptor);
    }

    /**
     * Registers the given dispatch interceptor; it is applied to queries dispatched through this bus.
     *
     * @param dispatchInterceptor the interceptor to register
     * @return the registration returned by the dispatch interceptor collection
     */
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?, ?>> dispatchInterceptor) {
        return dispatchInterceptors.registerDispatchInterceptor(dispatchInterceptor);
    }

    /**
     * Builder for an {@link AxonServerQueryBus}.
     * <p>
     * The connection manager, configuration, local segment, update emitter, message serializer and generic
     * serializer are hard requirements (see {@link #validate()}). The priority calculator defaults to
     * {@link QueryPriorityCalculator#defaultQueryPriorityCalculator()}, the target context resolver to the
     * context of the configured {@link AxonServerConfiguration}, and the executor service builder to
     * {@link ExecutorServiceBuilder#defaultQueryExecutorServiceBuilder()}.
     */
    public static class Builder {
        private AxonServerConnectionManager axonServerConnectionManager;
        private AxonServerConfiguration configuration;
        private QueryBus localSegment;
        private QueryUpdateEmitter updateEmitter;
        private Serializer messageSerializer;
        private Serializer genericSerializer;
        private QueryPriorityCalculator priorityCalculator = QueryPriorityCalculator.defaultQueryPriorityCalculator();
        // Reads the configuration lazily, so it may be set after this default has been created.
        private TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver = q -> this.configuration.getContext();
        private ExecutorServiceBuilder executorServiceBuilder = ExecutorServiceBuilder.defaultQueryExecutorServiceBuilder();

        /**
         * Sets the connection manager used to reach Axon Server.
         *
         * @param axonServerConnectionManager the connection manager, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder axonServerConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull(axonServerConnectionManager, "AxonServerConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        /**
         * Sets the Axon Server configuration.
         *
         * @param configuration the configuration, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder configuration(AxonServerConfiguration configuration) {
            BuilderUtils.assertNonNull(configuration, "AxonServerConfiguration may not be null");
            this.configuration = configuration;
            return this;
        }

        /**
         * Sets the local {@link QueryBus} that handles queries within this application.
         *
         * @param localSegment the local query bus, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder localSegment(QueryBus localSegment) {
            BuilderUtils.assertNonNull(localSegment, "Local QueryBus may not be null");
            this.localSegment = localSegment;
            return this;
        }

        /**
         * Sets the {@link QueryUpdateEmitter} exposed by the query bus.
         *
         * @param updateEmitter the update emitter, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder updateEmitter(QueryUpdateEmitter updateEmitter) {
            BuilderUtils.assertNonNull(updateEmitter, "QueryUpdateEmitter may not be null");
            this.updateEmitter = updateEmitter;
            return this;
        }

        /**
         * Sets the message {@link Serializer} passed to the query and subscription message serializers.
         *
         * @param messageSerializer the message serializer, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder messageSerializer(Serializer messageSerializer) {
            BuilderUtils.assertNonNull(messageSerializer, "Message Serializer may not be null");
            this.messageSerializer = messageSerializer;
            return this;
        }

        /**
         * Sets the generic {@link Serializer} passed to the query and subscription message serializers.
         *
         * @param genericSerializer the generic serializer, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder genericSerializer(Serializer genericSerializer) {
            BuilderUtils.assertNonNull(genericSerializer, "Generic Serializer may not be null");
            this.genericSerializer = genericSerializer;
            return this;
        }

        /**
         * Sets the {@link QueryPriorityCalculator} used to determine the priority of outgoing queries.
         *
         * @param priorityCalculator the priority calculator, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder priorityCalculator(QueryPriorityCalculator priorityCalculator) {
            // Validate the argument itself; checking the (always defaulted) target context resolver
            // would let a null calculator slip through.
            BuilderUtils.assertNonNull(priorityCalculator, "QueryPriorityCalculator may not be null");
            this.priorityCalculator = priorityCalculator;
            return this;
        }

        /**
         * Sets the {@link TargetContextResolver} that decides to which context a query is sent.
         *
         * @param targetContextResolver the resolver, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder targetContextResolver(TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver) {
            BuilderUtils.assertNonNull(targetContextResolver, "TargetContextResolver may not be null");
            this.targetContextResolver = targetContextResolver;
            return this;
        }

        /**
         * Sets the {@link ExecutorServiceBuilder} that creates the executor processing incoming queries.
         *
         * @param executorServiceBuilder the executor service builder, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder executorServiceBuilder(ExecutorServiceBuilder executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, "ExecutorServiceBuilder may not be null");
            this.executorServiceBuilder = executorServiceBuilder;
            return this;
        }

        /**
         * Creates the {@link AxonServerQueryBus} as configured in this builder.
         *
         * @return the configured query bus
         */
        public AxonServerQueryBus build() {
            return new AxonServerQueryBus(this);
        }

        /**
         * Creates the {@link QuerySerializer} from the configured serializers and configuration.
         *
         * @return a new query serializer
         */
        protected QuerySerializer buildQuerySerializer() {
            return new QuerySerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        /**
         * Creates the {@link SubscriptionMessageSerializer} from the configured serializers and configuration.
         *
         * @return a new subscription message serializer
         */
        protected SubscriptionMessageSerializer buildSubscriptionMessageSerializer() {
            return new SubscriptionMessageSerializer(this.messageSerializer, this.genericSerializer, this.configuration);
        }

        /**
         * Verifies that every hard requirement has been provided.
         *
         * @throws AxonConfigurationException when a required component is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.axonServerConnectionManager, "The AxonServerConnectionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.configuration, "The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.localSegment, "The Local QueryBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.updateEmitter, "The QueryUpdateEmitter is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.messageSerializer, "The Message Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.genericSerializer, "The Generic Serializer is a hard requirement and should be provided");
        }
    }

    private class QueryProcessor {
        private static final int QUERY_QUEUE_CAPACITY = 1000;
        private static final int DEFAULT_PRIORITY = 0;
        private final String context;
        private final ConcurrentMap<QueryDefinition, Set<MessageHandler<? super QueryMessage<?, ?>>>> subscribedQueries;
        private final ExecutorService queryExecutor;
        private volatile boolean subscribing;
        private volatile boolean running = true;
        private volatile StreamObserver<QueryProviderOutbound> outboundStreamObserver;

        /**
         * Creates a processor for the given context.
         * <p>
         * Incoming work is ordered by a priority queue: {@link QueryProcessingTask}s are compared by their
         * own priority value, any other runnable counts as {@code DEFAULT_PRIORITY}. The executor itself is
         * created by the given {@code executorServiceBuilder} from the configuration and that queue.
         *
         * @param context                the context this processor subscribes on
         * @param configuration          the configuration handed to the executor service builder
         * @param executorServiceBuilder creates the executor that runs the query processing tasks
         */
        QueryProcessor(String context, AxonServerConfiguration configuration, ExecutorServiceBuilder executorServiceBuilder) {
            this.context = context;
            this.subscribedQueries = new ConcurrentHashMap<>();
            Comparator<Runnable> byPriority = Comparator.comparingLong(
                    task -> task instanceof QueryProcessingTask
                            ? ((QueryProcessingTask) task).getPriority()
                            : DEFAULT_PRIORITY
            );
            PriorityBlockingQueue<Runnable> taskQueue = new PriorityBlockingQueue<>(QUERY_QUEUE_CAPACITY, byPriority);
            this.queryExecutor = executorServiceBuilder.apply(configuration, taskQueue);
        }

        /**
         * Re-announces every known query definition, with its current number of handlers, on the query
         * stream. Does nothing when there are no subscriptions or while a subscribe call is in progress.
         * Failures are logged and not propagated.
         */
        private void resubscribe() {
            if (subscribedQueries.isEmpty()) {
                return;
            }
            if (subscribing) {
                return;
            }
            try {
                StreamObserver<QueryProviderOutbound> outbound = getSubscriberObserver(context);
                subscribedQueries.forEach((definition, handlers) -> {
                    QuerySubscription subscription = buildQuerySubscription(definition, handlers.size());
                    outbound.onNext(QueryProviderOutbound.newBuilder().setSubscribe(subscription).build());
                });
            } catch (Exception ex) {
                logger.warn("Error while resubscribing query handlers", ex);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Records the handler for the given query definition, announces the subscription on the query
         * stream and subscribes the handler on the local segment.
         * <p>
         * A failure to announce the subscription is logged only; the local subscription is made regardless.
         * The {@code subscribing} flag is raised for the duration of the call, which makes
         * {@link #resubscribe()} a no-op in the meantime.
         *
         * @param queryName     the name of the query
         * @param responseType  the response type of the query
         * @param componentName the component name sent along with the subscription
         * @param handler       the handler to subscribe
         * @param <R>           the response type of the query
         * @return the registration returned by the local segment
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <R> Registration subscribe(String queryName, Type responseType, String componentName, MessageHandler<? super QueryMessage<?, R>> handler) {
            subscribing = true;
            QueryDefinition definition = new QueryDefinition(queryName, responseType.getTypeName(), componentName);
            // Raw on purpose: the map stores handlers for any response type, the parameter is bound to R.
            Set handlersForDefinition = subscribedQueries.computeIfAbsent(definition, key -> new CopyOnWriteArraySet<>());
            handlersForDefinition.add(handler);
            try {
                StreamObserver<QueryProviderOutbound> outbound = getSubscriberObserver(context);
                QuerySubscription subscription = QuerySubscription.newBuilder()
                                                                  .setMessageId(UUID.randomUUID().toString())
                                                                  .setClientId(configuration.getClientId())
                                                                  .setComponentName(componentName)
                                                                  .setQuery(queryName)
                                                                  .setResultName(responseType.getTypeName())
                                                                  .setNrOfHandlers(handlersForDefinition.size())
                                                                  .build();
                outbound.onNext(QueryProviderOutbound.newBuilder().setSubscribe(subscription).build());
            } catch (Exception ex) {
                logger.warn("Error subscribing query handler", ex);
            } finally {
                subscribing = false;
            }
            return localSegment.subscribe(queryName, responseType, handler);
        }

        /**
         * Handles one incoming query on the local segment and writes the outcome to the outbound stream.
         * <p>
         * A request asking for exactly one result is answered through {@code localSegment.query}, any other
         * request through {@code localSegment.scatterGather}. Each response is sent as a query response,
         * followed by a query-complete message. When anything fails - including deserialization of the
         * request - the failure is logged and, if an outbound stream is available, reported to the
         * requester as a {@code QUERY_EXECUTION_ERROR} response.
         *
         * @param query the incoming query request
         */
        private void processQuery(QueryRequest query) {
            String requestId = query.getMessageIdentifier();
            try {
                // Deserialize inside the try block so that a malformed request still yields an error response.
                QueryMessage<?, ?> queryMessage = AxonServerQueryBus.this.serializer.deserializeRequest(query);
                if (ProcessingInstructionHelper.numberOfResults(query.getProcessingInstructionsList()) == DIRECT_QUERY_NUMBER_OF_RESULTS) {
                    QueryResponseMessage<?> response = AxonServerQueryBus.this.localSegment.query(queryMessage).get();
                    this.outboundStreamObserver.onNext(QueryProviderOutbound.newBuilder().setQueryResponse(AxonServerQueryBus.this.serializer.serializeResponse(response, requestId)).build());
                } else {
                    AxonServerQueryBus.this.localSegment.scatterGather(queryMessage, 0L, TimeUnit.SECONDS).forEach(response -> this.outboundStreamObserver.onNext(QueryProviderOutbound.newBuilder().setQueryResponse(AxonServerQueryBus.this.serializer.serializeResponse(response, requestId)).build()));
                }
                this.outboundStreamObserver.onNext(QueryProviderOutbound.newBuilder().setQueryComplete(QueryComplete.newBuilder().setMessageId(UUID.randomUUID().toString()).setRequestId(requestId)).build());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interruption visible to the executor that owns this thread.
                    Thread.currentThread().interrupt();
                }
                logger.warn("Failed to dispatch query [{}] locally", query.getQuery(), e);
                // Read the volatile field once: the inbound stream may null it between a check and a use.
                StreamObserver<QueryProviderOutbound> outbound = this.outboundStreamObserver;
                if (outbound == null) {
                    return;
                }
                outbound.onNext(QueryProviderOutbound.newBuilder().setQueryResponse(QueryResponse.newBuilder().setMessageIdentifier(UUID.randomUUID().toString()).setRequestIdentifier(requestId).setErrorMessage(ExceptionSerializer.serialize(AxonServerQueryBus.this.configuration.getClientId(), e)).setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode()).build()).build());
            }
        }

        /**
         * Returns the outbound query stream, opening it first when none is currently held.
         * <p>
         * Opening the stream registers an inbound observer that hands each message to the handlers
         * registered for its request case and schedules {@code QUERY} messages on the query executor. When
         * the inbound stream errors or completes, the held outbound stream is dropped so that the next
         * call opens a new one. The new outbound stream is wrapped for flow control and initial permits
         * are sent.
         *
         * @param context the context to open the query stream for
         * @return the outbound stream observer
         */
        private synchronized StreamObserver<QueryProviderOutbound> getSubscriberObserver(String context) {
            if (this.outboundStreamObserver == null) {
                StreamObserver<QueryProviderInbound> inboundHandler = new StreamObserver<QueryProviderInbound>() {

                    public void onNext(QueryProviderInbound inboundRequest) {
                        QueryProviderInbound.RequestCase requestCase = inboundRequest.getRequestCase();
                        Collection<Consumer<QueryProviderInbound>> registeredHandlers =
                                AxonServerQueryBus.this.queryHandlers.getOrDefault(requestCase, Collections.emptySet());
                        for (Consumer<QueryProviderInbound> registeredHandler : registeredHandlers) {
                            registeredHandler.accept(inboundRequest);
                        }
                        // Only queries need work beyond the registered handlers; confirmations and all
                        // other cases are done at this point.
                        if (requestCase == QueryProviderInbound.RequestCase.QUERY) {
                            QueryProcessor.this.queryExecutor.execute(new QueryProcessingTask(inboundRequest.getQuery()));
                        }
                    }

                    public void onError(Throwable ex) {
                        logger.warn("Query Inbound Stream closed with error", ex);
                        QueryProcessor.this.outboundStreamObserver = null;
                    }

                    public void onCompleted() {
                        logger.info("Received completed from server.");
                        QueryProcessor.this.outboundStreamObserver = null;
                    }
                };
                ResubscribableStreamObserver<QueryProviderInbound> resubscribingInbound =
                        new ResubscribableStreamObserver<QueryProviderInbound>(inboundHandler, failure -> this.resubscribe());
                StreamObserver<QueryProviderOutbound> rawOutbound =
                        AxonServerQueryBus.this.axonServerConnectionManager.getQueryStream(context, resubscribingInbound);
                logger.info("Creating new query stream subscriber");
                this.outboundStreamObserver = new FlowControllingStreamObserver<QueryProviderOutbound>(
                        rawOutbound,
                        AxonServerQueryBus.this.configuration,
                        flowControl -> QueryProviderOutbound.newBuilder().setFlowControl((FlowControl) flowControl).build(),
                        message -> message.getRequestCase().equals(QueryProviderOutbound.RequestCase.QUERY_RESPONSE)
                ).sendInitialPermits();
            }
            return this.outboundStreamObserver;
        }

        /**
         * Forgets the given query definition and sends an unsubscribe message for it on the query stream.
         * <p>
         * Sending is best effort: a failure is logged at debug level and not propagated, the definition
         * stays removed locally.
         *
         * @param queryName     the name of the query
         * @param responseType  the response type of the query
         * @param componentName the component name the query was subscribed with
         */
        public void unsubscribe(String queryName, Type responseType, String componentName) {
            QueryDefinition queryDefinition = new QueryDefinition(queryName, responseType.getTypeName(), componentName);
            this.subscribedQueries.remove(queryDefinition);
            try {
                this.getSubscriberObserver(this.context).onNext(QueryProviderOutbound.newBuilder().setUnsubscribe(this.buildQuerySubscription(queryDefinition, 1)).build());
            } catch (Exception e) {
                // Deliberately not rethrown, but leave a trace instead of swallowing the failure silently.
                logger.debug("Failed to send unsubscribe request for query [{}]", queryName, e);
            }
        }

        /**
         * Sends an unsubscribe message for every known query definition and then drops the held outbound
         * stream. The definitions themselves are kept.
         * <p>
         * Sending is best effort: a failure for one definition is logged at debug level and does not stop
         * the remaining definitions from being processed.
         */
        private void unsubscribeAll() {
            this.subscribedQueries.forEach((queryDefinition, handlerSet) -> {
                try {
                    this.getSubscriberObserver(this.context).onNext(QueryProviderOutbound.newBuilder().setUnsubscribe(this.buildQuerySubscription(queryDefinition, 1)).build());
                } catch (Exception e) {
                    // Deliberately not rethrown, but leave a trace instead of swallowing the failure silently.
                    logger.debug("Failed to send unsubscribe request for query [{}]", queryDefinition.queryName, e);
                }
            });
            this.outboundStreamObserver = null;
        }

        /**
         * Completes the outbound stream (when one is held), marks this processor as stopped and shuts down
         * the query executor. Tasks that run after this point see the stopped flag and skip processing.
         */
        void disconnect() {
            // Read the volatile field once: the inbound stream may null it between a check and a use.
            StreamObserver<QueryProviderOutbound> outbound = this.outboundStreamObserver;
            try {
                if (outbound != null) {
                    outbound.onCompleted();
                }
            } finally {
                // Stop processing even when completing the stream fails, so the executor is not leaked.
                this.running = false;
                this.queryExecutor.shutdown();
            }
        }

        /**
         * Builds the subscription message describing the given query definition.
         * <p>
         * The message carries the configured client id, a freshly generated random message id, and the
         * definition's component name, query name and response name.
         *
         * @param queryDefinition the definition to describe
         * @param nrHandlers      the number of handlers to report
         * @return the subscription message
         */
        private QuerySubscription buildQuerySubscription(QueryDefinition queryDefinition, int nrHandlers) {
            return QuerySubscription.newBuilder()
                                    .setClientId(AxonServerQueryBus.this.configuration.getClientId())
                                    .setMessageId(UUID.randomUUID().toString())
                                    .setComponentName(queryDefinition.componentName)
                                    .setQuery(queryDefinition.queryName)
                                    .setNrOfHandlers(nrHandlers)
                                    .setResultName(queryDefinition.responseName)
                                    .build();
        }

        /**
         * Runnable that processes a single incoming query request.
         * <p>
         * Its priority value is the negated priority found in the request's processing instructions; the
         * processor's queue comparator reads it through {@link #getPriority()}.
         */
        private class QueryProcessingTask implements Runnable {

            private final QueryRequest queryRequest;
            private final long priority;

            private QueryProcessingTask(QueryRequest queryRequest) {
                this.queryRequest = queryRequest;
                this.priority = -ProcessingInstructionHelper.priority(queryRequest.getProcessingInstructionsList());
            }

            /**
             * @return the negated priority taken from the request's processing instructions
             */
            public long getPriority() {
                return priority;
            }

            /**
             * Processes the query unless the processor has been stopped. Runtime exceptions and
             * {@link OutOfDirectMemoryError}s raised while processing are logged and not propagated.
             */
            @Override
            public void run() {
                String queryName = queryRequest.getQuery();
                if (QueryProcessor.this.running) {
                    try {
                        logger.debug("Will process query [{}]", queryName);
                        QueryProcessor.this.processQuery(queryRequest);
                    } catch (OutOfDirectMemoryError | RuntimeException e) {
                        logger.warn("Query Processor had an exception when processing query [{}]", queryName, e);
                    }
                } else {
                    logger.debug("Query Processor has stopped running, hence query [{}] will no longer be processed", queryName);
                }
            }
        }

        /**
         * Key identifying a subscribed query: the combination of query name, response name and component
         * name. Two definitions are equal when all three values are equal.
         */
        private class QueryDefinition {

            private final String queryName;
            private final String responseName;
            private final String componentName;

            QueryDefinition(String queryName, String responseName, String componentName) {
                this.queryName = queryName;
                this.responseName = responseName;
                this.componentName = componentName;
            }

            @Override
            public boolean equals(Object o) {
                if (o == this) {
                    return true;
                }
                if (o == null) {
                    return false;
                }
                if (o.getClass() != getClass()) {
                    return false;
                }
                QueryDefinition other = (QueryDefinition) o;
                if (!Objects.equals(queryName, other.queryName)) {
                    return false;
                }
                if (!Objects.equals(responseName, other.responseName)) {
                    return false;
                }
                return Objects.equals(componentName, other.componentName);
            }

            @Override
            public int hashCode() {
                return Objects.hash(queryName, responseName, componentName);
            }
        }
    }
}

