/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.query.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.query.QueryChannel;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.connector.query.SubscriptionQueryResult;
import io.axoniq.axonserver.connector.query.impl.SubscriptionQueryStream;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.SerializedObject;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.query.QueryComplete;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryServiceGrpc;
import io.axoniq.axonserver.grpc.query.QuerySubscription;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.QueryUpdateComplete;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryChannelImpl
extends AbstractAxonServerChannel
implements QueryChannel {
    private static final Logger logger = LoggerFactory.getLogger(QueryChannelImpl.class);
    // Sentinel response handed out by terminalMessage() of the buffered stream created in query(...).
    private static final QueryResponse TERMINAL = QueryResponse.newBuilder().setErrorCode("__TERMINAL__").build();
    // Outbound side of the provider stream opened by connect(); holds null while no stream is registered (see isConnected()).
    private final AtomicReference<StreamObserver<QueryProviderOutbound>> outboundQueryStream = new AtomicReference();
    // Query definitions for which a subscribe message is (re)sent every time connect() opens a new stream.
    private final Set<QueryDefinition> supportedQueries = new CopyOnWriteArraySet<QueryDefinition>();
    // Registered handlers, keyed by query name.
    private final ConcurrentMap<String, Set<QueryHandler>> queryHandlers = new ConcurrentHashMap<String, Set<QueryHandler>>();
    // Dispatch table for inbound instructions, keyed by the request case of either QueryProviderInbound
    // or the nested SubscriptionQueryRequest (populated in the constructor).
    private final ConcurrentMap<Enum<?>, InstructionHandler<QueryProviderInbound, QueryProviderOutbound>> instructionHandlers = new ConcurrentHashMap();
    private final ClientIdentification clientIdentification;
    // Flow-control settings forwarded to the incoming instruction stream created in connect().
    private final int permits;
    private final int permitsBatch;
    // Lock taken while registering and unregistering query handlers.
    private final Object queryHandlerMonitor = new Object();
    // Registrations returned by handlers for active subscription queries, keyed by subscription identifier.
    private final Map<String, Set<Registration>> subscriptionQueries = new ConcurrentHashMap<String, Set<Registration>>();
    private final QueryServiceGrpc.QueryServiceStub queryServiceStub;

    /**
     * Creates a query channel on top of the given managed channel.
     *
     * @param clientIdentification identification used for the client id and component name in outgoing messages
     * @param permits              permit count forwarded to the incoming instruction stream opened by {@link #connect()}
     * @param permitsBatch         permit batch size forwarded to the incoming instruction stream opened by {@link #connect()}
     * @param executor             executor handed to the parent channel
     * @param channel              managed channel used both by the parent channel and to create the gRPC query stub
     */
    public QueryChannelImpl(ClientIdentification clientIdentification, int permits, int permitsBatch, ScheduledExecutorService executor, AxonServerManagedChannel channel) {
        super(executor, channel);
        this.clientIdentification = clientIdentification;
        this.permits = permits;
        this.permitsBatch = permitsBatch;
        this.queryServiceStub = QueryServiceGrpc.newStub((Channel)channel);

        // Instructions identified by the top-level request case.
        this.instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY, this::handleQuery);
        this.instructionHandlers.put(QueryProviderInbound.RequestCase.ACK, this::handleAck);

        // Instructions identified by the request case of the nested subscription query request.
        this.instructionHandlers.put(SubscriptionQueryRequest.RequestCase.GET_INITIAL_RESULT, this::getInitialResult);
        this.instructionHandlers.put(SubscriptionQueryRequest.RequestCase.SUBSCRIBE, this::subscribeToQueryUpdates);
        this.instructionHandlers.put(SubscriptionQueryRequest.RequestCase.UNSUBSCRIBE, this::unsubscribeToQueryUpdates);
    }

    /**
     * Handles an inbound ACK instruction. The instruction content is not inspected; the reply channel is
     * simply completed.
     *
     * @param ackInstruction the inbound instruction (unused)
     * @param replyChannel   the reply channel to complete
     */
    private void handleAck(QueryProviderInbound ackInstruction, ReplyChannel<QueryProviderOutbound> replyChannel) {
        replyChannel.complete();
    }

    /**
     * Handles an inbound "unsubscribe" for a subscription query: drops every registration stored under the
     * subscription identifier and cancels each of them. Nothing is written to the reply channel here.
     *
     * @param query  inbound instruction carrying the unsubscribe request
     * @param result reply channel for the instruction (not used by this handler)
     */
    private void unsubscribeToQueryUpdates(QueryProviderInbound query, ReplyChannel<QueryProviderOutbound> result) {
        String subscriptionId = query.getSubscriptionQueryRequest().getUnsubscribe().getSubscriptionIdentifier();
        Set<Registration> registrations = this.subscriptionQueries.remove(subscriptionId);
        if (registrations == null) {
            // Unknown (or already cancelled) subscription: nothing to do.
            return;
        }
        for (Registration registration : registrations) {
            registration.cancel();
        }
    }

    /**
     * Handles an inbound "subscribe" for a subscription query. Every handler registered for the query name
     * is asked to register the subscription; the update handler passed to it forwards updates and the
     * completion signal over the reply channel, tagged with the subscription identifier. Non-null
     * registrations returned by the handlers are stored under the subscription identifier so they can be
     * cancelled later.
     *
     * @param query  inbound instruction carrying the subscribe request
     * @param result reply channel on which updates for this subscription are sent
     */
    private void subscribeToQueryUpdates(QueryProviderInbound query, final ReplyChannel<QueryProviderOutbound> result) {
        final SubscriptionQuery subscribe = query.getSubscriptionQueryRequest().getSubscribe();
        final String subscriptionIdentifier = subscribe.getSubscriptionIdentifier();
        Set<QueryHandler> handlers = this.queryHandlers.getOrDefault(subscribe.getQueryRequest().getQuery(), Collections.emptySet());
        for (QueryHandler handler : handlers) {
            // A fresh update handler per query handler, as each registration gets its own callback object.
            QueryHandler.UpdateHandler updateHandler = new QueryHandler.UpdateHandler(){

                @Override
                public void sendUpdate(QueryUpdate queryUpdate) {
                    SubscriptionQueryResponse update = SubscriptionQueryResponse.newBuilder()
                            .setSubscriptionIdentifier(subscriptionIdentifier)
                            .setUpdate(queryUpdate)
                            .build();
                    result.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(update).build());
                }

                @Override
                public void complete() {
                    QueryUpdateComplete completion = QueryUpdateComplete.newBuilder()
                            .setClientId(QueryChannelImpl.this.clientIdentification.getClientId())
                            .setComponentName(QueryChannelImpl.this.clientIdentification.getComponentName())
                            .build();
                    SubscriptionQueryResponse completed = SubscriptionQueryResponse.newBuilder()
                            .setSubscriptionIdentifier(subscriptionIdentifier)
                            .setComplete(completion)
                            .build();
                    result.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(completed).build());
                }
            };
            Registration registration = handler.registerSubscriptionQuery(subscribe, updateHandler);
            if (registration == null) {
                continue;
            }
            this.subscriptionQueries.computeIfAbsent(subscriptionIdentifier, id -> new CopyOnWriteArraySet<>()).add(registration);
        }
    }

    /**
     * Opens the provider stream unless an outbound stream is already registered. After the stream is
     * opened, a subscribe message (with a fresh random instruction id) is sent for every supported query
     * definition, flow control is enabled, and any previously registered outbound stream is completed.
     * Failures reported to the incoming stream's disconnect handler trigger {@code scheduleReconnect()}.
     */
    @Override
    public synchronized void connect() {
        StreamObserver<QueryProviderOutbound> current = this.outboundQueryStream.get();
        if (current != null) {
            return;
        }

        Consumer<Throwable> onDisconnect = error -> this.scheduleReconnect();
        IncomingQueryInstructionStream inboundStream = new IncomingQueryInstructionStream(this.clientIdentification.getClientId(), this.permits, this.permitsBatch, onDisconnect);
        this.queryServiceStub.openStream((StreamObserver<QueryProviderInbound>)inboundStream);

        StreamObserver<QueryProviderOutbound> outbound = inboundStream.getInstructionsForPlatform();
        StreamObserver<QueryProviderOutbound> replaced = this.outboundQueryStream.getAndSet(outbound);

        // Re-announce every known query definition on the new stream.
        for (QueryDefinition definition : this.supportedQueries) {
            outbound.onNext(this.buildSubscribeMessage(definition.getQueryName(), definition.getResultType(), UUID.randomUUID().toString()));
        }

        inboundStream.enableFlowControl();
        logger.info("QueryChannel connected, {} query types registered", (Object)this.queryHandlers.size());
        ObjectUtils.silently(replaced, StreamObserver::onCompleted);
    }

    /**
     * Builds the outbound message that subscribes this client as a handler of the given query.
     *
     * @param queryName     name of the query
     * @param resultName    result type name of the query
     * @param instructionId value used both as the instruction id of the message and as the message id of
     *                      the contained subscription
     * @return the subscribe message, carrying this client's id and component name
     */
    private QueryProviderOutbound buildSubscribeMessage(String queryName, String resultName, String instructionId) {
        QueryProviderOutbound.Builder message = QueryProviderOutbound.newBuilder().setInstructionId(instructionId);
        QuerySubscription.Builder subscription = QuerySubscription.newBuilder()
                .setMessageId(instructionId)
                .setQuery(queryName)
                .setResultName(resultName)
                .setClientId(this.clientIdentification.getClientId())
                .setComponentName(this.clientIdentification.getComponentName());
        return message.setSubscribe(subscription).build();
    }

    /**
     * Registers the given handler for each of the given query definitions.
     * <p>
     * For every definition the handler is added to the handlers of that query name. The first time a
     * definition is seen, a subscribe message (with an empty instruction id) is sent on the outbound
     * stream, if one is currently registered; otherwise the subscription is sent by the next
     * {@link #connect()}.
     * <p>
     * Cancelling the returned registration removes the handler again. When that leaves a query name
     * without handlers, the definition is also dropped from the supported queries and an unsubscribe
     * message is sent. Dropping it matters: otherwise a reconnect would re-subscribe a query nobody
     * handles, and registering the same definition again would never send a new subscribe message.
     *
     * @param handler          the handler to invoke for the queries
     * @param queryDefinitions the query definitions the handler supports
     * @return a registration that, when cancelled, unregisters the handler for these definitions
     */
    @Override
    public Registration registerQueryHandler(QueryHandler handler, QueryDefinition ... queryDefinitions) {
        synchronized (this.queryHandlerMonitor) {
            for (QueryDefinition queryDefinition : queryDefinitions) {
                this.queryHandlers.computeIfAbsent(queryDefinition.getQueryName(), k -> new CopyOnWriteArraySet<>()).add(handler);
                boolean firstRegistration = this.supportedQueries.add(queryDefinition);
                if (firstRegistration) {
                    ObjectUtils.doIfNotNull(this.outboundQueryStream.get(), s -> s.onNext(this.buildSubscribeMessage(queryDefinition.getQueryName(), queryDefinition.getResultType(), "")));
                }
                logger.info("Registered handler for query {}", (Object)queryDefinition);
            }
        }
        return () -> {
            synchronized (this.queryHandlerMonitor) {
                for (QueryDefinition queryDefinition : queryDefinitions) {
                    Set<QueryHandler> refs = this.queryHandlers.get(queryDefinition.getQueryName());
                    if (refs == null || !refs.remove(handler) || !refs.isEmpty()) continue;
                    this.queryHandlers.remove(queryDefinition.getQueryName());
                    // Keep supportedQueries in step with queryHandlers so connect() does not re-subscribe
                    // a handler-less query and a later re-registration is announced again.
                    this.supportedQueries.remove(queryDefinition);
                    this.sendUnsubscribe(queryDefinition);
                    // NOTE(review): handlers are keyed by query name only, so other definitions sharing
                    // this name (different result type) are not unsubscribed here - unchanged behaviour.
                }
            }
        };
    }

    /**
     * Sends an unsubscribe message for the given query definition on the outbound stream, if one is
     * currently registered. A fresh random id is used as both instruction id and message id.
     *
     * @param queryDefinition the query definition to unsubscribe
     */
    private void sendUnsubscribe(QueryDefinition queryDefinition) {
        String instructionId = UUID.randomUUID().toString();
        ObjectUtils.doIfNotNull(this.outboundQueryStream.get(), stream -> {
            QuerySubscription.Builder subscription = QuerySubscription.newBuilder()
                    .setMessageId(instructionId)
                    .setQuery(queryDefinition.getQueryName())
                    .setResultName(queryDefinition.getResultType())
                    .setClientId(this.clientIdentification.getClientId())
                    .setComponentName(this.clientIdentification.getComponentName());
            QueryProviderOutbound message = QueryProviderOutbound.newBuilder()
                    .setInstructionId(instructionId)
                    .setUnsubscribe(subscription)
                    .build();
            stream.onNext(message);
        });
    }

    /**
     * Sends the given query through the gRPC stub and returns a buffered stream that receives the
     * responses.
     * <p>
     * The buffer is created with {@code Integer.MAX_VALUE} and {@code 0} as its numeric constructor
     * arguments, produces no flow control messages ({@code buildFlowControlMessage} returns
     * {@code null}), and its {@code close()} does nothing.
     *
     * @param query the query to execute
     * @return the stream on which the query responses become available
     */
    @Override
    public ResultStream<QueryResponse> query(QueryRequest query) {
        String clientId = this.clientIdentification.getClientId();
        AbstractBufferedStream<QueryResponse, QueryRequest> responseBuffer = new AbstractBufferedStream<QueryResponse, QueryRequest>(clientId, Integer.MAX_VALUE, 0){

            @Override
            protected QueryResponse terminalMessage() {
                return TERMINAL;
            }

            @Override
            protected QueryRequest buildFlowControlMessage(FlowControl flowControl) {
                // No flow control message is produced for this call.
                return null;
            }

            @Override
            public void close() {
                // Intentionally empty.
            }
        };
        this.queryServiceStub.query(query, (StreamObserver<QueryResponse>)responseBuffer);
        return responseBuffer;
    }

    /**
     * Opens a subscription query. A new subscription stream is opened through the gRPC stub, flow control
     * is enabled on it and a subscribe request with a fresh random subscription identifier is sent
     * immediately. The initial result is only requested from the server the first time
     * {@code initialResult()} is called on the returned object (and only if the future has not already
     * completed by then).
     *
     * @param query              the query to subscribe to
     * @param updateResponseType the type of updates set on the subscribe request
     * @param bufferSize         buffer size passed to the subscription stream
     * @param fetchSize          fetch size passed to the subscription stream
     * @return a result giving access to the initial result and the stream of updates
     */
    @Override
    public SubscriptionQueryResult subscriptionQuery(final QueryRequest query, SerializedObject updateResponseType, int bufferSize, int fetchSize) {
        final String subscriptionId = UUID.randomUUID().toString();
        final CompletableFuture<QueryResponse> initialResultFuture = new CompletableFuture<QueryResponse>();
        final SubscriptionQueryStream subscriptionStream = new SubscriptionQueryStream(subscriptionId, initialResultFuture, this.clientIdentification.getClientId(), bufferSize, fetchSize);
        final StreamObserver<SubscriptionQueryRequest> upstream = this.queryServiceStub.subscription((StreamObserver<SubscriptionQueryResponse>)subscriptionStream);
        subscriptionStream.enableFlowControl();

        SubscriptionQuery subscription = SubscriptionQuery.newBuilder()
                .setQueryRequest(query)
                .setSubscriptionIdentifier(subscriptionId)
                .setUpdateResponseType(updateResponseType)
                .build();
        upstream.onNext(SubscriptionQueryRequest.newBuilder().setSubscribe(subscription).build());

        return new SubscriptionQueryResult(){
            // Flipped by the first caller that actually sends the initial-result request.
            private final AtomicBoolean initialResultRequested = new AtomicBoolean();

            @Override
            public CompletableFuture<QueryResponse> initialResult() {
                if (initialResultFuture.isDone()) {
                    return initialResultFuture;
                }
                if (this.initialResultRequested.compareAndSet(false, true)) {
                    SubscriptionQuery.Builder initialResultQuery = SubscriptionQuery.newBuilder()
                            .setQueryRequest(query)
                            .setSubscriptionIdentifier(subscriptionId);
                    upstream.onNext(SubscriptionQueryRequest.newBuilder().setGetInitialResult(initialResultQuery).build());
                }
                return initialResultFuture;
            }

            @Override
            public ResultStream<QueryUpdate> updates() {
                return subscriptionStream.buffer();
            }
        };
    }

    /**
     * Unregisters the outbound stream, completing it if one was registered, and cancels all subscription
     * query registrations.
     */
    @Override
    public void disconnect() {
        StreamObserver<QueryProviderOutbound> detached = this.outboundQueryStream.getAndSet(null);
        ObjectUtils.doIfNotNull(detached, StreamObserver::onCompleted);
        this.cancelAllSubscriptionQueries();
    }

    /**
     * Prepares for a disconnect: sends an unsubscribe message for every supported query definition and
     * cancels all subscription query registrations. The outbound stream itself is left untouched.
     */
    @Override
    public void prepareDisconnect() {
        for (QueryDefinition definition : this.supportedQueries) {
            this.sendUnsubscribe(definition);
        }
        this.cancelAllSubscriptionQueries();
    }

    /**
     * Removes every stored subscription query entry and cancels the registrations it held.
     * <p>
     * The map can be modified concurrently (for example by an inbound unsubscribe instruction), so an
     * entry seen while iterating may already be gone by the time it is removed here; such entries are
     * skipped instead of dereferencing the {@code null} returned by {@code remove}.
     */
    private void cancelAllSubscriptionQueries() {
        for (String subscriptionId : this.subscriptionQueries.keySet()) {
            Set<Registration> registrations = this.subscriptionQueries.remove(subscriptionId);
            if (registrations != null) {
                registrations.forEach(Registration::cancel);
            }
        }
    }

    /**
     * Reports whether an outbound query stream is currently registered.
     *
     * @return {@code true} if an outbound stream is set, {@code false} otherwise
     */
    @Override
    public boolean isConnected() {
        StreamObserver<QueryProviderOutbound> current = this.outboundQueryStream.get();
        return current != null;
    }

    /**
     * Handles the query carried by the given inbound instruction by delegating to
     * {@link #doHandleQuery(QueryRequest, ReplyChannel)}.
     *
     * @param query           inbound instruction containing the query request
     * @param responseHandler channel on which the responses are reported
     */
    public void doHandleQuery(QueryProviderInbound query, ReplyChannel<QueryResponse> responseHandler) {
        QueryRequest request = query.getQuery();
        this.doHandleQuery(request, responseHandler);
    }

    /**
     * Dispatches the given query to every handler registered for its query name.
     * <p>
     * When no handler is registered, a nack is sent followed by a final response carrying the
     * {@code NO_HANDLER_FOR_QUERY} error code, and processing stops there. Otherwise the request is
     * acknowledged and each handler is invoked with a reply channel that:
     * <ul>
     * <li>forwards responses, first overwriting the request identifier when it does not match the
     * query's message identifier;</li>
     * <li>completes {@code responseHandler} only once every handler has completed;</li>
     * <li>passes errors, acks and nacks straight through to {@code responseHandler}.</li>
     * </ul>
     *
     * @param query           the query to handle
     * @param responseHandler channel on which the responses are reported
     */
    public void doHandleQuery(final QueryRequest query, final ReplyChannel<QueryResponse> responseHandler) {
        Set<QueryHandler> handlers = this.queryHandlers.getOrDefault(query.getQuery(), Collections.emptySet());
        if (handlers.isEmpty()) {
            responseHandler.sendNack();
            responseHandler.sendLast(QueryResponse.newBuilder().setRequestIdentifier(query.getMessageIdentifier()).setErrorCode(ErrorCategory.NO_HANDLER_FOR_QUERY.errorCode()).setErrorMessage(ErrorMessage.newBuilder().setMessage("No handler for query").build()).build());
            // The request has been rejected and its last response sent; do not fall through and
            // acknowledge it as well.
            return;
        }
        responseHandler.sendAck();
        // Number of handlers that still have to complete before the overall response is completed.
        final AtomicInteger completeCounter = new AtomicInteger(handlers.size());
        handlers.forEach(queryHandler -> queryHandler.handle(query, new ReplyChannel<QueryResponse>(){

            @Override
            public void send(QueryResponse response) {
                if (!query.getMessageIdentifier().equals(response.getRequestIdentifier())) {
                    logger.debug("RequestIdentifier not properly set, modifying message");
                    QueryResponse newResponse = response.toBuilder().setRequestIdentifier(query.getMessageIdentifier()).build();
                    responseHandler.send(newResponse);
                } else {
                    responseHandler.send(response);
                }
            }

            @Override
            public void complete() {
                if (completeCounter.decrementAndGet() == 0) {
                    responseHandler.complete();
                }
            }

            @Override
            public void completeWithError(ErrorMessage errorMessage) {
                responseHandler.completeWithError(errorMessage);
            }

            @Override
            public void completeWithError(ErrorCategory errorCategory, String message) {
                responseHandler.completeWithError(errorCategory, message);
            }

            @Override
            public void sendNack(ErrorMessage errorMessage) {
                responseHandler.sendNack(errorMessage);
            }

            @Override
            public void sendAck() {
                responseHandler.sendAck();
            }
        }));
    }

    /**
     * Handles an inbound QUERY instruction. The query is processed by
     * {@link #doHandleQuery(QueryProviderInbound, ReplyChannel)} using an adapter that wraps each
     * {@link QueryResponse} in a {@link QueryProviderOutbound}. On completion a query-complete message
     * (request id = the query's message identifier, message id = a fresh random id) is sent before the
     * reply channel is completed. Errors, acks and nacks are passed through unchanged.
     *
     * @param inbound the inbound instruction containing the query
     * @param result  the reply channel of the instruction
     */
    private void handleQuery(final QueryProviderInbound inbound, final ReplyChannel<QueryProviderOutbound> result) {
        ReplyChannel<QueryResponse> adapter = new ReplyChannel<QueryResponse>(){

            @Override
            public void sendAck() {
                result.sendAck();
            }

            @Override
            public void sendNack(ErrorMessage errorMessage) {
                result.sendNack(errorMessage);
            }

            @Override
            public void send(QueryResponse response) {
                QueryProviderOutbound wrapped = QueryProviderOutbound.newBuilder().setQueryResponse(response).build();
                result.send(wrapped);
            }

            @Override
            public void complete() {
                QueryComplete queryComplete = QueryComplete.newBuilder()
                        .setRequestId(inbound.getQuery().getMessageIdentifier())
                        .setMessageId(UUID.randomUUID().toString())
                        .build();
                result.send(QueryProviderOutbound.newBuilder().setQueryComplete(queryComplete).build());
                result.complete();
            }

            @Override
            public void completeWithError(ErrorMessage errorMessage) {
                result.completeWithError(errorMessage);
            }

            @Override
            public void completeWithError(ErrorCategory errorCategory, String message) {
                result.completeWithError(errorCategory, message);
            }
        };
        this.doHandleQuery(inbound, adapter);
    }

    /**
     * Handles an inbound "get initial result" instruction of a subscription query. The contained query
     * request is processed by {@link #doHandleQuery(QueryRequest, ReplyChannel)}; each response is sent
     * back as the initial result of a subscription query response tagged with the subscription
     * identifier and the response's message identifier. Completion is reported by sending an ack on the
     * reply channel; errors, acks and nacks are passed through unchanged.
     *
     * @param query  the inbound instruction containing the initial result request
     * @param result the reply channel of the instruction
     */
    private void getInitialResult(QueryProviderInbound query, final ReplyChannel<QueryProviderOutbound> result) {
        SubscriptionQuery initialResultRequest = query.getSubscriptionQueryRequest().getGetInitialResult();
        final String subscriptionId = initialResultRequest.getSubscriptionIdentifier();
        ReplyChannel<QueryResponse> adapter = new ReplyChannel<QueryResponse>(){

            @Override
            public void sendAck() {
                result.sendAck();
            }

            @Override
            public void sendNack(ErrorMessage errorMessage) {
                result.sendNack(errorMessage);
            }

            @Override
            public void send(QueryResponse response) {
                SubscriptionQueryResponse initialResult = SubscriptionQueryResponse.newBuilder()
                        .setSubscriptionIdentifier(subscriptionId)
                        .setInitialResult(response)
                        .setMessageIdentifier(response.getMessageIdentifier())
                        .build();
                result.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(initialResult).build());
            }

            @Override
            public void complete() {
                // Completion of the initial result is signalled with an ack on the instruction.
                result.sendAck();
            }

            @Override
            public void completeWithError(ErrorMessage errorMessage) {
                result.completeWithError(errorMessage);
            }

            @Override
            public void completeWithError(ErrorCategory errorCategory, String message) {
                result.completeWithError(errorCategory, message);
            }
        };
        this.doHandleQuery(query.getSubscriptionQueryRequest().getGetInitialResult().getQueryRequest(), adapter);
    }

    private class IncomingQueryInstructionStream
    extends AbstractIncomingInstructionStream<QueryProviderInbound, QueryProviderOutbound> {
        public IncomingQueryInstructionStream(String clientId, int permits, int permitsBatch, Consumer<Throwable> disconnectHandler) {
            super(clientId, permits, permitsBatch, disconnectHandler);
        }

        @Override
        protected QueryProviderOutbound buildFlowControlMessage(FlowControl flowControl) {
            return QueryProviderOutbound.newBuilder().setFlowControl(flowControl).build();
        }

        @Override
        protected QueryProviderOutbound buildAckMessage(InstructionAck ack) {
            return QueryProviderOutbound.newBuilder().setAck(ack).build();
        }

        @Override
        protected String getInstructionId(QueryProviderInbound value) {
            return value.getInstructionId();
        }

        @Override
        protected InstructionHandler<QueryProviderInbound, QueryProviderOutbound> getHandler(QueryProviderInbound request) {
            if (request.getRequestCase() == QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST) {
                return (InstructionHandler)QueryChannelImpl.this.instructionHandlers.get((Object)request.getSubscriptionQueryRequest().getRequestCase());
            }
            return (InstructionHandler)QueryChannelImpl.this.instructionHandlers.get((Object)request.getRequestCase());
        }

        @Override
        protected boolean unregisterOutboundStream(StreamObserver<QueryProviderOutbound> expected) {
            if (QueryChannelImpl.this.outboundQueryStream.compareAndSet(expected, null)) {
                QueryChannelImpl.this.cancelAllSubscriptionQueries();
                return true;
            }
            return false;
        }
    }
}

