/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.command.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.command.CommandChannel;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.MetaDataValue;
import io.axoniq.axonserver.grpc.ProcessingInstruction;
import io.axoniq.axonserver.grpc.ProcessingKey;
import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import io.axoniq.axonserver.grpc.command.CommandServiceGrpc;
import io.axoniq.axonserver.grpc.command.CommandSubscription;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.OutOfDirectMemoryError;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandChannelImpl
extends AbstractAxonServerChannel
implements CommandChannel {
    private static final Logger logger = LoggerFactory.getLogger(CommandChannelImpl.class);
    private final AtomicReference<StreamObserver<CommandProviderOutbound>> outboundCommandStream = new AtomicReference();
    private final ClientIdentification clientIdentification;
    private final ConcurrentMap<String, Function<Command, CompletableFuture<CommandResponse>>> commandHandlers = new ConcurrentHashMap<String, Function<Command, CompletableFuture<CommandResponse>>>();
    private final ConcurrentMap<String, CompletableFuture<Void>> instructionsAwaitingAck = new ConcurrentHashMap<String, CompletableFuture<Void>>();
    private final ConcurrentMap<CommandProviderInbound.RequestCase, InstructionHandler<CommandProviderInbound, CommandProviderOutbound>> handlers = new ConcurrentHashMap<CommandProviderInbound.RequestCase, InstructionHandler<CommandProviderInbound, CommandProviderOutbound>>();
    private final int permits;
    private final int permitsBatch;
    private final CommandServiceGrpc.CommandServiceStub commandServiceStub;

    public CommandChannelImpl(ClientIdentification clientIdentification, int permits, int permitsBatch, ScheduledExecutorService executor, AxonServerManagedChannel channel) {
        super(executor, channel);
        this.clientIdentification = clientIdentification;
        this.permits = permits;
        this.permitsBatch = permitsBatch;
        this.handlers.put(CommandProviderInbound.RequestCase.COMMAND, this::handleIncomingCommand);
        this.handlers.put(CommandProviderInbound.RequestCase.ACK, this::handleAck);
        this.commandServiceStub = CommandServiceGrpc.newStub((Channel)channel);
    }

    private void handleIncomingCommand(CommandProviderInbound message, ReplyChannel<CommandProviderOutbound> outbound) {
        Command command = message.getCommand();
        Function<Command, CompletableFuture> handler = (Function<Command, CompletableFuture>)this.commandHandlers.get(command.getName());
        if (handler != null) {
            outbound.sendAck();
        } else {
            outbound.sendNack();
            handler = c -> this.noHandlerForCommand();
        }
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)handler.apply(command).exceptionally(e -> CommandResponse.newBuilder().setErrorCode(ErrorCategory.COMMAND_EXECUTION_ERROR.errorCode()).setErrorMessage(ErrorMessage.newBuilder().setMessage(e.getMessage()).build()).build())).thenApply(CommandResponse::newBuilder)).thenApply(r -> r.setRequestIdentifier(command.getMessageIdentifier()))).whenComplete((r, e) -> outbound.send(CommandProviderOutbound.newBuilder().setCommandResponse((CommandResponse.Builder)r).build()))).thenRun(outbound::complete);
    }

    private CompletableFuture<CommandResponse> noHandlerForCommand() {
        CompletableFuture<CommandResponse> r = new CompletableFuture<CommandResponse>();
        r.completeExceptionally(new AxonServerException(ErrorCategory.NO_HANDLER_FOR_COMMAND, "No handler for command", this.clientIdentification.getClientId()));
        return r;
    }

    private void handleAck(CommandProviderInbound message, ReplyChannel<CommandProviderOutbound> outbound) {
        InstructionAck ack = message.getAck();
        CompletableFuture instructionResult = (CompletableFuture)this.instructionsAwaitingAck.remove(ack.getInstructionId());
        if (instructionResult == null) {
            return;
        }
        if (ack.getSuccess()) {
            instructionResult.complete(null);
        } else {
            instructionResult.completeExceptionally(new AxonServerException(ack.getError()));
        }
    }

    @Override
    public synchronized void connect() {
        if (this.outboundCommandStream.get() != null) {
            return;
        }
        IncomingCommandStream responseObserver = new IncomingCommandStream(this.clientIdentification.getClientId(), this.permits, this.permitsBatch, this::onConnectionError);
        this.commandServiceStub.openStream((StreamObserver<CommandProviderInbound>)responseObserver);
        StreamObserver newValue = responseObserver.getInstructionsForPlatform();
        StreamObserver previous = this.outboundCommandStream.getAndSet(newValue);
        this.commandHandlers.keySet().forEach(k -> newValue.onNext((Object)this.buildSubscribeMessage((String)k, "", 100)));
        responseObserver.enableFlowControl();
        logger.info("CommandChannel connected, {} command handlers registered", (Object)this.commandHandlers.size());
        ObjectUtils.silently(previous, StreamObserver::onCompleted);
    }

    private void onConnectionError(Throwable error) {
        this.instructionsAwaitingAck.keySet().forEach(k -> ObjectUtils.doIfNotNull(this.instructionsAwaitingAck.remove(k), f -> f.completeExceptionally(error)));
        this.scheduleReconnect();
    }

    @Override
    public void disconnect() {
        this.commandHandlers.keySet().forEach(this::sendUnsubscribe);
        this.commandHandlers.clear();
        ObjectUtils.doIfNotNull(this.outboundCommandStream.getAndSet(null), StreamObserver::onCompleted);
    }

    @Override
    public boolean isConnected() {
        return this.outboundCommandStream.get() != null;
    }

    @Override
    public Registration registerCommandHandler(Function<Command, CompletableFuture<CommandResponse>> handler, int loadFactor, String ... commandNames) {
        for (String commandName : commandNames) {
            this.commandHandlers.put(commandName, handler);
            logger.info("Registered handler for command {}", (Object)commandName);
            String instructionId = UUID.randomUUID().toString();
            ObjectUtils.doIfNotNull(this.outboundCommandStream.get(), s -> s.onNext((Object)this.buildSubscribeMessage(commandName, instructionId, loadFactor)));
        }
        return () -> this.unsubscribe(handler, commandNames);
    }

    private void unsubscribe(Function<Command, CompletableFuture<CommandResponse>> handler, String ... commandNames) {
        for (String commandName : commandNames) {
            if (!this.commandHandlers.remove(commandName, handler)) continue;
            this.sendUnsubscribe(commandName);
        }
    }

    private void sendUnsubscribe(String commandName) {
        ObjectUtils.doIfNotNull(this.outboundCommandStream.get(), s -> s.onNext((Object)CommandProviderOutbound.newBuilder().setUnsubscribe(CommandSubscription.newBuilder().setCommand(commandName).setClientId(this.clientIdentification.getClientId()).setComponentName(this.clientIdentification.getComponentName())).build()));
    }

    private CommandProviderOutbound buildSubscribeMessage(String commandName, String instructionId, int loadFactor) {
        return CommandProviderOutbound.newBuilder().setInstructionId(instructionId).setSubscribe(CommandSubscription.newBuilder().setMessageId(instructionId).setCommand(commandName).setClientId(this.clientIdentification.getClientId()).setComponentName(this.clientIdentification.getComponentName()).setLoadFactor(loadFactor)).build();
    }

    @Override
    public CompletableFuture<Void> prepareDisconnect() {
        this.commandHandlers.keySet().forEach(this::sendUnsubscribe);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<CommandResponse> sendCommand(Command command) {
        boolean hasRoutingKey = command.getProcessingInstructionsList().stream().anyMatch(pi -> pi.getKey() == ProcessingKey.ROUTING_KEY);
        Command.Builder toSend = Command.newBuilder(command).setMessageIdentifier("".equals(command.getMessageIdentifier()) ? UUID.randomUUID().toString() : command.getMessageIdentifier()).setClientId(this.clientIdentification.getClientId()).setComponentName(this.clientIdentification.getComponentName());
        if (!hasRoutingKey) {
            toSend.addProcessingInstructions(ProcessingInstruction.newBuilder().setKey(ProcessingKey.ROUTING_KEY).setValue(MetaDataValue.newBuilder().setTextValue(toSend.getMessageIdentifier())));
        }
        CompletableFuture<CommandResponse> response = new CompletableFuture<CommandResponse>();
        try {
            this.commandServiceStub.dispatch(toSend.build(), new CommandResponseHandler(this.clientIdentification.getClientId(), response));
        }
        catch (OutOfDirectMemoryError e) {
            response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Unable to buffer message for dispatching", this.clientIdentification.getClientId(), e));
        }
        catch (Exception e) {
            response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "An error occurred while attempting to dispatch a message", this.clientIdentification.getClientId(), e));
        }
        return response;
    }

    private class IncomingCommandStream
    extends AbstractIncomingInstructionStream<CommandProviderInbound, CommandProviderOutbound> {
        public IncomingCommandStream(String clientId, int permits, int permitsBatch, Consumer<Throwable> disconnectHandler) {
            super(clientId, permits, permitsBatch, disconnectHandler);
        }

        @Override
        protected CommandProviderOutbound buildFlowControlMessage(FlowControl flowControl) {
            return CommandProviderOutbound.newBuilder().setFlowControl(flowControl).build();
        }

        @Override
        protected CommandProviderOutbound buildAckMessage(InstructionAck ack) {
            return CommandProviderOutbound.newBuilder().setAck(ack).build();
        }

        @Override
        protected String getInstructionId(CommandProviderInbound value) {
            return value.getInstructionId();
        }

        @Override
        protected InstructionHandler<CommandProviderInbound, CommandProviderOutbound> getHandler(CommandProviderInbound request) {
            return (InstructionHandler)CommandChannelImpl.this.handlers.get((Object)request.getRequestCase());
        }

        @Override
        protected boolean unregisterOutboundStream(StreamObserver<CommandProviderOutbound> expected) {
            return CommandChannelImpl.this.outboundCommandStream.compareAndSet(expected, null);
        }
    }

    private static class CommandResponseHandler
    implements StreamObserver<CommandResponse> {
        private final String clientId;
        private final CompletableFuture<CommandResponse> response;

        public CommandResponseHandler(String clientId, CompletableFuture<CommandResponse> response) {
            this.clientId = clientId;
            this.response = response;
        }

        public void onNext(CommandResponse value) {
            if (!this.response.isDone()) {
                this.response.complete(value);
            }
        }

        public void onError(Throwable t) {
            if (!this.response.isDone()) {
                this.response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Received exception while dispatching command", this.clientId, t));
            }
        }

        public void onCompleted() {
            if (!this.response.isDone()) {
                this.response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Reply completed without result", this.clientId));
            }
        }
    }
}

