/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.query.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AsyncRegistration;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.BufferingReplyChannel;
import io.axoniq.axonserver.connector.impl.CloseAwareReplyChannel;
import io.axoniq.axonserver.connector.impl.DisposableReadonlyBuffer;
import io.axoniq.axonserver.connector.impl.FlowControlledReplyChannelWriter;
import io.axoniq.axonserver.connector.impl.NoopFlowControl;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.impl.buffer.BlockingCloseableBuffer;
import io.axoniq.axonserver.connector.impl.buffer.FlowControlledDisposableReadonlyBuffer;
import io.axoniq.axonserver.connector.query.QueryChannel;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.connector.query.SubscriptionQueryResult;
import io.axoniq.axonserver.connector.query.impl.SubscriptionQueryStream;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.ProcessingInstruction;
import io.axoniq.axonserver.grpc.ProcessingKey;
import io.axoniq.axonserver.grpc.SerializedObject;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.query.QueryComplete;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryServiceGrpc;
import io.axoniq.axonserver.grpc.query.QuerySubscription;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.QueryUpdateComplete;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import io.grpc.Channel;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryChannelImpl
extends AbstractAxonServerChannel<QueryProviderOutbound>
implements QueryChannel {
    private static final Logger logger = LoggerFactory.getLogger(QueryChannelImpl.class);
    /** Sentinel response handed out by {@code terminalMessage()} of the result stream created in {@code query(...)}. */
    private static final QueryResponse TERMINAL = QueryResponse.newBuilder().setErrorCode("__TERMINAL__").build();
    /** Outbound side of the currently open provider stream, or {@code null} when no stream is registered. */
    private final AtomicReference<CallStreamObserver<QueryProviderOutbound>> outboundQueryStream = new AtomicReference<>();
    /** Number of active registrations per query definition. */
    private final Map<QueryDefinition, AtomicInteger> supportedQueries = new ConcurrentHashMap<>();
    /** Registered handlers, keyed by query name. */
    private final ConcurrentMap<String, Set<QueryHandler>> queryHandlers = new ConcurrentHashMap<>();
    /** Handlers for inbound instructions, keyed by the request-case enum constant of the instruction. */
    private final ConcurrentMap<Enum<?>, InstructionHandler<QueryProviderInbound, QueryProviderOutbound>> instructionHandlers = new ConcurrentHashMap<>();
    private final ClientIdentification clientIdentification;
    private final String context;
    private final int permits;
    private final int permitsBatch;
    /** Monitor held while handlers are registered or deregistered. */
    private final Object queryHandlerMonitor = new Object();
    /** Registrations of active subscription queries, keyed by subscription identifier. */
    private final Map<String, Set<Registration>> subscriptionQueries = new ConcurrentHashMap<>();
    private final QueryServiceGrpc.QueryServiceStub queryServiceStub;
    /** Queries currently being handled, keyed by the message identifier of the query request. */
    private final Map<String, QueryInProgress> queriesInProgress = new ConcurrentHashMap<>();
    /** {@code true} once the subscribe instructions sent on connect have all completed without error. */
    private final AtomicBoolean subscriptionsCompleted = new AtomicBoolean(false);

    /**
     * Creates the query channel and wires up the handlers for every inbound instruction type it understands.
     *
     * @param clientIdentification identification of this client, used in outgoing subscription messages
     * @param context              name of the context, used in log messages
     * @param permits              permits passed to the incoming instruction stream
     * @param permitsBatch         permits batch size passed to the incoming instruction stream
     * @param executor             executor handed to the parent channel
     * @param channel              managed channel used to create the query service stub
     */
    public QueryChannelImpl(ClientIdentification clientIdentification, String context, int permits, int permitsBatch, ScheduledExecutorService executor, AxonServerManagedChannel channel) {
        super(clientIdentification, executor, channel);
        this.clientIdentification = clientIdentification;
        this.context = context;
        this.permits = permits;
        this.permitsBatch = permitsBatch;
        this.queryServiceStub = QueryServiceGrpc.newStub((Channel)channel);

        // Instructions addressed to this client as a query provider.
        instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY, this::handleQuery);
        instructionHandlers.put(QueryProviderInbound.RequestCase.ACK, this::handleAck);
        instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY_CANCEL, this::handleCancelRequest);
        instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY_FLOW_CONTROL, this::handleFlowControlRequest);

        // Instructions nested inside a subscription query request.
        instructionHandlers.put(SubscriptionQueryRequest.RequestCase.GET_INITIAL_RESULT, this::getInitialResult);
        instructionHandlers.put(SubscriptionQueryRequest.RequestCase.SUBSCRIBE, this::subscribeToQueryUpdates);
        instructionHandlers.put(SubscriptionQueryRequest.RequestCase.UNSUBSCRIBE, this::unsubscribeToQueryUpdates);
    }

    /**
     * Handles an inbound acknowledgement by passing it to {@code processAck} and completing the reply channel.
     */
    private void handleAck(QueryProviderInbound query, ReplyChannel<QueryProviderOutbound> result) {
        InstructionAck acknowledgement = query.getAck();
        processAck(acknowledgement);
        result.complete();
    }

    /**
     * Removes all registrations stored for the subscription named in the unsubscribe request, cancels each of
     * them, and completes the reply channel.
     */
    private void unsubscribeToQueryUpdates(QueryProviderInbound query, ReplyChannel<QueryProviderOutbound> result) {
        String subscriptionId = query.getSubscriptionQueryRequest().getUnsubscribe().getSubscriptionIdentifier();
        Set<Registration> registrations = subscriptionQueries.remove(subscriptionId);
        if (registrations != null) {
            for (Registration registration : registrations) {
                registration.cancel();
            }
        }
        result.complete();
    }

    /**
     * Registers the subscription query with every handler known for the query name. Each handler receives its own
     * update handler that forwards updates and the completion signal through {@code result}. Non-null registrations
     * are stored under the subscription identifier so they can be cancelled later.
     */
    private void subscribeToQueryUpdates(QueryProviderInbound query, final ReplyChannel<QueryProviderOutbound> result) {
        final SubscriptionQuery subscribe = query.getSubscriptionQueryRequest().getSubscribe();
        final String subscriptionIdentifier = subscribe.getSubscriptionIdentifier();
        String queryName = subscribe.getQueryRequest().getQuery();
        for (QueryHandler handler : queryHandlers.getOrDefault(queryName, Collections.emptySet())) {
            Registration registration = handler.registerSubscriptionQuery(subscribe, new QueryHandler.UpdateHandler(){

                @Override
                public void sendUpdate(QueryUpdate queryUpdate) {
                    SubscriptionQueryResponse.Builder update = SubscriptionQueryResponse.newBuilder();
                    update.setSubscriptionIdentifier(subscriptionIdentifier);
                    update.setUpdate(queryUpdate);
                    update.setMessageIdentifier(queryUpdate.getMessageIdentifier());
                    result.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(update.build()).build());
                    logger.debug("Subscription Query Update [id: {}] for subscription {}, sent to client {}.", queryUpdate.getMessageIdentifier(), subscribe.getSubscriptionIdentifier(), queryUpdate.getClientId());
                }

                @Override
                public void complete() {
                    QueryUpdateComplete.Builder completion = QueryUpdateComplete.newBuilder();
                    completion.setClientId(clientIdentification.getClientId());
                    completion.setComponentName(clientIdentification.getComponentName());
                    QueryUpdateComplete complete = completion.build();
                    SubscriptionQueryResponse response = SubscriptionQueryResponse.newBuilder().setSubscriptionIdentifier(subscriptionIdentifier).setComplete(complete).build();
                    result.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(response).build());
                    logger.debug("Subscription Query Update completion sent to client {}.", complete.getClientId());
                }
            });
            if (registration == null) {
                continue;
            }
            subscriptionQueries.computeIfAbsent(subscriptionIdentifier, id -> new CopyOnWriteArraySet<>()).add(registration);
        }
        result.complete();
    }

    /**
     * Cancels the in-progress query referenced by the cancel instruction. When no such query is tracked, a no-op
     * placeholder is cancelled instead. The reply channel is completed afterwards.
     */
    private void handleCancelRequest(QueryProviderInbound complete, ReplyChannel<QueryProviderOutbound> result) {
        String requestId = complete.getQueryCancel().getRequestId();
        QueryInProgress target = queriesInProgress.getOrDefault(requestId, QueryInProgress.noop());
        target.cancel();
        result.complete();
    }

    /**
     * Forwards the permits carried by a flow control instruction to the in-progress query it references. When no
     * such query is tracked, the request goes to a no-op placeholder. The reply channel is completed afterwards.
     */
    private void handleFlowControlRequest(QueryProviderInbound flowControl, ReplyChannel<QueryProviderOutbound> result) {
        String requestId = flowControl.getQueryFlowControl().getQueryReference().getRequestId();
        QueryInProgress target = queriesInProgress.getOrDefault(requestId, QueryInProgress.noop());
        long requestedPermits = flowControl.getQueryFlowControl().getPermits();
        target.request(requestedPermits);
        result.complete();
    }

    /**
     * Opens the query stream, but only when at least one query handler is registered.
     */
    @Override
    public void connect() {
        if (queryHandlers.isEmpty()) {
            return;
        }
        logger.debug("QueryChannel for context '{}' will attempt to connect.", context);
        doConnectQueryStream();
    }

    /**
     * Opens the provider stream towards the server and subscribes all supported queries on it.
     * Does nothing when an outbound stream is already registered.
     */
    private synchronized void doConnectQueryStream() {
        // An outbound stream is already registered: nothing to do.
        if (this.outboundQueryStream.get() != null) {
            logger.debug("QueryChannel for context '{}' is already connected.", (Object)this.context);
            return;
        }
        // The observer is given onConnectionError and registerOutboundStream as its callbacks.
        IncomingQueryInstructionStream responseObserver = new IncomingQueryInstructionStream(this.clientIdentification.getClientId(), this.permits, this.permitsBatch, this::onConnectionError, this::registerOutboundStream);
        try {
            this.queryServiceStub.openStream((StreamObserver<QueryProviderInbound>)responseObserver);
        }
        catch (Exception e) {
            // Failures while opening the stream are routed through the observer's error path.
            responseObserver.onError(e);
            return;
        }
        ClientCallStreamObserver newValue = responseObserver.getInstructionsForPlatform();
        // Send a subscribe instruction for every supported query on the new stream and combine the resulting
        // futures; subscriptionsCompleted ends up true only if none of them failed.
        this.supportedQueries.keySet().stream().map(queryDef -> this.sendInstruction(this.buildSubscribeMessage(queryDef.getQueryName(), queryDef.getResultType(), UUID.randomUUID().toString()), QueryProviderOutbound::getInstructionId, newValue)).reduce((xva$0, xva$1) -> CompletableFuture.allOf(xva$0, xva$1)).orElse(CompletableFuture.completedFuture(null)).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                logger.warn("An error occurred while registering query handlers", throwable);
            } else {
                logger.info("QueryChannel for context '{}' connected, {} registrations resubscribed", (Object)this.context, (Object)this.queryHandlers.size());
            }
            this.subscriptionsCompleted.set(throwable == null);
        });
        responseObserver.enableFlowControl();
    }

    /**
     * Callback for errors on the query stream: logs the error, marks subscriptions as not completed, and
     * schedules a reconnect.
     *
     * @param error the error reported for the stream
     */
    private void onConnectionError(Throwable error) {
        logger.info("Error on QueryChannel for context {}", context, error);
        subscriptionsCompleted.set(false);
        scheduleReconnect(error);
    }

    /**
     * Stores {@code upstream} as the current outbound stream. If a different stream was stored before, it is
     * completed through {@code ObjectUtils.silently}.
     */
    private void registerOutboundStream(CallStreamObserver<QueryProviderOutbound> upstream) {
        StreamObserver<QueryProviderOutbound> replaced = outboundQueryStream.getAndSet(upstream);
        if (replaced == upstream) {
            return;
        }
        ObjectUtils.silently(replaced, StreamObserver::onCompleted);
    }

    /**
     * Builds the outbound instruction that subscribes this client as a handler for the given query.
     *
     * @param queryName     name of the query
     * @param resultName    name of the result type
     * @param instructionId identifier used both as instruction id and as message id of the subscription
     * @return the subscribe instruction
     */
    private QueryProviderOutbound buildSubscribeMessage(String queryName, String resultName, String instructionId) {
        QuerySubscription.Builder subscription = QuerySubscription.newBuilder();
        subscription.setMessageId(instructionId);
        subscription.setQuery(queryName);
        subscription.setResultName(resultName);
        subscription.setClientId(clientIdentification.getClientId());
        subscription.setComponentName(clientIdentification.getComponentName());
        return QueryProviderOutbound.newBuilder().setInstructionId(instructionId).setSubscribe(subscription).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers {@code handler} for each of the given query definitions. The first registration of a definition
     * sends a subscribe instruction on the current outbound stream.
     *
     * @param handler          the handler to register
     * @param queryDefinitions the queries the handler can answer
     * @return a registration backed by the combined result of the subscribe instructions; its cancel action
     *         removes the handler again and sends an unsubscribe for each query name left without handlers
     */
    @Override
    public Registration registerQueryHandler(QueryHandler handler, QueryDefinition ... queryDefinitions) {
        // CompletableFuture.allOf yields CompletableFuture<Void>, so the accumulator must be typed as such.
        CompletableFuture<Void> subscriptionResult = CompletableFuture.completedFuture(null);
        synchronized (queryHandlerMonitor) {
            // First handler ever registered: make sure the stream is open before subscribing.
            if (queryHandlers.isEmpty()) {
                doConnectQueryStream();
            }
            for (QueryDefinition queryDefinition : queryDefinitions) {
                queryHandlers.computeIfAbsent(queryDefinition.getQueryName(), k -> new CopyOnWriteArraySet<>()).add(handler);
                boolean firstRegistration = supportedQueries.computeIfAbsent(queryDefinition, k -> new AtomicInteger()).getAndIncrement() == 0;
                if (firstRegistration) {
                    QueryProviderOutbound subscribeMessage = buildSubscribeMessage(queryDefinition.getQueryName(), queryDefinition.getResultType(), UUID.randomUUID().toString());
                    CompletableFuture<Void> instructionResult = sendInstruction(subscribeMessage, QueryProviderOutbound::getInstructionId, outboundQueryStream.get());
                    subscriptionResult = CompletableFuture.allOf(subscriptionResult, instructionResult);
                }
                logger.info("Registered handler for query '{}' in context '{}'", queryDefinition, context);
            }
        }
        return new AsyncRegistration(subscriptionResult, () -> {
            synchronized (queryHandlerMonitor) {
                CompletableFuture<Void> result = CompletableFuture.completedFuture(null);
                for (QueryDefinition def : queryDefinitions) {
                    Set<QueryHandler> refs = queryHandlers.get(def.getQueryName());
                    // Only unsubscribe when this handler was the last one for the query name.
                    if (refs != null && refs.remove(handler) && refs.isEmpty()) {
                        queryHandlers.remove(def.getQueryName());
                        result = CompletableFuture.allOf(result, sendUnsubscribe(def, outboundQueryStream.get()));
                        logger.debug("Deregistered handlers for query '{}' in context '{}'", def, context);
                    }
                    // Drop the counter entry once the last registration of this definition is gone.
                    supportedQueries.computeIfPresent(def, (qd, counter) -> counter.decrementAndGet() == 0 ? null : counter);
                }
                return result;
            }
        });
    }

    /**
     * Sends an unsubscribe instruction for the given query definition over {@code outboundStream}.
     *
     * @param queryDefinition the query to unsubscribe
     * @param outboundStream  the stream to send on; may be {@code null}
     * @return the result of sending the instruction, or an already completed future when there is no stream
     */
    private CompletableFuture<Void> sendUnsubscribe(QueryDefinition queryDefinition, StreamObserver<QueryProviderOutbound> outboundStream) {
        if (outboundStream == null) {
            return CompletableFuture.completedFuture(null);
        }
        String instructionId = UUID.randomUUID().toString();
        QuerySubscription.Builder unsubscribe = QuerySubscription.newBuilder();
        unsubscribe.setMessageId(instructionId);
        unsubscribe.setQuery(queryDefinition.getQueryName());
        unsubscribe.setResultName(queryDefinition.getResultType());
        unsubscribe.setClientId(clientIdentification.getClientId());
        unsubscribe.setComponentName(clientIdentification.getComponentName());
        QueryProviderOutbound instruction = QueryProviderOutbound.newBuilder().setInstructionId(instructionId).setUnsubscribe(unsubscribe.build()).build();
        return sendInstruction(instruction, QueryProviderOutbound::getInstructionId, outboundStream);
    }

    /**
     * Sends the given query to the server and returns a buffered stream of the responses.
     * A query without a message identifier gets a random one before it is sent.
     *
     * @param query the query to send
     * @return the stream from which responses can be read; closing it cancels the call
     */
    @Override
    public ResultStream<QueryResponse> query(QueryRequest query) {
        logger.trace("Sending query over QueryChannel for context '{}'.", context);
        // Single identifier check: a second, identical check after the first one could never fire and
        // made this debug message unreachable.
        QueryRequest request = query;
        if (request.getMessageIdentifier().isEmpty()) {
            logger.debug("No message identifier has been set on the query. Adding a random identifier now.");
            request = request.toBuilder().setMessageIdentifier(UUID.randomUUID().toString()).build();
        }
        AbstractBufferedStream<QueryResponse, QueryRequest> results = new AbstractBufferedStream<QueryResponse, QueryRequest>(clientIdentification.getClientId(), 32, 8){

            @Override
            protected QueryRequest buildFlowControlMessage(io.axoniq.axonserver.grpc.FlowControl flowControl) {
                // No flow control message is produced for this stream.
                return null;
            }

            @Override
            protected QueryResponse terminalMessage() {
                return TERMINAL;
            }

            @Override
            public void close() {
                this.outboundStream().cancel("Client cancelled the stream.", null);
            }
        };
        queryServiceStub.query(request, (StreamObserver<QueryResponse>)results);
        return results;
    }

    /**
     * Opens a subscription query: sends the subscribe request immediately and returns a handle through which the
     * initial result can be requested and updates can be read.
     * A query without a message identifier gets a random one; that identifier doubles as the subscription identifier.
     *
     * @param query              the query to subscribe to
     * @param updateResponseType the type of updates expected
     * @param bufferSize         buffer size passed to the subscription stream
     * @param fetchSize          fetch size passed to the subscription stream
     * @return the subscription query result handle
     */
    @Override
    public SubscriptionQueryResult subscriptionQuery(QueryRequest query, SerializedObject updateResponseType, int bufferSize, int fetchSize) {
        logger.trace("Sending subscription query over QueryChannel for context '{}'.", context);
        final QueryRequest finalQuery = query.getMessageIdentifier().isEmpty() ? query.toBuilder().setMessageIdentifier(UUID.randomUUID().toString()).build() : query;
        final String subscriptionId = finalQuery.getMessageIdentifier();
        final CompletableFuture<QueryResponse> initialResultFuture = new CompletableFuture<>();
        final SubscriptionQueryStream subscriptionStream = new SubscriptionQueryStream(subscriptionId, initialResultFuture, clientIdentification.getClientId(), bufferSize, fetchSize);
        final StreamObserver<SubscriptionQueryRequest> upstream = queryServiceStub.subscription((StreamObserver<SubscriptionQueryResponse>)subscriptionStream);
        subscriptionStream.enableFlowControl();
        SubscriptionQuery subscriptionQuery = SubscriptionQuery.newBuilder().setQueryRequest(finalQuery).setSubscriptionIdentifier(subscriptionId).setUpdateResponseType(updateResponseType).build();
        // The message is passed with its real type; an Object-typed argument does not match onNext(SubscriptionQueryRequest).
        upstream.onNext(SubscriptionQueryRequest.newBuilder().setSubscribe(subscriptionQuery).build());
        return new SubscriptionQueryResult(){
            private final AtomicBoolean initialResultRequested = new AtomicBoolean();

            @Override
            public CompletableFuture<QueryResponse> initialResult() {
                // Ask the server for the initial result at most once, and only while it is still pending.
                if (!initialResultFuture.isDone() && !this.initialResultRequested.getAndSet(true)) {
                    SubscriptionQuery.Builder initialResultRequest = SubscriptionQuery.newBuilder().setQueryRequest(finalQuery).setSubscriptionIdentifier(subscriptionId);
                    upstream.onNext(SubscriptionQueryRequest.newBuilder().setGetInitialResult(initialResultRequest).build());
                }
                return initialResultFuture;
            }

            @Override
            public ResultStream<QueryUpdate> updates() {
                return subscriptionStream.buffer();
            }
        };
    }

    /**
     * Disconnects this channel: detaches the outbound stream, unsubscribes all supported queries on it, cancels
     * subscription queries and, once the queries in progress have completed, completes the detached stream.
     * The asynchronous chain is only set up here; this method does not wait for it.
     */
    @Override
    public synchronized void disconnect() {
        logger.debug("Disconnecting QueryChannel for context '{}'.", (Object)this.context);
        // Detach the outbound stream first so that it is no longer visible as the current stream.
        CallStreamObserver previousOutbound = this.outboundQueryStream.getAndSet(null);
        // With a stream present, send an unsubscribe per supported query; a failure is logged and swallowed so
        // the remaining steps still run.
        CompletableFuture unsubscribed = previousOutbound == null ? CompletableFuture.completedFuture(null) : this.supportedQueries.keySet().stream().map(queryDefinition -> this.sendUnsubscribe((QueryDefinition)queryDefinition, (StreamObserver<QueryProviderOutbound>)previousOutbound)).reduce((xva$0, xva$1) -> CompletableFuture.allOf(xva$0, xva$1)).map(cf -> cf.exceptionally(e -> {
            logger.warn("An error occurred while deregistering query handlers", e);
            return null;
        })).orElseGet(() -> CompletableFuture.completedFuture(null));
        this.cancelAllSubscriptionQueries();
        // After unsubscribing, wait for every query in progress before completing the detached stream.
        ((CompletableFuture)unsubscribed.thenCompose(stream -> {
            if (!this.queriesInProgress.isEmpty()) {
                logger.info("Disconnect requested. Waiting for {} queries to be completed", (Object)this.queriesInProgress.size());
            }
            return this.queriesInProgress.values().stream().map(QueryInProgress::whenComplete).reduce((xva$0, xva$1) -> CompletableFuture.allOf(xva$0, xva$1)).orElseGet(() -> CompletableFuture.completedFuture(null));
        })).thenAccept(previousStream -> ObjectUtils.doIfNotNull(previousOutbound, StreamObserver::onCompleted));
        this.subscriptionsCompleted.set(false);
    }

    /**
     * Disconnects this channel and then schedules an immediate reconnect.
     */
    @Override
    public void reconnect() {
        logger.debug("Reconnecting QueryChannel for context '{}'.", context);
        disconnect();
        scheduleImmediateReconnect();
    }

    /**
     * Sends an unsubscribe for every supported query over the current outbound stream and cancels all
     * subscription queries.
     *
     * @return a future combining the results of all unsubscribe instructions; already completed when there are
     *         no supported queries
     */
    @Override
    public CompletableFuture<Void> prepareDisconnect() {
        logger.debug("Preparing disconnect on QueryChannel for context '{}'.", context);
        CallStreamObserver<QueryProviderOutbound> outboundStream = outboundQueryStream.get();
        CompletableFuture<Void> unsubscribed = null;
        for (QueryDefinition queryDefinition : supportedQueries.keySet()) {
            CompletableFuture<Void> pending = sendUnsubscribe(queryDefinition, outboundStream);
            unsubscribed = unsubscribed == null ? pending : CompletableFuture.allOf(unsubscribed, pending);
        }
        if (unsubscribed == null) {
            unsubscribed = CompletableFuture.completedFuture(null);
        }
        cancelAllSubscriptionQueries();
        return unsubscribed;
    }

    /**
     * Removes every stored subscription query and cancels its registrations.
     */
    private void cancelAllSubscriptionQueries() {
        subscriptionQueries.forEach((subscriptionId, ignored) -> {
            // The entry may have been removed concurrently (e.g. by an unsubscribe instruction), in which case
            // remove returns null and there is nothing left to cancel here.
            Set<Registration> removed = subscriptionQueries.remove(subscriptionId);
            if (removed != null) {
                removed.forEach(Registration::cancel);
            }
        });
    }

    /**
     * Reports whether this channel is ready.
     *
     * @return {@code true} when no handlers are registered, or when an outbound stream is present and the
     *         subscriptions have completed
     */
    @Override
    public boolean isReady() {
        if (queryHandlers.isEmpty()) {
            return true;
        }
        return outboundQueryStream.get() != null && subscriptionsCompleted.get();
    }

    /**
     * Unwraps the query request from the inbound instruction and delegates to the request-based overload.
     */
    private void doHandleQuery(QueryProviderInbound query, ReplyChannel<QueryResponse> responseHandler) {
        QueryRequest request = query.getQuery();
        doHandleQuery(request, responseHandler);
    }

    /**
     * Dispatches a query to all handlers registered for its name and writes their responses to
     * {@code responseChannel} through a flow-controlled writer.
     *
     * @param query           the query to handle
     * @param responseChannel the channel responses are written to
     */
    private void doHandleQuery(QueryRequest query, ReplyChannel<QueryResponse> responseChannel) {
        // Filled in at the end of this method; the QueryInProgress reads it lazily through the supplier.
        AtomicReference<FlowControlledReplyChannelWriter<QueryResponse>> flowControlRef = new AtomicReference<FlowControlledReplyChannelWriter<QueryResponse>>();
        // Run when the query is cancelled/completed: stop tracking it and complete the response channel.
        Runnable removeQuery = () -> {
            this.queriesInProgress.remove(query.getMessageIdentifier());
            responseChannel.complete();
        };
        QueryInProgress queryInProgress = new QueryInProgress(removeQuery, flowControlRef::get);
        // A query with the same message identifier is already tracked: the new one is ignored.
        // NOTE(review): responseChannel is neither completed nor sent an error on this path — confirm intended.
        if (this.queriesInProgress.putIfAbsent(query.getMessageIdentifier(), queryInProgress) != null) {
            return;
        }
        CloseAwareReplyChannel<QueryResponse> responseHandler = new CloseAwareReplyChannel<QueryResponse>(responseChannel, queryInProgress::cancel);
        Set handlers = this.queryHandlers.getOrDefault(query.getQuery(), Collections.emptySet());
        if (handlers.isEmpty()) {
            // NOTE(review): execution continues after this reply with an empty buffer list — confirm no early return is needed.
            responseHandler.sendLast(QueryResponse.newBuilder().setRequestIdentifier(query.getMessageIdentifier()).setErrorCode(ErrorCategory.NO_HANDLER_FOR_QUERY.errorCode()).setErrorMessage(ErrorMessage.newBuilder().setMessage("No handler for query").build()).build());
        }
        // One buffer per handler.
        // NOTE(review): handlers receive responseChannel here, not the close-aware responseHandler — confirm intended.
        List buffers = handlers.stream().map(queryHandler -> this.executeQuery((QueryHandler)queryHandler, query, responseChannel)).collect(Collectors.toList());
        RequestIdEnhancementReplyChannel requestIdReplenishmentReplyChannel = new RequestIdEnhancementReplyChannel(query, responseHandler);
        FlowControlledReplyChannelWriter<QueryResponse> flowControl = new FlowControlledReplyChannelWriter<QueryResponse>(buffers, requestIdReplenishmentReplyChannel);
        flowControlRef.set(flowControl);
        // Without streaming support on both sides, request everything up front.
        if (!this.supportsStreaming(query)) {
            flowControl.request(Long.MAX_VALUE);
        }
    }

    /**
     * Invokes {@code handler} for the query with a reply channel that buffers responses, and returns that buffer
     * paired with the flow control the handler returned.
     */
    private DisposableReadonlyBuffer<QueryResponse> executeQuery(QueryHandler handler, QueryRequest query, ReplyChannel<QueryResponse> replyChannel) {
        BlockingCloseableBuffer<QueryResponse> responseBuffer = new BlockingCloseableBuffer<>();
        BufferingReplyChannel<QueryResponse> bufferedChannel = new BufferingReplyChannel<>(replyChannel, responseBuffer);
        FlowControl handlerFlowControl = handler.stream(query, bufferedChannel);
        return new FlowControlledDisposableReadonlyBuffer<>(handlerFlowControl, responseBuffer);
    }

    /**
     * Tells whether both the server and the query sender flagged streaming support on the request.
     */
    private boolean supportsStreaming(QueryRequest queryRequest) {
        if (!axonServerSupportsQueryStreaming(queryRequest)) {
            return false;
        }
        return querySenderSupportsStreaming(queryRequest);
    }

    /**
     * Reads the {@code SERVER_SUPPORTS_STREAMING} processing instruction of the request.
     */
    private boolean axonServerSupportsQueryStreaming(QueryRequest queryRequest) {
        List<ProcessingInstruction> instructions = queryRequest.getProcessingInstructionsList();
        return booleanProcessingValue(instructions, ProcessingKey.SERVER_SUPPORTS_STREAMING);
    }

    /**
     * Reads the {@code CLIENT_SUPPORTS_STREAMING} processing instruction of the request.
     */
    private boolean querySenderSupportsStreaming(QueryRequest queryRequest) {
        List<ProcessingInstruction> instructions = queryRequest.getProcessingInstructionsList();
        return booleanProcessingValue(instructions, ProcessingKey.CLIENT_SUPPORTS_STREAMING);
    }

    /**
     * Returns the boolean value of the first processing instruction carrying the given key.
     *
     * @param instructions  the instructions to search
     * @param processingKey the key to look for
     * @return the boolean value of the first match, or {@code false} when no instruction has that key
     */
    private boolean booleanProcessingValue(List<ProcessingInstruction> instructions, ProcessingKey processingKey) {
        for (ProcessingInstruction instruction : instructions) {
            if (processingKey.equals(instruction.getKey())) {
                return instruction.getValue().getBooleanValue();
            }
        }
        return false;
    }

    /**
     * Handles an inbound query instruction. Responses are wrapped into outbound messages; on completion a
     * {@code QueryComplete} message referencing the query is sent before the reply channel is completed.
     */
    private void handleQuery(final QueryProviderInbound inbound, final ReplyChannel<QueryProviderOutbound> result) {
        ReplyChannel<QueryResponse> responseAdapter = new ReplyChannel<QueryResponse>(){

            @Override
            public void send(QueryResponse response) {
                QueryProviderOutbound outbound = QueryProviderOutbound.newBuilder().setQueryResponse(response).build();
                result.send(outbound);
            }

            @Override
            public void complete() {
                QueryComplete.Builder completion = QueryComplete.newBuilder();
                completion.setRequestId(inbound.getQuery().getMessageIdentifier());
                completion.setMessageId(UUID.randomUUID().toString());
                result.send(QueryProviderOutbound.newBuilder().setQueryComplete(completion.build()).build());
                result.complete();
            }

            @Override
            public void completeWithError(ErrorMessage errorMessage) {
                result.completeWithError(errorMessage);
            }

            @Override
            public void completeWithError(ErrorCategory errorCategory, String message) {
                result.completeWithError(errorCategory, message);
            }
        };
        doHandleQuery(inbound, responseAdapter);
    }

    /**
     * Handles a request for the initial result of a subscription query. The embedded query is executed and each
     * response is wrapped into a subscription query response carrying the subscription identifier.
     */
    private void getInitialResult(QueryProviderInbound query, final ReplyChannel<QueryProviderOutbound> result) {
        SubscriptionQuery initialResultRequest = query.getSubscriptionQueryRequest().getGetInitialResult();
        final String subscriptionId = initialResultRequest.getSubscriptionIdentifier();
        ReplyChannel<QueryResponse> responseAdapter = new ReplyChannel<QueryResponse>(){

            @Override
            public void send(QueryResponse response) {
                SubscriptionQueryResponse.Builder initialResult = SubscriptionQueryResponse.newBuilder();
                initialResult.setSubscriptionIdentifier(subscriptionId);
                initialResult.setInitialResult(response);
                initialResult.setMessageIdentifier(response.getMessageIdentifier());
                result.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(initialResult.build()).build());
            }

            @Override
            public void complete() {
                result.complete();
            }

            @Override
            public void completeWithError(ErrorMessage errorMessage) {
                result.completeWithError(errorMessage);
            }

            @Override
            public void completeWithError(ErrorCategory errorCategory, String message) {
                result.completeWithError(errorCategory, message);
            }
        };
        doHandleQuery(initialResultRequest.getQueryRequest(), responseAdapter);
    }

    private class IncomingQueryInstructionStream
    extends AbstractIncomingInstructionStream<QueryProviderInbound, QueryProviderOutbound> {
        public IncomingQueryInstructionStream(String clientId, int permits, int permitsBatch, Consumer<Throwable> disconnectHandler, Consumer<CallStreamObserver<QueryProviderOutbound>> beforeStartHandler) {
            super(clientId, permits, permitsBatch, disconnectHandler, beforeStartHandler);
        }

        @Override
        protected QueryProviderOutbound buildFlowControlMessage(io.axoniq.axonserver.grpc.FlowControl flowControl) {
            return QueryProviderOutbound.newBuilder().setFlowControl(flowControl).build();
        }

        @Override
        protected QueryProviderOutbound buildAckMessage(InstructionAck ack) {
            return QueryProviderOutbound.newBuilder().setAck(ack).build();
        }

        @Override
        protected String getInstructionId(QueryProviderInbound instruction) {
            return instruction.getInstructionId();
        }

        @Override
        protected InstructionHandler<QueryProviderInbound, QueryProviderOutbound> getHandler(QueryProviderInbound request) {
            if (request.getRequestCase() == QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST) {
                return (InstructionHandler)QueryChannelImpl.this.instructionHandlers.get((Object)request.getSubscriptionQueryRequest().getRequestCase());
            }
            return (InstructionHandler)QueryChannelImpl.this.instructionHandlers.get((Object)request.getRequestCase());
        }

        @Override
        protected boolean unregisterOutboundStream(CallStreamObserver<QueryProviderOutbound> expected) {
            if (QueryChannelImpl.this.outboundQueryStream.compareAndSet(expected, null)) {
                QueryChannelImpl.this.cancelAllSubscriptionQueries();
                return true;
            }
            return false;
        }
    }

    /**
     * Reply channel decorator that makes sure every response carries the message identifier of the query as its
     * request identifier. All other calls are passed to the delegate unchanged.
     */
    private static class RequestIdEnhancementReplyChannel
    implements ReplyChannel<QueryResponse> {
        private final QueryRequest query;
        private final ReplyChannel<QueryResponse> delegate;

        public RequestIdEnhancementReplyChannel(QueryRequest query, ReplyChannel<QueryResponse> delegate) {
            this.query = query;
            this.delegate = delegate;
        }

        @Override
        public void send(QueryResponse response) {
            String expectedRequestId = query.getMessageIdentifier();
            if (expectedRequestId.equals(response.getRequestIdentifier())) {
                delegate.send(response);
                return;
            }
            logger.debug("RequestIdentifier not properly set, modifying message");
            delegate.send(response.toBuilder().setRequestIdentifier(query.getMessageIdentifier()).build());
        }

        @Override
        public void complete() {
            delegate.complete();
        }

        @Override
        public void completeWithError(ErrorMessage errorMessage) {
            delegate.completeWithError(errorMessage);
        }

        @Override
        public void completeWithError(ErrorCategory errorCategory, String message) {
            delegate.completeWithError(errorCategory, message);
        }

        @Override
        public void sendNack(ErrorMessage errorMessage) {
            delegate.sendNack(errorMessage);
        }

        @Override
        public void sendAck() {
            delegate.sendAck();
        }
    }

    /**
     * Tracks a single query being handled. Completing the internal future (see {@link #cancel()}) cancels the
     * flow control obtained from the supplier, if any, and then runs the cancel callback.
     */
    private static class QueryInProgress {
        private final CompletableFuture<Void> cancelHandler = new CompletableFuture<>();
        private final Supplier<FlowControl> flowControlSupplier;

        /**
         * @return a new instance with an empty cancel callback and the no-op flow control
         */
        public static QueryInProgress noop() {
            Runnable nothing = () -> {};
            return new QueryInProgress(nothing, () -> NoopFlowControl.INSTANCE);
        }

        public QueryInProgress(Runnable cancelHandler, Supplier<FlowControl> flowControlSupplier) {
            this.flowControlSupplier = flowControlSupplier;
            // Inside the lambda, cancelHandler refers to the Runnable parameter, not the future field.
            this.cancelHandler.whenComplete((ignoredResult, ignoredError) -> {
                FlowControl active = flowControlSupplier.get();
                if (active != null) {
                    active.cancel();
                }
                cancelHandler.run();
            });
        }

        /**
         * @return the future that is completed when this query is cancelled
         */
        public CompletableFuture<Void> whenComplete() {
            return cancelHandler;
        }

        public void cancel() {
            cancelHandler.complete(null);
        }

        public void request(long requested) {
            FlowControl active = flowControlSupplier.get();
            if (active == null) {
                return;
            }
            active.request(requested);
        }
    }
}

