/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.command.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.command.CommandChannel;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AsyncRegistration;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.MetaDataValue;
import io.axoniq.axonserver.grpc.ProcessingInstruction;
import io.axoniq.axonserver.grpc.ProcessingKey;
import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import io.axoniq.axonserver.grpc.command.CommandServiceGrpc;
import io.axoniq.axonserver.grpc.command.CommandSubscription;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.grpc.Channel;
import io.grpc.netty.shaded.io.netty.util.internal.OutOfDirectMemoryError;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandChannelImpl
extends AbstractAxonServerChannel<CommandProviderOutbound>
implements CommandChannel {
    private static final Logger logger = LoggerFactory.getLogger(CommandChannelImpl.class);

    /** Outbound side of the currently registered command stream; holds {@code null} while none is registered. */
    private final AtomicReference<CallStreamObserver<CommandProviderOutbound>> outboundCommandStream = new AtomicReference<>();
    /** Identification of this client; its client id and component name are copied into outgoing messages. */
    private final ClientIdentification clientIdentification;
    /** Registered command handlers, keyed by command name. */
    private final ConcurrentMap<String, CommandHandler> commandHandlers = new ConcurrentHashMap<>();
    /** Dispatch table for inbound instructions, keyed by the request case of the incoming message. */
    private final ConcurrentMap<CommandProviderInbound.RequestCase, InstructionHandler<CommandProviderInbound, CommandProviderOutbound>> handlers = new ConcurrentHashMap<>();
    /** Passed to every newly created incoming stream (presumably the initial number of flow-control permits). */
    private final int permits;
    /** Passed to every newly created incoming stream (presumably the flow-control refill batch size). */
    private final int permitsBatch;
    /** Asynchronous gRPC stub used to open the command stream and to dispatch commands. */
    private final CommandServiceGrpc.CommandServiceStub commandServiceStub;
    /** Fallback used for incoming commands without a registered handler; always yields a failed future. */
    private final CommandHandler noCommandHandler = new CommandHandler(c -> this.noHandlerForCommand(), 0);
    /** Context name; only used in log messages within this class. */
    private final String context;
    /** Futures of incoming commands still being processed; {@code disconnect()} waits for them. */
    private final Set<CompletableFuture<?>> inProgressCommands = ConcurrentHashMap.newKeySet();
    /** Whether the subscriptions sent over the current stream have all completed without error. */
    private final AtomicBoolean subscriptionsCompleted = new AtomicBoolean(false);

    /**
     * Creates a command channel that communicates over the given managed channel.
     *
     * @param clientIdentification identification of this client, copied into outgoing messages
     * @param context              context name, used in log messages
     * @param permits              value forwarded to each incoming command stream (presumably the initial flow-control permits)
     * @param permitsBatch         value forwarded to each incoming command stream (presumably the permit refill batch size)
     * @param executor             executor handed to the parent channel
     * @param channel              channel on which the gRPC stub is created
     */
    public CommandChannelImpl(ClientIdentification clientIdentification, String context, int permits, int permitsBatch, ScheduledExecutorService executor, AxonServerManagedChannel channel) {
        super(clientIdentification, executor, channel);

        // Plain configuration first ...
        this.context = context;
        this.clientIdentification = clientIdentification;
        this.permitsBatch = permitsBatch;
        this.permits = permits;

        // ... then the stub used for both the instruction stream and command dispatching ...
        this.commandServiceStub = CommandServiceGrpc.newStub((Channel) channel);

        // ... and finally the dispatch table for inbound instructions.
        handlers.put(CommandProviderInbound.RequestCase.ACK, this::handleAck);
        handlers.put(CommandProviderInbound.RequestCase.COMMAND, this::handleIncomingCommand);
    }

    /**
     * Handles a command received on the instruction stream.
     * <p>
     * The handler registered for the command's name is invoked (or the "no handler" fallback when none is
     * registered). Its outcome is converted into a {@link CommandResponse}: a failure becomes a response carrying
     * the {@code COMMAND_EXECUTION_ERROR} code and the failure's message. The response is tagged with the command's
     * message identifier, sent over {@code outbound}, and the reply channel is completed afterwards.
     * <p>
     * The resulting future is tracked in {@code inProgressCommands} until it completes.
     * <p>
     * NOTE(review): a handler that throws synchronously (or returns {@code null}) is not converted into an error
     * response here; the exception propagates to the caller.
     *
     * @param message  the inbound instruction carrying the command
     * @param outbound the channel on which the response is sent
     */
    private void handleIncomingCommand(CommandProviderInbound message, ReplyChannel<CommandProviderOutbound> outbound) {
        Command command = message.getCommand();
        CommandHandler handler = this.commandHandlers.get(command.getName());
        if (handler == null) {
            handler = this.noCommandHandler;
        }
        CompletableFuture<Void> result = handler.getHandler()
                .apply(command)
                .thenApply(CommandResponse::newBuilder)
                .exceptionally(e -> CommandResponse.newBuilder()
                        .setErrorCode(ErrorCategory.COMMAND_EXECUTION_ERROR.errorCode())
                        // Protobuf setters reject null, and a Throwable may have no message;
                        // fall back to the exception type so an error response is always produced.
                        .setErrorMessage(ErrorMessage.newBuilder()
                                .setMessage(Objects.toString(e.getMessage(), e.getClass().getName()))))
                .thenApply(r -> r.setRequestIdentifier(command.getMessageIdentifier()))
                .whenComplete((r, e) -> outbound.send(CommandProviderOutbound.newBuilder().setCommandResponse(r).build()))
                .thenRun(outbound::complete);
        // Register before attaching the cleanup callback, so an already-completed future is still removed.
        this.inProgressCommands.add(result);
        result.whenComplete((r, e) -> this.inProgressCommands.remove(result));
    }

    /**
     * Produces the result used for commands that have no registered handler.
     *
     * @return a future that is already completed exceptionally with a {@code NO_HANDLER_FOR_COMMAND}
     *         {@link AxonServerException} carrying this client's id
     */
    private CompletableFuture<CommandResponse> noHandlerForCommand() {
        AxonServerException noHandler = new AxonServerException(
                ErrorCategory.NO_HANDLER_FOR_COMMAND,
                "No handler for command",
                clientIdentification.getClientId());
        CompletableFuture<CommandResponse> failed = new CompletableFuture<>();
        failed.completeExceptionally(noHandler);
        return failed;
    }

    /**
     * Handles an acknowledgement received on the instruction stream by passing it to {@code processAck}
     * and then completing the reply channel.
     *
     * @param message  the inbound instruction carrying the acknowledgement
     * @param outbound the reply channel for this instruction
     */
    private void handleAck(CommandProviderInbound message, ReplyChannel<CommandProviderOutbound> outbound) {
        processAck(message.getAck());
        // Nothing is sent back for an ack; the reply channel is simply closed.
        outbound.complete();
    }

    /**
     * Opens the command stream, but only when at least one command handler is registered.
     */
    @Override
    public void connect() {
        if (commandHandlers.isEmpty()) {
            // Without handlers there is nothing to subscribe, so no stream is opened.
            return;
        }
        logger.debug("CommandChannel for context '{}' will attempt to connect.", context);
        doCreateCommandStream();
    }

    /**
     * Opens a new command stream unless one is already registered, subscribes every known command handler over
     * it and finally enables flow control on the incoming side.
     * <p>
     * {@code subscriptionsCompleted} is set to whether there are no handlers before the stream is opened, and is
     * updated to reflect success or failure once all subscribe instructions have completed. When opening the
     * stream throws, the exception is handed to the stream's {@code onError} and no subscriptions are sent.
     */
    private synchronized void doCreateCommandStream() {
        if (outboundCommandStream.get() != null) {
            logger.debug("CommandChannel for context '{}' is already connected.", context);
            return;
        }

        subscriptionsCompleted.set(commandHandlers.isEmpty());
        IncomingCommandStream responseObserver = new IncomingCommandStream(
                clientIdentification.getClientId(),
                permits,
                permitsBatch,
                this::scheduleReconnect,
                this::registerOutboundStream);
        try {
            commandServiceStub.openStream((StreamObserver<CommandProviderInbound>) responseObserver);
        } catch (Exception openFailure) {
            responseObserver.onError(openFailure);
            return;
        }

        StreamObserver<CommandProviderOutbound> upstream = responseObserver.getInstructionsForPlatform();
        CompletableFuture<Void> allSubscribed = commandHandlers.entrySet()
                .stream()
                .map(entry -> sendSubscribe(entry.getKey(), entry.getValue().getLoadFactor(), upstream))
                .reduce(CompletableFuture::allOf)
                .orElse(CompletableFuture.completedFuture(null));
        allSubscribed.whenComplete((ignored, failure) -> {
            if (failure == null) {
                logger.info("CommandChannel for context '{}' connected, {} command handlers registered", context, commandHandlers.size());
            } else {
                logger.warn("An error occurred while registering command handlers", failure);
            }
            subscriptionsCompleted.set(failure == null);
        });

        responseObserver.enableFlowControl();
    }

    /**
     * Installs the given stream as the current outbound command stream. When it replaces a different stream,
     * {@code onCompleted} is invoked on the replaced one via {@code ObjectUtils.silently}.
     *
     * @param upstream the outbound stream to register
     */
    private void registerOutboundStream(CallStreamObserver<CommandProviderOutbound> upstream) {
        StreamObserver<CommandProviderOutbound> replaced = outboundCommandStream.getAndSet(upstream);
        if (replaced == upstream) {
            // Same stream registered again; it must stay open.
            return;
        }
        ObjectUtils.silently(replaced, StreamObserver::onCompleted);
    }

    /**
     * Drops the current connection via {@link #disconnect()} and then requests an immediate reconnect.
     */
    @Override
    public void reconnect() {
        logger.debug("Reconnecting CommandChannel for context '{}'.", context);
        // Tear down first, then ask for a new connection attempt right away.
        disconnect();
        scheduleImmediateReconnect();
    }

    /**
     * Disconnects this channel from the current command stream.
     * <p>
     * The outbound stream is detached immediately. If there was one, an unsubscribe instruction is sent over it
     * for every registered command (failures are logged and otherwise ignored). Once those have completed, the
     * method waits for the commands that are in progress at that moment, and only then completes the detached
     * stream. This method does not block on any of these futures. {@code subscriptionsCompleted} is reset to
     * {@code false} before returning.
     */
    @Override
    public void disconnect() {
        logger.debug("Disconnecting CommandChannel for context '{}'.", context);
        StreamObserver<CommandProviderOutbound> previousOutbound = outboundCommandStream.getAndSet(null);

        CompletableFuture<Void> unsubscribed;
        if (previousOutbound == null) {
            unsubscribed = CompletableFuture.completedFuture(null);
        } else {
            unsubscribed = commandHandlers.keySet()
                    .stream()
                    .map(commandName -> sendUnsubscribe(commandName, previousOutbound))
                    .reduce(CompletableFuture::allOf)
                    .map(all -> all.exceptionally(failure -> {
                        logger.warn("An error occurred while deregistering command handlers", failure);
                        return null;
                    }))
                    .orElseGet(() -> CompletableFuture.completedFuture(null));
        }

        unsubscribed
                .thenCompose(ignored -> {
                    if (!inProgressCommands.isEmpty()) {
                        logger.info("Waiting for {} commands to be completed", inProgressCommands.size());
                    }
                    // Wait for a snapshot of the in-flight commands; their individual failures are irrelevant here.
                    CompletableFuture<?>[] pending = inProgressCommands.toArray(new CompletableFuture<?>[0]);
                    return CompletableFuture.allOf(pending).exceptionally(failure -> null);
                })
                .thenRun(() -> ObjectUtils.doIfNotNull(previousOutbound, StreamObserver::onCompleted));

        subscriptionsCompleted.set(false);
    }

    /**
     * Reports whether this channel is ready.
     *
     * @return {@code true} when no command handlers are registered, or when an outbound stream is registered
     *         and the subscriptions on it have completed; {@code false} otherwise
     */
    @Override
    public boolean isReady() {
        if (commandHandlers.isEmpty()) {
            // Nothing to subscribe, so there is nothing to wait for.
            return true;
        }
        boolean connected = outboundCommandStream.get() != null;
        return connected && subscriptionsCompleted.get();
    }

    /**
     * Registers the given handler for each of the given command names and sends a subscribe instruction for
     * every name over the current outbound stream. If no handlers were registered before this call, the command
     * stream is created first.
     * <p>
     * NOTE(review): the outbound stream reference may be {@code null} here if no stream is registered; how
     * {@code sendInstruction} treats that is defined in the parent class and not visible in this file.
     *
     * @param handler      function invoked for each incoming command with one of the given names
     * @param loadFactor   load factor included in the subscription
     * @param commandNames names of the commands the handler is registered for
     * @return a registration backed by the combined result of all subscribe instructions; its cancel action
     *         unsubscribes the names that are still mapped to this handler
     */
    @Override
    public Registration registerCommandHandler(Function<Command, CompletableFuture<CommandResponse>> handler, int loadFactor, String ... commandNames) {
        if (this.commandHandlers.isEmpty()) {
            this.doCreateCommandStream();
        }
        // allOf(...) yields CompletableFuture<Void>, so the accumulator must be typed as Void as well.
        CompletableFuture<Void> subscriptionResult = CompletableFuture.completedFuture(null);
        CommandHandler commandHandler = new CommandHandler(handler, loadFactor);
        for (String commandName : commandNames) {
            this.commandHandlers.put(commandName, commandHandler);
            logger.info("Registered handler for command '{}' in context '{}'", commandName, this.context);
            CompletableFuture<Void> ack = this.sendSubscribe(commandName, loadFactor, this.outboundCommandStream.get());
            subscriptionResult = CompletableFuture.allOf(subscriptionResult, ack);
        }
        return new AsyncRegistration(subscriptionResult, () -> this.unsubscribe(commandHandler, commandNames));
    }

    /**
     * Sends a subscribe instruction for a single command over the given stream.
     * A freshly generated UUID is used both as the instruction id and as the subscription's message id.
     *
     * @param commandName name of the command to subscribe to
     * @param loadFactor  load factor to report in the subscription
     * @param outbound    stream on which the instruction is sent
     * @return the future returned by {@code sendInstruction} for this instruction
     */
    private CompletableFuture<Void> sendSubscribe(String commandName, int loadFactor, StreamObserver<CommandProviderOutbound> outbound) {
        String instructionId = UUID.randomUUID().toString();

        CommandSubscription.Builder subscription = CommandSubscription.newBuilder()
                .setMessageId(instructionId)
                .setCommand(commandName)
                .setClientId(clientIdentification.getClientId())
                .setComponentName(clientIdentification.getComponentName())
                .setLoadFactor(loadFactor);

        CommandProviderOutbound instruction = CommandProviderOutbound.newBuilder()
                .setInstructionId(instructionId)
                .setSubscribe(subscription)
                .build();

        return sendInstruction(instruction, CommandProviderOutbound::getInstructionId, outbound);
    }

    /**
     * Unsubscribes the given handler from each of the given command names that is still mapped to that exact
     * handler instance. For every such name an unsubscribe instruction is sent over the current outbound stream;
     * the handler is removed from the registry only after that instruction has completed successfully.
     *
     * @param handler      the handler instance to deregister
     * @param commandNames the command names to deregister the handler from
     * @return a future completing once all unsubscribe instructions and removals have completed
     */
    private CompletableFuture<Void> unsubscribe(CommandHandler handler, String ... commandNames) {
        // allOf(...) yields CompletableFuture<Void>, so the accumulator must be typed as Void as well.
        CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
        for (String commandName : commandNames) {
            // Identity check: names that were re-registered with another handler instance are left alone.
            if (this.commandHandlers.get(commandName) != handler) {
                continue;
            }
            logger.info("Deregistered handler for command '{}' in context '{}'", commandName, this.context);
            CompletableFuture<Void> removed = this.sendUnsubscribe(commandName, this.outboundCommandStream.get())
                    .thenRun(() -> this.commandHandlers.remove(commandName, handler));
            future = CompletableFuture.allOf(future, removed);
        }
        return future;
    }

    /**
     * Sends an unsubscribe instruction for a single command over the given stream.
     * A freshly generated UUID is used both as the instruction id and as the subscription's message id.
     *
     * @param commandName name of the command to unsubscribe from
     * @param outbound    stream on which the instruction is sent
     * @return the future returned by {@code sendInstruction} for this instruction
     */
    private CompletableFuture<Void> sendUnsubscribe(String commandName, StreamObserver<CommandProviderOutbound> outbound) {
        String instructionId = UUID.randomUUID().toString();

        CommandSubscription subscription = CommandSubscription.newBuilder()
                .setMessageId(instructionId)
                .setCommand(commandName)
                .setClientId(clientIdentification.getClientId())
                .setComponentName(clientIdentification.getComponentName())
                .build();

        CommandProviderOutbound instruction = CommandProviderOutbound.newBuilder()
                .setInstructionId(instructionId)
                .setUnsubscribe(subscription)
                .build();

        return sendInstruction(instruction, CommandProviderOutbound::getInstructionId, outbound);
    }

    /**
     * Dispatches the given command through the gRPC stub.
     * <p>
     * The command is copied and stamped with this client's id and component name. An empty message identifier
     * is replaced by a random UUID, and when the command carries no routing-key processing instruction, one is
     * added using the message identifier as its value.
     *
     * @param command the command to dispatch
     * @return a future completed by the response observer; if the dispatch call itself throws, the future is
     *         completed exceptionally with a {@code COMMAND_DISPATCH_ERROR} {@link AxonServerException}
     */
    @Override
    public CompletableFuture<CommandResponse> sendCommand(Command command) {
        logger.trace("Sending command over CommandChannel for context '{}'.", context);

        boolean hasRoutingKey = false;
        for (ProcessingInstruction instruction : command.getProcessingInstructionsList()) {
            if (instruction.getKey() == ProcessingKey.ROUTING_KEY) {
                hasRoutingKey = true;
                break;
            }
        }

        String messageIdentifier = command.getMessageIdentifier();
        if ("".equals(messageIdentifier)) {
            messageIdentifier = UUID.randomUUID().toString();
        }

        String clientId = clientIdentification.getClientId();
        Command.Builder toSend = Command.newBuilder(command)
                .setMessageIdentifier(messageIdentifier)
                .setClientId(clientId)
                .setComponentName(clientIdentification.getComponentName());
        if (!hasRoutingKey) {
            // Default the routing key to the message identifier.
            MetaDataValue.Builder routingValue = MetaDataValue.newBuilder().setTextValue(messageIdentifier);
            toSend.addProcessingInstructions(ProcessingInstruction.newBuilder()
                    .setKey(ProcessingKey.ROUTING_KEY)
                    .setValue(routingValue));
        }

        CompletableFuture<CommandResponse> response = new CompletableFuture<>();
        try {
            commandServiceStub.dispatch(toSend.build(), new CommandResponseHandler(clientId, response));
        } catch (OutOfDirectMemoryError e) {
            response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Unable to buffer message for dispatching", clientId, e));
        } catch (Exception e) {
            response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "An error occurred while attempting to dispatch a message", clientId, e));
        }
        return response;
    }

    /**
     * Sends an unsubscribe instruction for every registered command over the current outbound stream, without
     * removing the handlers or closing the stream.
     *
     * @return a future completing when all unsubscribe instructions have completed, or an already completed
     *         future when no handlers are registered
     */
    @Override
    public CompletableFuture<Void> prepareDisconnect() {
        logger.debug("Preparing disconnect on CommandChannel for context '{}'.", context);

        CompletableFuture<Void> combined = null;
        for (String commandName : commandHandlers.keySet()) {
            // The stream reference is re-read for every command.
            CompletableFuture<Void> ack = sendUnsubscribe(commandName, outboundCommandStream.get());
            if (combined == null) {
                combined = ack;
            } else {
                combined = CompletableFuture.allOf(combined, ack);
            }
        }
        if (combined == null) {
            return CompletableFuture.completedFuture(null);
        }
        return combined;
    }

    /**
     * Body of a compiler-generated lambda that the decompiler surfaced as a named method.
     * Removes the given future from the set of in-progress commands.
     *
     * @param result the future to stop tracking
     * @param r      unused
     * @param e      unused
     */
    private /* synthetic */ void lambda$handleIncomingCommand$4(CompletableFuture result, Void r, Throwable e) {
        this.inProgressCommands.remove(result);
    }

    private class IncomingCommandStream
    extends AbstractIncomingInstructionStream<CommandProviderInbound, CommandProviderOutbound> {
        public IncomingCommandStream(String clientId, int permits, int permitsBatch, Consumer<Throwable> disconnectHandler, Consumer<CallStreamObserver<CommandProviderOutbound>> beforeStartHandler) {
            super(clientId, permits, permitsBatch, disconnectHandler, beforeStartHandler);
        }

        @Override
        protected CommandProviderOutbound buildFlowControlMessage(FlowControl flowControl) {
            return CommandProviderOutbound.newBuilder().setFlowControl(flowControl).build();
        }

        @Override
        protected CommandProviderOutbound buildAckMessage(InstructionAck ack) {
            return CommandProviderOutbound.newBuilder().setAck(ack).build();
        }

        @Override
        protected String getInstructionId(CommandProviderInbound instruction) {
            return instruction.getInstructionId();
        }

        @Override
        protected InstructionHandler<CommandProviderInbound, CommandProviderOutbound> getHandler(CommandProviderInbound request) {
            return (InstructionHandler)CommandChannelImpl.this.handlers.get((Object)request.getRequestCase());
        }

        @Override
        protected boolean unregisterOutboundStream(CallStreamObserver<CommandProviderOutbound> expected) {
            return CommandChannelImpl.this.outboundCommandStream.compareAndSet(expected, null);
        }
    }

    /**
     * Pairs a command handling function with the load factor it was registered with.
     * Equality and hash code are based on the handling function only; the load factor is ignored.
     */
    private static class CommandHandler {
        private final Function<Command, CompletableFuture<CommandResponse>> handler;
        private final int loadFactor;

        /**
         * @param handler    function that handles a command and returns its eventual response
         * @param loadFactor load factor associated with this handler
         */
        public CommandHandler(Function<Command, CompletableFuture<CommandResponse>> handler, int loadFactor) {
            this.loadFactor = loadFactor;
            this.handler = handler;
        }

        /** @return the command handling function */
        public Function<Command, CompletableFuture<CommandResponse>> getHandler() {
            return handler;
        }

        /** @return the load factor associated with this handler */
        public int getLoadFactor() {
            return loadFactor;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            boolean sameType = o != null && getClass() == o.getClass();
            return sameType && handler.equals(((CommandHandler) o).handler);
        }

        @Override
        public int hashCode() {
            return Objects.hash(handler);
        }
    }

    private static class CommandResponseHandler
    implements StreamObserver<CommandResponse> {
        private final String clientId;
        private final CompletableFuture<CommandResponse> response;

        public CommandResponseHandler(String clientId, CompletableFuture<CommandResponse> response) {
            this.clientId = clientId;
            this.response = response;
        }

        public void onNext(CommandResponse value) {
            if (!this.response.isDone()) {
                this.response.complete(value);
            }
        }

        public void onError(Throwable t) {
            if (!this.response.isDone()) {
                this.response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Received exception while dispatching command", this.clientId, t));
            }
        }

        public void onCompleted() {
            if (!this.response.isDone()) {
                this.response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Reply completed without result", this.clientId));
            }
        }
    }
}

