/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.connector.query.SubscriptionQueryResult;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.OutOfDirectMemoryError;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Type;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.InstructionAckSource;
import org.axonframework.axonserver.connector.TargetContextResolver;
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.query.AxonServerQueryDispatchException;
import org.axonframework.axonserver.connector.query.QueryPriorityCalculator;
import org.axonframework.axonserver.connector.query.QuerySerializer;
import org.axonframework.axonserver.connector.query.subscription.AxonServerSubscriptionQueryResult;
import org.axonframework.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.axonserver.connector.util.UpstreamAwareStreamObserver;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.lifecycle.ShutdownHandler;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.lifecycle.StartHandler;
import org.axonframework.messaging.Distributed;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.UpdateHandlerRegistration;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link QueryBus} implementation that sends queries through connections obtained from an
 * {@link AxonServerConnectionManager} and registers the handlers of a local {@link QueryBus} segment with
 * Axon Server, so that queries arriving from the server are executed on that local segment.
 * <p>
 * Instances are created through the {@link Builder} returned by {@link #builder()}.
 */
public class AxonServerQueryBus
implements QueryBus,
Distributed<QueryBus> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Number of results requested for a direct (point-to-point) query.
    private static final int DIRECT_QUERY_NUMBER_OF_RESULTS = 1;
    // Timeout attached to direct queries: one hour, expressed in milliseconds.
    private static final long DIRECT_QUERY_TIMEOUT_MS = TimeUnit.HOURS.toMillis(1L);
    // Number-of-results value sent with scatter-gather queries.
    private static final int SCATTER_GATHER_NUMBER_OF_RESULTS = -1;
    // Initial capacity used for the priority queue that backs the query executor.
    private static final int QUERY_QUEUE_CAPACITY = 1000;
    // Source of the Axon Server connections used for dispatching and handler registration.
    private final AxonServerConnectionManager axonServerConnectionManager;
    // Supplies the default context, the client id and the query flow control settings.
    private final AxonServerConfiguration configuration;
    // Emitter through which update handlers for incoming subscription queries are registered.
    private final QueryUpdateEmitter updateEmitter;
    // Local bus on which handlers are subscribed and on which incoming queries are executed.
    private final QueryBus localSegment;
    // Converts query messages and responses to and from their gRPC representation.
    private final QuerySerializer serializer;
    // Converts subscription query messages and updates to and from their gRPC representation.
    private final SubscriptionMessageSerializer subscriptionSerializer;
    // Determines the priority attached to outgoing queries.
    private final QueryPriorityCalculator priorityCalculator;
    // Dispatch interceptors applied to every outgoing query.
    private final DispatchInterceptors<QueryMessage<?, ?>> dispatchInterceptors;
    // Resolves the context a given query should be dispatched to.
    private final TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver;
    // Tracks in-flight queries so that shutdown can wait for them to finish.
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();
    // Executes both incoming query tasks and response-processing tasks.
    private final ExecutorService queryExecutor;
    // Handler registered with Axon Server; forwards incoming queries to the local segment.
    private final QueryHandler localSegmentAdapter;
    // Context taken from the configuration; used for handler registration and disconnecting.
    private final String context;

    /**
     * Creates a new, empty {@link Builder} with which an {@link AxonServerQueryBus} can be configured.
     *
     * @return a fresh {@link Builder} instance
     */
    public static Builder builder() {
        Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Builds the bus from the given {@code builder}, after letting the builder validate itself.
     * <p>
     * Besides copying the configured components, this creates the serializers, the dispatch interceptor
     * chain, the priority-ordered executor used for query work and the adapter that is registered with
     * Axon Server as query handler.
     *
     * @param builder the {@link Builder} carrying the components of this bus
     */
    public AxonServerQueryBus(Builder builder) {
        builder.validate();

        axonServerConnectionManager = builder.axonServerConnectionManager;
        configuration = builder.configuration;
        updateEmitter = builder.updateEmitter;
        localSegment = builder.localSegment;
        serializer = builder.buildQuerySerializer();
        subscriptionSerializer = builder.buildSubscriptionMessageSerializer();
        priorityCalculator = builder.priorityCalculator;
        context = configuration.getContext();
        // Fall back to this bus' own context whenever the configured resolver yields nothing.
        targetContextResolver = builder.targetContextResolver.orElse(m -> this.context);
        dispatchInterceptors = new DispatchInterceptors<>();

        // Tasks handed to the executor are ordered through this priority queue.
        PriorityBlockingQueue<Runnable> queryProcessQueue = new PriorityBlockingQueue<>(QUERY_QUEUE_CAPACITY);
        queryExecutor = builder.executorServiceBuilder.apply(configuration, queryProcessQueue);
        localSegmentAdapter = new LocalSegmentAdapter();
    }

    /**
     * Start handler that initializes the shutdown latch of this bus.
     */
    @StartHandler(phase=0x1FFFFFFF)
    public void start() {
        shutdownLatch.initialize();
    }

    /**
     * Subscribes the given {@code handler} on the local segment and registers the matching query definition
     * with Axon Server through the connection of this bus' context.
     *
     * @param queryName    the name of the query the handler answers
     * @param responseType the response type the handler produces
     * @param handler      the handler to subscribe
     * @param <R>          the response type of the query
     * @return a {@link Registration} combining the local registration with a cancellation of the server
     *         registration
     */
    public <R> Registration subscribe(String queryName, Type responseType, MessageHandler<? super QueryMessage<?, R>> handler) {
        Registration localRegistration = localSegment.subscribe(queryName, responseType, handler);
        QueryDefinition definition = new QueryDefinition(queryName, responseType);
        io.axoniq.axonserver.connector.Registration serverRegistration =
                axonServerConnectionManager.getConnection(context)
                                           .queryChannel()
                                           .registerQueryHandler(localSegmentAdapter, definition);
        return new AxonServerRegistration(localRegistration, serverRegistration::cancel);
    }

    /**
     * Dispatches a direct query to Axon Server and returns a future for its single response.
     * <p>
     * The query is passed through the dispatch interceptors, registered as an activity with the shutdown
     * latch and sent to the context chosen by the target context resolver. Once the result stream signals
     * availability, a {@link ResponseProcessingTask} is scheduled on the query executor to complete the
     * returned future. Any exception raised while issuing the query completes the future exceptionally with
     * a {@code QUERY_DISPATCH_ERROR}.
     *
     * @param queryMessage the query to dispatch
     * @param <Q>          the payload type of the query
     * @param <R>          the expected response type
     * @return a future completing with the response, or exceptionally when dispatching or handling fails
     */
    public <Q, R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q, R> queryMessage) {
        shutdownLatch.ifShuttingDown(String.format("Cannot dispatch new %s as this bus is being shut down", "queries"));
        QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage);
        ShutdownLatch.ActivityHandle queryInTransit = shutdownLatch.registerActivity();
        CompletableFuture<QueryResponseMessage<R>> queryTransaction = new CompletableFuture<>();

        try {
            String targetContext = targetContextResolver.resolveContext(interceptedQuery);
            int priority = priorityCalculator.determinePriority(interceptedQuery);
            QueryRequest queryRequest = serializer.serializeRequest(
                    interceptedQuery, DIRECT_QUERY_NUMBER_OF_RESULTS, DIRECT_QUERY_TIMEOUT_MS, priority
            );
            ResultStream<QueryResponse> result =
                    axonServerConnectionManager.getConnection(targetContext)
                                               .queryChannel()
                                               .query(queryRequest);
            ResponseProcessingTask<R> responseProcessingTask = new ResponseProcessingTask<>(
                    result, serializer, queryTransaction, priority, queryMessage.getResponseType()
            );
            // Processing happens on the query executor rather than on the thread signalling availability.
            result.onAvailable(() -> queryExecutor.execute(responseProcessingTask));
        } catch (Exception dispatchFailure) {
            logger.debug("There was a problem issuing a query {}.", interceptedQuery, dispatchFailure);
            AxonException converted =
                    ErrorCode.QUERY_DISPATCH_ERROR.convert(configuration.getClientId(), dispatchFailure);
            queryTransaction.completeExceptionally(converted);
        }

        // The activity ends as soon as the transaction completes, successfully or not.
        return queryTransaction.whenComplete((response, failure) -> queryInTransit.end());
    }

    /**
     * Dispatches a scatter-gather query to Axon Server and exposes the responses as a lazily consumed stream.
     * <p>
     * The returned stream is backed by a {@link QueryResponseSpliterator} that waits for responses until the
     * deadline derived from {@code timeout} has passed. The activity registered with the shutdown latch is
     * ended when the spliterator finishes or when the stream is closed.
     *
     * @param queryMessage the query to dispatch
     * @param timeout      how long to wait for responses
     * @param timeUnit     the unit of {@code timeout}
     * @param <Q>          the payload type of the query
     * @param <R>          the expected response type
     * @return a stream of the responses received before the deadline
     */
    public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> queryMessage, long timeout, TimeUnit timeUnit) {
        shutdownLatch.ifShuttingDown(String.format("Cannot dispatch new %s as this bus is being shut down", "scatter-gather queries"));
        QueryMessage<Q, R> interceptedQuery = dispatchInterceptors.intercept(queryMessage);
        ShutdownLatch.ActivityHandle queryInTransit = shutdownLatch.registerActivity();
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);

        try {
            String targetContext = targetContextResolver.resolveContext(interceptedQuery);
            long timeoutMillis = timeUnit.toMillis(timeout);
            int priority = priorityCalculator.determinePriority(interceptedQuery);
            QueryRequest queryRequest = serializer.serializeRequest(
                    interceptedQuery, SCATTER_GATHER_NUMBER_OF_RESULTS, timeoutMillis, priority
            );
            ResultStream<QueryResponse> queryResult =
                    axonServerConnectionManager.getConnection(targetContext)
                                               .queryChannel()
                                               .query(queryRequest);

            Spliterator<QueryResponseMessage<R>> responses = new QueryResponseSpliterator<Q, R>(
                    queryMessage, queryResult, deadline, serializer, queryInTransit::end
            );
            return StreamSupport.stream(responses, false).onClose(queryInTransit::end);
        } catch (Exception e) {
            logger.debug("There was a problem issuing a scatter-gather query {}.", interceptedQuery, e);
            queryInTransit.end();
            throw e;
        }
    }

    /**
     * Dispatches a subscription query, ignoring the given {@code backPressure}.
     *
     * @param query            the subscription query to dispatch
     * @param backPressure     not used by this implementation
     * @param updateBufferSize passed on to {@link #subscriptionQuery(SubscriptionQueryMessage, int)}
     * @param <Q>              the payload type of the query
     * @param <I>              the initial response type
     * @param <U>              the update type
     * @return the result of {@link #subscriptionQuery(SubscriptionQueryMessage, int)}
     * @deprecated use {@link #subscriptionQuery(SubscriptionQueryMessage, int)} instead
     */
    @Deprecated
    public <Q, I, U> org.axonframework.queryhandling.SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(SubscriptionQueryMessage<Q, I, U> query, SubscriptionQueryBackpressure backPressure, int updateBufferSize) {
        // Back pressure is not consulted here; the overload without it does all the work.
        return subscriptionQuery(query, updateBufferSize);
    }

    /**
     * Dispatches a subscription query to Axon Server.
     * <p>
     * The query is passed through the dispatch interceptors and sent to the context chosen by the target
     * context resolver, using the initial and new permit counts from the configured query flow control.
     * The {@code updateBufferSize} argument is not read by this method.
     *
     * @param query            the subscription query to dispatch
     * @param updateBufferSize not used by this implementation
     * @param <Q>              the payload type of the query
     * @param <I>              the initial response type
     * @param <U>              the update type
     * @return a result wrapping the subscription opened with Axon Server
     */
    public <Q, I, U> org.axonframework.queryhandling.SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(SubscriptionQueryMessage<Q, I, U> query, int updateBufferSize) {
        shutdownLatch.ifShuttingDown(String.format("Cannot dispatch new %s as this bus is being shut down", "subscription queries"));

        SubscriptionQueryMessage<Q, I, U> interceptedQuery = dispatchInterceptors.intercept(query);
        String subscriptionId = interceptedQuery.getIdentifier();
        String targetContext = targetContextResolver.resolveContext((QueryMessage<?, ?>) interceptedQuery);
        logger.debug("Subscription Query requested with subscription Id [{}]", (Object) subscriptionId);

        SubscriptionQueryResult result =
                axonServerConnectionManager.getConnection(targetContext)
                                           .queryChannel()
                                           .subscriptionQuery(
                                                   subscriptionSerializer.serializeQuery(interceptedQuery),
                                                   subscriptionSerializer.serializeUpdateType(interceptedQuery),
                                                   configuration.getQueryFlowControl().getInitialNrOfPermits().intValue(),
                                                   configuration.getQueryFlowControl().getNrOfNewPermits().intValue()
                                           );
        return new AxonServerSubscriptionQueryResult(result, subscriptionSerializer);
    }

    /**
     * Returns the {@link QueryUpdateEmitter} this bus was configured with.
     *
     * @return the configured {@link QueryUpdateEmitter}
     */
    public QueryUpdateEmitter queryUpdateEmitter() {
        return updateEmitter;
    }

    /**
     * Returns the local {@link QueryBus} on which handlers are subscribed and incoming queries are executed.
     *
     * @return the local segment of this bus
     */
    public QueryBus localSegment() {
        return localSegment;
    }

    /**
     * Registers the given handler {@code interceptor} on the local segment.
     *
     * @param interceptor the handler interceptor to register
     * @return the {@link Registration} returned by the local segment
     */
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?, ?>> interceptor) {
        Registration interceptorRegistration = localSegment.registerHandlerInterceptor(interceptor);
        return interceptorRegistration;
    }

    /**
     * Adds the given {@code dispatchInterceptor} to the interceptors applied to outgoing queries.
     *
     * @param dispatchInterceptor the dispatch interceptor to register
     * @return the {@link Registration} returned by the dispatch interceptor collection
     */
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?, ?>> dispatchInterceptor) {
        Registration interceptorRegistration = dispatchInterceptors.registerDispatchInterceptor(dispatchInterceptor);
        return interceptorRegistration;
    }

    /**
     * Shutdown handler that asks the query channel of this bus' context to prepare for disconnecting.
     * Does nothing when there is no active connection for that context.
     */
    @ShutdownHandler(phase=0x1FFFFFFF)
    public void disconnect() {
        if (!axonServerConnectionManager.isConnected(context)) {
            return;
        }
        axonServerConnectionManager.getConnection(context)
                                   .queryChannel()
                                   .prepareDisconnect();
    }

    /**
     * Shutdown handler that initiates shutdown on the latch tracking in-flight queries.
     *
     * @return the future handed out by the shutdown latch
     */
    @ShutdownHandler(phase=0)
    public CompletableFuture<Void> shutdownDispatching() {
        CompletableFuture<Void> shutdownCompletion = shutdownLatch.initiateShutdown();
        return shutdownCompletion;
    }

    /**
     * {@link Spliterator} that pulls scatter-gather responses from a {@link ResultStream} until either no
     * further response is available or the given deadline has passed.
     * <p>
     * Whenever iteration stops (exhaustion or interruption of the waiting thread) the underlying result
     * stream is closed and the close handler is run.
     *
     * @param <Q> the payload type of the query
     * @param <R> the expected response type
     */
    private static class QueryResponseSpliterator<Q, R> implements Spliterator<QueryResponseMessage<R>> {
        private final QueryMessage<Q, R> queryMessage;
        private final ResultStream<QueryResponse> queryResult;
        private final long deadline;
        private final QuerySerializer serializer;
        private final Runnable closeHandler;

        /**
         * @param queryMessage the query whose response type drives deserialization
         * @param queryResult  the stream of responses to read from
         * @param deadline     epoch milliseconds until which this spliterator waits for responses
         * @param serializer   used to deserialize each response
         * @param closeHandler run once iteration stops
         */
        public QueryResponseSpliterator(QueryMessage<Q, R> queryMessage, ResultStream<QueryResponse> queryResult, long deadline, QuerySerializer serializer, Runnable closeHandler) {
            this.queryMessage = queryMessage;
            this.queryResult = queryResult;
            this.deadline = deadline;
            this.serializer = serializer;
            this.closeHandler = closeHandler;
        }

        /**
         * Hands the next response, if any, to {@code action}.
         * <p>
         * While the deadline has not passed this waits for a response for the remaining time; afterwards only
         * an already available response is taken.
         *
         * @param action receives the deserialized response
         * @return {@code true} if a response was handed over, {@code false} once iteration has ended
         */
        @Override
        public boolean tryAdvance(Consumer<? super QueryResponseMessage<R>> action) {
            QueryResponse next;
            long remaining = deadline - System.currentTimeMillis();
            try {
                next = remaining > 0L
                        ? queryResult.nextIfAvailable(remaining, TimeUnit.MILLISECONDS)
                        : queryResult.nextIfAvailable();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Release the stream here as well; nothing else closes it once iteration is abandoned.
                closeAndRelease();
                return false;
            }

            if (next != null) {
                action.accept(serializer.deserializeResponse(next, queryMessage.getResponseType()));
                return true;
            }
            closeAndRelease();
            return false;
        }

        /** Closes the result stream and then runs the close handler. */
        private void closeAndRelease() {
            queryResult.close();
            closeHandler.run();
        }

        /** This spliterator cannot be split. */
        @Override
        public Spliterator<QueryResponseMessage<R>> trySplit() {
            return null;
        }

        /** The number of responses is unknown up front. */
        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        /** No characteristics are reported. */
        @Override
        public int characteristics() {
            return 0;
        }
    }

    /**
     * Prioritized task that takes the response of a direct query from its {@link ResultStream} and completes
     * the associated query transaction with it.
     * <p>
     * The task body runs at most once, however often the task is scheduled.
     *
     * @param <R> the expected response type
     */
    private static class ResponseProcessingTask<R> extends PriorityTask {
        private final AtomicBoolean singleExecutionCheck = new AtomicBoolean();
        private final ResultStream<QueryResponse> result;
        private final QuerySerializer serializer;
        private final CompletableFuture<QueryResponseMessage<R>> queryTransaction;
        private final ResponseType<R> expectedResponseType;

        /**
         * @param result               the stream the response is read from
         * @param serializer           used to deserialize the response
         * @param queryTransaction     the future to complete with the outcome
         * @param priority             the priority of this task in the executor queue
         * @param expectedResponseType the response type to deserialize into
         */
        public ResponseProcessingTask(ResultStream<QueryResponse> result, QuerySerializer serializer, CompletableFuture<QueryResponseMessage<R>> queryTransaction, int priority, ResponseType<R> expectedResponseType) {
            super(priority);
            this.result = result;
            this.serializer = serializer;
            this.queryTransaction = queryTransaction;
            this.expectedResponseType = expectedResponseType;
        }

        /**
         * Completes the query transaction with the available response, or exceptionally when the stream was
         * closed without one. A runtime failure while reading or deserializing the response also completes
         * the transaction exceptionally, so callers are never left waiting on a future that cannot complete.
         */
        @Override
        public void run() {
            if (!singleExecutionCheck.compareAndSet(false, true)) {
                return;
            }
            try {
                QueryResponse nextAvailable = result.nextIfAvailable();
                if (nextAvailable != null) {
                    queryTransaction.complete(serializer.deserializeResponse(nextAvailable, expectedResponseType));
                } else if (result.isClosed() && !queryTransaction.isDone()) {
                    // Only build the fallback exception when the stream itself reported no error.
                    Exception exception = result.getError()
                                                .map(ErrorCode.QUERY_DISPATCH_ERROR::convert)
                                                .orElseGet(() -> new AxonServerQueryDispatchException(
                                                        ErrorCode.QUERY_DISPATCH_ERROR.errorCode(),
                                                        "Query did not yield the expected number of results."
                                                ));
                    queryTransaction.completeExceptionally(exception);
                }
            } catch (RuntimeException e) {
                // The single-execution guard is already set, so this is the only chance to report the failure.
                queryTransaction.completeExceptionally(e);
            }
        }
    }

    /**
     * Builder for an {@link AxonServerQueryBus}.
     * <p>
     * The {@link AxonServerConnectionManager}, {@link AxonServerConfiguration}, local {@link QueryBus},
     * {@link QueryUpdateEmitter} and both {@link Serializer}s are hard requirements checked by
     * {@link #validate()}. The {@link QueryPriorityCalculator}, {@link TargetContextResolver} and
     * {@link ExecutorServiceBuilder} have defaults.
     */
    public static class Builder {
        private AxonServerConnectionManager axonServerConnectionManager;
        private AxonServerConfiguration configuration;
        private QueryBus localSegment;
        private QueryUpdateEmitter updateEmitter;
        private Serializer messageSerializer;
        private Serializer genericSerializer;
        private QueryPriorityCalculator priorityCalculator = QueryPriorityCalculator.defaultQueryPriorityCalculator();
        // Defaults to the context of whichever configuration is set on this builder.
        private TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver = q -> this.configuration.getContext();
        private ExecutorServiceBuilder executorServiceBuilder = ExecutorServiceBuilder.defaultQueryExecutorServiceBuilder();

        /**
         * Sets the {@link AxonServerConnectionManager} providing the connections to Axon Server.
         *
         * @param axonServerConnectionManager the connection manager to use, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder axonServerConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull(axonServerConnectionManager, "AxonServerConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        /**
         * Sets the {@link AxonServerConfiguration} of the bus.
         *
         * @param configuration the configuration to use, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder configuration(AxonServerConfiguration configuration) {
            BuilderUtils.assertNonNull(configuration, "AxonServerConfiguration may not be null");
            this.configuration = configuration;
            return this;
        }

        /**
         * Sets the local {@link QueryBus} on which handlers are subscribed and incoming queries are executed.
         *
         * @param localSegment the local query bus, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder localSegment(QueryBus localSegment) {
            BuilderUtils.assertNonNull(localSegment, "Local QueryBus may not be null");
            this.localSegment = localSegment;
            return this;
        }

        /**
         * Sets the {@link QueryUpdateEmitter} of the bus.
         *
         * @param updateEmitter the update emitter to use, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder updateEmitter(QueryUpdateEmitter updateEmitter) {
            BuilderUtils.assertNonNull(updateEmitter, "QueryUpdateEmitter may not be null");
            this.updateEmitter = updateEmitter;
            return this;
        }

        /**
         * Sets the message {@link Serializer} handed to the query and subscription message serializers.
         *
         * @param messageSerializer the message serializer, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder messageSerializer(Serializer messageSerializer) {
            BuilderUtils.assertNonNull(messageSerializer, "Message Serializer may not be null");
            this.messageSerializer = messageSerializer;
            return this;
        }

        /**
         * Sets the generic {@link Serializer} handed to the query and subscription message serializers.
         *
         * @param genericSerializer the generic serializer, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder genericSerializer(Serializer genericSerializer) {
            BuilderUtils.assertNonNull(genericSerializer, "Generic Serializer may not be null");
            this.genericSerializer = genericSerializer;
            return this;
        }

        /**
         * Sets the {@link QueryPriorityCalculator} used to determine the priority of outgoing queries.
         *
         * @param priorityCalculator the priority calculator, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder priorityCalculator(QueryPriorityCalculator priorityCalculator) {
            // Validate the argument itself; checking another field would let a null calculator through.
            BuilderUtils.assertNonNull(priorityCalculator, "QueryPriorityCalculator may not be null");
            this.priorityCalculator = priorityCalculator;
            return this;
        }

        /**
         * Sets the {@link TargetContextResolver} used to pick the context a query is dispatched to.
         *
         * @param targetContextResolver the resolver to use, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder targetContextResolver(TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver) {
            BuilderUtils.assertNonNull(targetContextResolver, "TargetContextResolver may not be null");
            this.targetContextResolver = targetContextResolver;
            return this;
        }

        /**
         * Sets the {@link ExecutorServiceBuilder} that creates the executor used for query work.
         *
         * @param executorServiceBuilder the executor builder, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder executorServiceBuilder(ExecutorServiceBuilder executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, "ExecutorServiceBuilder may not be null");
            this.executorServiceBuilder = executorServiceBuilder;
            return this;
        }

        /**
         * No-op retained for backwards compatibility; the given factory is ignored.
         *
         * @param requestStreamFactory ignored
         * @return this builder, for fluent interfacing
         * @deprecated this setting no longer has any effect
         */
        @Deprecated
        public Builder requestStreamFactory(Function<UpstreamAwareStreamObserver<QueryProviderInbound>, StreamObserver<QueryProviderOutbound>> requestStreamFactory) {
            return this;
        }

        /**
         * No-op retained for backwards compatibility; the given source is ignored.
         *
         * @param instructionAckSource ignored
         * @return this builder, for fluent interfacing
         * @deprecated this setting no longer has any effect
         */
        @Deprecated
        public Builder instructionAckSource(InstructionAckSource<QueryProviderOutbound> instructionAckSource) {
            return this;
        }

        /**
         * Creates the {@link AxonServerQueryBus} described by this builder.
         *
         * @return the configured {@link AxonServerQueryBus}
         */
        public AxonServerQueryBus build() {
            return new AxonServerQueryBus(this);
        }

        /**
         * Creates the {@link QuerySerializer} from the configured serializers and configuration.
         *
         * @return a new {@link QuerySerializer}
         */
        protected QuerySerializer buildQuerySerializer() {
            return new QuerySerializer(messageSerializer, genericSerializer, configuration);
        }

        /**
         * Creates the {@link SubscriptionMessageSerializer} from the configured serializers and configuration.
         *
         * @return a new {@link SubscriptionMessageSerializer}
         */
        protected SubscriptionMessageSerializer buildSubscriptionMessageSerializer() {
            return new SubscriptionMessageSerializer(messageSerializer, genericSerializer, configuration);
        }

        /**
         * Verifies that every hard requirement has been provided.
         *
         * @throws AxonConfigurationException if one of the required components is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(axonServerConnectionManager, "The AxonServerConnectionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(configuration, "The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(localSegment, "The Local QueryBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(updateEmitter, "The QueryUpdateEmitter is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(messageSerializer, "The Message Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(genericSerializer, "The Generic Serializer is a hard requirement and should be provided");
        }
    }

    /**
     * Prioritized task that executes a query received from Axon Server on the local segment and sends the
     * outcome back through the reply channel.
     * <p>
     * The task priority is read from the processing instructions of the request.
     */
    private static class QueryProcessingTask extends PriorityTask {
        private final QueryBus localSegment;
        private final QueryRequest queryRequest;
        private final ReplyChannel<QueryResponse> responseHandler;
        private final QuerySerializer serializer;
        private final String clientId;

        /**
         * @param localSegment    the bus on which the query is executed
         * @param queryRequest    the incoming request
         * @param responseHandler the channel on which responses are sent back
         * @param serializer      converts the request and the responses
         * @param clientId        identifies this client in serialized errors
         */
        private QueryProcessingTask(QueryBus localSegment, QueryRequest queryRequest, ReplyChannel<QueryResponse> responseHandler, QuerySerializer serializer, String clientId) {
            super(ProcessingInstructionHelper.priority(queryRequest.getProcessingInstructionsList()));
            this.localSegment = localSegment;
            this.queryRequest = queryRequest;
            this.responseHandler = responseHandler;
            this.serializer = serializer;
            this.clientId = clientId;
        }

        /**
         * Runs the query as a direct query when exactly one result is requested and as a scatter-gather query
         * otherwise. Failures are reported to the caller as a {@code QUERY_EXECUTION_ERROR} response.
         */
        @Override
        public void run() {
            try {
                logger.debug("Will process query [{}]", queryRequest.getQuery());
                QueryMessage<Object, Object> queryMessage = serializer.deserializeRequest(queryRequest);
                long requestedResults = ProcessingInstructionHelper.numberOfResults(queryRequest.getProcessingInstructionsList());

                if (requestedResults == DIRECT_QUERY_NUMBER_OF_RESULTS) {
                    localSegment.query(queryMessage).whenComplete((response, failure) -> {
                        if (failure == null) {
                            responseHandler.sendLast(toResponse(response));
                            return;
                        }
                        responseHandler.sendLast(toErrorResponse(failure));
                    });
                    return;
                }

                long timeoutMillis = ProcessingInstructionHelper.timeout(queryRequest.getProcessingInstructionsList());
                Stream<QueryResponseMessage<Object>> responses =
                        localSegment.scatterGather(queryMessage, timeoutMillis, TimeUnit.MILLISECONDS);
                responses.forEach(response -> responseHandler.send(toResponse(response)));
                responseHandler.complete();
            } catch (OutOfDirectMemoryError | RuntimeException e) {
                responseHandler.sendLast(toErrorResponse(e));
                logger.warn("Query Processor had an exception when processing query [{}]", queryRequest.getQuery(), e);
            }
        }

        /** Serializes a successful response, tagged with the identifier of the request. */
        private QueryResponse toResponse(QueryResponseMessage<?> response) {
            return serializer.serializeResponse(response, queryRequest.getMessageIdentifier());
        }

        /** Builds the {@code QUERY_EXECUTION_ERROR} response describing the given failure. */
        private QueryResponse toErrorResponse(Throwable failure) {
            ErrorMessage errorMessage = ExceptionSerializer.serialize(clientId, failure);
            return QueryResponse.newBuilder()
                                .setErrorCode(ErrorCode.QUERY_EXECUTION_ERROR.errorCode())
                                .setErrorMessage(errorMessage)
                                .setRequestIdentifier(queryRequest.getMessageIdentifier())
                                .build();
        }
    }

    private class LocalSegmentAdapter
    implements QueryHandler {
        private LocalSegmentAdapter() {
        }

        public void handle(QueryRequest query, ReplyChannel<QueryResponse> responseHandler) {
            QueryProcessingTask processingTask = new QueryProcessingTask(AxonServerQueryBus.this.localSegment, query, responseHandler, AxonServerQueryBus.this.serializer, AxonServerQueryBus.this.configuration.getClientId());
            AxonServerQueryBus.this.queryExecutor.execute(processingTask);
        }

        public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(SubscriptionQuery query, QueryHandler.UpdateHandler sendUpdate) {
            UpdateHandlerRegistration updateHandler = AxonServerQueryBus.this.updateEmitter.registerUpdateHandler(AxonServerQueryBus.this.subscriptionSerializer.deserialize(query), 1024);
            updateHandler.getUpdates().map(AxonServerQueryBus.this.subscriptionSerializer::serialize).doOnError(e -> {
                ErrorMessage error = ExceptionSerializer.serialize(AxonServerQueryBus.this.configuration.getClientId(), e);
                String errorCode = ErrorCode.QUERY_EXECUTION_ERROR.errorCode();
                QueryUpdate queryUpdate = QueryUpdate.newBuilder().setErrorMessage(error).setErrorCode(errorCode).build();
                sendUpdate.sendUpdate(queryUpdate);
                sendUpdate.complete();
            }).doOnComplete(() -> ((QueryHandler.UpdateHandler)sendUpdate).complete()).subscribe(arg_0 -> ((QueryHandler.UpdateHandler)sendUpdate).sendUpdate(arg_0));
            return () -> {
                updateHandler.getRegistration().close();
                return CompletableFuture.completedFuture(null);
            };
        }
    }

    /**
     * Base class for tasks that are ordered by priority. Tasks with a higher priority value sort first;
     * tasks of equal priority sort by creation order, as recorded by a counter shared across all tasks.
     */
    private static abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {
        // Shared sequence that gives every task a distinct, increasing index.
        private static final AtomicLong COUNTER = new AtomicLong(Long.MIN_VALUE);
        private final long priority;
        private final long index;

        /**
         * @param priority the priority of this task; higher values are ordered first
         */
        public PriorityTask(long priority) {
            this.priority = priority;
            this.index = COUNTER.incrementAndGet();
        }

        /**
         * @return the priority this task was created with
         */
        public long getPriority() {
            return priority;
        }

        /**
         * Orders by descending priority and, for equal priorities, by ascending creation index.
         */
        @Override
        public int compareTo(PriorityTask o) {
            // Operands are swapped so that the larger priority compares as "smaller".
            int byPriority = Long.compare(o.priority, priority);
            return byPriority != 0 ? byPriority : Long.compare(index, o.index);
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            PriorityTask other = (PriorityTask) o;
            return other.priority == priority && other.index == index;
        }

        @Override
        public int hashCode() {
            return Objects.hash(priority, index);
        }
    }
}

