/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import io.grpc.stub.StreamObserver;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.BiConsumer;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.DefaultHandlers;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.Handlers;
import org.axonframework.axonserver.connector.TargetContextResolver;
import org.axonframework.axonserver.connector.command.AxonServerCommandDispatchException;
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.command.CommandLoadFactorProvider;
import org.axonframework.axonserver.connector.command.CommandPriorityCalculator;
import org.axonframework.axonserver.connector.command.CommandSerializer;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.callbacks.NoOpCallback;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.Registration;
import org.axonframework.lifecycle.ShutdownHandler;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.lifecycle.StartHandler;
import org.axonframework.messaging.Distributed;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CommandBus} implementation that sends commands through an Axon Server connection and hands commands
 * received from that connection to a local {@link CommandBus} segment.
 * <p>
 * Outgoing commands are serialized and sent on the command channel of the connection belonging to the context
 * chosen by the configured {@link TargetContextResolver}. Incoming commands are wrapped in a
 * {@link CommandProcessingTask} and executed on an {@link ExecutorService} whose work queue is ordered by the
 * priority carried in the command's processing instructions.
 * <p>
 * Instances are created through {@link #builder()}.
 */
public class AxonServerCommandBus
implements CommandBus,
Distributed<CommandBus> {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerCommandBus.class);
    /** Priority assumed for queued work that is not a {@link CommandProcessingTask}. */
    private static final long DEFAULT_PRIORITY = 0L;
    /** Initial capacity of the priority queue; a {@link PriorityBlockingQueue} grows beyond it when needed. */
    private static final int COMMAND_QUEUE_INITIAL_CAPACITY = 1000;
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final AxonServerConfiguration configuration;
    private final CommandBus localSegment;
    private final CommandSerializer serializer;
    private final RoutingStrategy routingStrategy;
    private final CommandPriorityCalculator priorityCalculator;
    private final CommandLoadFactorProvider loadFactorProvider;
    private final DispatchInterceptors<CommandMessage<?>> dispatchInterceptors;
    private final TargetContextResolver<? super CommandMessage<?>> targetContextResolver;
    private final CommandCallback<Object, Object> defaultCommandCallback;
    // NOTE(review): the three collections below are not referenced anywhere else in this class.
    private final Handlers<CommandProviderInbound.RequestCase, BiConsumer<CommandProviderInbound, StreamObserver<CommandProviderOutbound>>> commandHandlers = new DefaultHandlers<>();
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();
    private final Map<String, ExecutorService> executorPerContext = new ConcurrentHashMap<>();
    private final Map<String, Registration> subscribedHandlers = new ConcurrentHashMap<>();
    private final ExecutorService executorService;

    /**
     * Creates a new {@link Builder} to configure and construct an {@link AxonServerCommandBus}.
     *
     * @return a fresh builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Instantiates the bus from the given {@code builder}, after calling {@link Builder#validate()} on it.
     * <p>
     * The executor for incoming commands is obtained from the builder's {@link ExecutorServiceBuilder}, which is
     * given a {@link PriorityBlockingQueue} ordered so that tasks with a larger priority value come first.
     *
     * @param builder the builder holding the components of this bus
     */
    public AxonServerCommandBus(Builder builder) {
        builder.validate();
        this.axonServerConnectionManager = builder.axonServerConnectionManager;
        this.configuration = builder.configuration;
        this.localSegment = builder.localSegment;
        this.serializer = builder.buildSerializer();
        this.routingStrategy = builder.routingStrategy;
        this.priorityCalculator = builder.priorityCalculator;
        this.defaultCommandCallback = builder.defaultCommandCallback;
        this.loadFactorProvider = builder.loadFactorProvider;
        String context = this.configuration.getContext();
        this.targetContextResolver = builder.targetContextResolver.orElse(m -> context);
        // Reversed comparator: the queue head is the task with the LARGEST priority value.
        Comparator<Runnable> highestPriorityFirst = Comparator.<Runnable>comparingLong(AxonServerCommandBus::priorityOf).reversed();
        PriorityBlockingQueue<Runnable> commandQueue = new PriorityBlockingQueue<>(COMMAND_QUEUE_INITIAL_CAPACITY, highestPriorityFirst);
        this.executorService = builder.executorServiceBuilder.apply(builder.configuration, commandQueue);
        this.dispatchInterceptors = new DispatchInterceptors<>();
    }

    /**
     * Determines the queue priority of the given {@code task}.
     *
     * @param task a unit of work held in the executor's queue
     * @return the task's own priority if it is a {@link CommandProcessingTask}, {@link #DEFAULT_PRIORITY} otherwise
     */
    private static long priorityOf(Runnable task) {
        return task instanceof CommandProcessingTask ? ((CommandProcessingTask) task).getPriority() : DEFAULT_PRIORITY;
    }

    /**
     * Start handler that initializes the shutdown latch guarding command dispatching.
     */
    @StartHandler(phase=0x1FFFFFFF)
    public void start() {
        this.shutdownLatch.initialize();
    }

    /**
     * Dispatches the given {@code command}, reporting the outcome to the default command callback configured on
     * the builder.
     *
     * @param command the command to dispatch
     * @param <C>     the payload type of the command
     */
    public <C> void dispatch(CommandMessage<C> command) {
        this.dispatch(command, this.defaultCommandCallback);
    }

    /**
     * Runs the registered dispatch interceptors over {@code commandMessage} and sends the resulting message
     * through Axon Server, reporting the outcome to {@code commandCallback}.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback the callback to notify with the command's result
     * @param <C>             the payload type of the command
     * @param <R>             the type of result expected by the callback
     */
    public <C, R> void dispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, ? super R> commandCallback) {
        logger.debug("Dispatch command [{}] with callback", commandMessage.getCommandName());
        this.doDispatch(this.dispatchInterceptors.intercept(commandMessage), commandCallback);
    }

    /**
     * Serializes {@code commandMessage}, sends it on the command channel of the resolved context and wires the
     * response into {@code commandCallback}.
     * <p>
     * The dispatch is registered as an activity on the shutdown latch. That activity is ended when the response
     * handling completes (normally or exceptionally), or immediately when sending fails with an exception, in which
     * case the callback receives an {@link AxonServerCommandDispatchException} wrapped in a result message.
     *
     * @param commandMessage  the (already intercepted) command to send
     * @param commandCallback the callback to notify with the command's result
     * @param <C>             the payload type of the command
     * @param <R>             the type of result expected by the callback
     */
    @SuppressWarnings("unchecked")
    private <C, R> void doDispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, ? super R> commandCallback) {
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new commands as this bus is being shutdown");
        ShutdownLatch.ActivityHandle commandInTransit = this.shutdownLatch.registerActivity();
        try {
            String context = this.targetContextResolver.resolveContext(commandMessage);
            Command command = this.serializer.serialize(commandMessage, this.routingStrategy.getRoutingKey(commandMessage), this.priorityCalculator.determinePriority(commandMessage));
            CompletableFuture<CommandResponse> response = this.axonServerConnectionManager.getConnection(context).commandChannel().sendCommand(command);
            // Unchecked cast: the payload type of the deserialized result cannot be verified at compile time.
            response.thenApply(reply -> (CommandResultMessage<R>) this.serializer.deserialize(reply))
                    // A failed send/deserialize is converted into an exceptional result message for the callback.
                    .exceptionally(GenericCommandResultMessage::asCommandResultMessage)
                    .thenAccept(resultMessage -> commandCallback.onResult(commandMessage, resultMessage))
                    .whenComplete((ignored, error) -> commandInTransit.end());
        }
        catch (Exception e) {
            commandInTransit.end();
            commandCallback.onResult(commandMessage, GenericCommandResultMessage.asCommandResultMessage(new AxonServerCommandDispatchException(ErrorCode.COMMAND_DISPATCH_ERROR.errorCode(), "Exception while dispatching a command to AxonServer", e)));
        }
    }

    /**
     * Subscribes {@code messageHandler} for {@code commandName} on the local segment and registers a command
     * handler for the same name on the command channel of the default connection.
     * <p>
     * Each command arriving from the server is turned into a {@link CommandProcessingTask} and handed to the
     * executor; the future returned to the connector is completed by that task.
     *
     * @param commandName    the name of the command to subscribe to
     * @param messageHandler the handler to invoke for matching commands
     * @return a registration combining the local subscription and the cancellation of the server registration
     */
    public Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> messageHandler) {
        logger.debug("Subscribing command with name [{}]", commandName);
        Registration localRegistration = this.localSegment.subscribe(commandName, messageHandler);
        io.axoniq.axonserver.connector.Registration serverRegistration = this.axonServerConnectionManager.getConnection().commandChannel().registerCommandHandler(incoming -> {
            CompletableFuture<CommandResponse> result = new CompletableFuture<>();
            // execute(), not submit(): submit() wraps the task in a FutureTask, so the priority queue's
            // comparator would no longer see a CommandProcessingTask and every command would get the
            // default priority.
            this.executorService.execute(new CommandProcessingTask(incoming, this.serializer, result, this.localSegment));
            return result;
        }, this.loadFactorProvider.getFor(commandName), commandName);
        return new AxonServerRegistration(localRegistration, serverRegistration::cancel);
    }

    /**
     * Returns the local {@link CommandBus} on which incoming commands are executed.
     *
     * @return the local segment of this bus
     */
    public CommandBus localSegment() {
        return this.localSegment;
    }

    /**
     * Registers the given handler interceptor on the local segment.
     *
     * @param handlerInterceptor the interceptor to register
     * @return the registration returned by the local segment
     */
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor) {
        return this.localSegment.registerHandlerInterceptor(handlerInterceptor);
    }

    /**
     * Registers the given dispatch interceptor, which is applied to commands dispatched through this bus.
     *
     * @param dispatchInterceptor the interceptor to register
     * @return the registration returned by the interceptor collection
     */
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor) {
        return this.dispatchInterceptors.registerDispatchInterceptor(dispatchInterceptor);
    }

    /**
     * Shutdown handler that asks the command channel of the default connection to prepare for disconnecting,
     * provided the connection manager reports the default context as connected.
     *
     * @return the future returned by the command channel, or an already completed future when not connected
     */
    @ShutdownHandler(phase=0x1FFFFFFF)
    public CompletableFuture<Void> disconnect() {
        if (this.axonServerConnectionManager.isConnected(this.axonServerConnectionManager.getDefaultContext())) {
            return this.axonServerConnectionManager.getConnection().commandChannel().prepareDisconnect();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Shutdown handler that initiates the shutdown of the latch guarding command dispatching.
     *
     * @return the future returned by the shutdown latch
     */
    @ShutdownHandler(phase=0)
    public CompletableFuture<Void> shutdownDispatching() {
        return this.shutdownLatch.initiateShutdown();
    }

    /**
     * Builder for an {@link AxonServerCommandBus}.
     * <p>
     * {@link #validate()} requires the connection manager, configuration, local segment, serializer and routing
     * strategy to be set. The remaining components have defaults: a no-op default callback, the default priority
     * calculator, a context resolver returning the configuration's context, the default command executor builder
     * and a load factor of 100 for every command.
     */
    public static class Builder {
        private CommandCallback<Object, Object> defaultCommandCallback = NoOpCallback.INSTANCE;
        private AxonServerConnectionManager axonServerConnectionManager;
        private AxonServerConfiguration configuration;
        private CommandBus localSegment;
        private Serializer serializer;
        private RoutingStrategy routingStrategy;
        private CommandPriorityCalculator priorityCalculator = CommandPriorityCalculator.defaultCommandPriorityCalculator();
        // Reads the configuration lazily, so it only has to be set by the time a context is resolved.
        private TargetContextResolver<? super CommandMessage<?>> targetContextResolver = c -> this.configuration.getContext();
        private ExecutorServiceBuilder executorServiceBuilder = ExecutorServiceBuilder.defaultCommandExecutorServiceBuilder();
        private CommandLoadFactorProvider loadFactorProvider = command -> 100;

        /**
         * Sets the connection manager used to obtain connections to Axon Server.
         *
         * @param axonServerConnectionManager the connection manager; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder axonServerConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull(axonServerConnectionManager, "AxonServerConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        /**
         * Sets the Axon Server configuration, which supplies the default context among other settings.
         *
         * @param configuration the configuration; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder configuration(AxonServerConfiguration configuration) {
            BuilderUtils.assertNonNull(configuration, "AxonServerConfiguration may not be null");
            this.configuration = configuration;
            return this;
        }

        /**
         * Sets the local {@link CommandBus} on which incoming commands are executed and handlers are subscribed.
         *
         * @param localSegment the local command bus; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder localSegment(CommandBus localSegment) {
            BuilderUtils.assertNonNull(localSegment, "Local CommandBus may not be null");
            this.localSegment = localSegment;
            return this;
        }

        /**
         * Sets the {@link Serializer} handed to the {@link CommandSerializer} created by {@link #buildSerializer()}.
         *
         * @param serializer the serializer; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        /**
         * Sets the {@link RoutingStrategy} used to determine the routing key of outgoing commands.
         *
         * @param routingStrategy the routing strategy; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder routingStrategy(RoutingStrategy routingStrategy) {
            BuilderUtils.assertNonNull(routingStrategy, "RoutingStrategy may not be null");
            this.routingStrategy = routingStrategy;
            return this;
        }

        /**
         * Sets the callback used by {@link AxonServerCommandBus#dispatch(CommandMessage)}. The given value is
         * passed through {@code ObjectUtils.getOrDefault} with {@link NoOpCallback#INSTANCE} as the default.
         *
         * @param defaultCommandCallback the callback to use when none is supplied on dispatch
         * @return this builder, for method chaining
         */
        public Builder defaultCommandCallback(CommandCallback<Object, Object> defaultCommandCallback) {
            this.defaultCommandCallback = ObjectUtils.getOrDefault(defaultCommandCallback, NoOpCallback.INSTANCE);
            return this;
        }

        /**
         * Sets the {@link CommandPriorityCalculator} used to determine the priority of outgoing commands.
         *
         * @param priorityCalculator the priority calculator; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder priorityCalculator(CommandPriorityCalculator priorityCalculator) {
            BuilderUtils.assertNonNull(priorityCalculator, "CommandPriorityCalculator may not be null");
            this.priorityCalculator = priorityCalculator;
            return this;
        }

        /**
         * Sets the {@link TargetContextResolver} used to choose the context an outgoing command is sent to.
         *
         * @param targetContextResolver the context resolver; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder targetContextResolver(TargetContextResolver<? super CommandMessage<?>> targetContextResolver) {
            BuilderUtils.assertNonNull(targetContextResolver, "TargetContextResolver may not be null");
            this.targetContextResolver = targetContextResolver;
            return this;
        }

        /**
         * Sets the {@link ExecutorServiceBuilder} that creates the executor processing incoming commands.
         *
         * @param executorServiceBuilder the executor builder; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder executorServiceBuilder(ExecutorServiceBuilder executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, "ExecutorServiceBuilder may not be null");
            this.executorServiceBuilder = executorServiceBuilder;
            return this;
        }

        /**
         * Sets the {@link CommandLoadFactorProvider} consulted for the load factor of each subscribed command.
         *
         * @param loadFactorProvider the load factor provider; may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder loadFactorProvider(CommandLoadFactorProvider loadFactorProvider) {
            BuilderUtils.assertNonNull(loadFactorProvider, "CommandLoadFactorProvider may not be null");
            this.loadFactorProvider = loadFactorProvider;
            return this;
        }

        /**
         * Builds an {@link AxonServerCommandBus} from the current state of this builder.
         *
         * @return the newly constructed command bus
         */
        public AxonServerCommandBus build() {
            return new AxonServerCommandBus(this);
        }

        /**
         * Creates the {@link CommandSerializer} from the configured {@link Serializer} and configuration.
         *
         * @return a new command serializer
         */
        protected CommandSerializer buildSerializer() {
            return new CommandSerializer(this.serializer, this.configuration);
        }

        /**
         * Verifies that all components without a default have been provided.
         *
         * @throws AxonConfigurationException as declared; raised through {@code BuilderUtils.assertNonNull} when a
         *                                    required component is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.axonServerConnectionManager, "The AxonServerConnectionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.configuration, "The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.localSegment, "The Local CommandBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.serializer, "The Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.routingStrategy, "The RoutingStrategy is a hard requirement and should be provided");
        }
    }

    /**
     * Unit of work that dispatches a single incoming {@link Command} on the local segment and completes the given
     * future with the serialized outcome. Its priority is read once, at construction, from the command's
     * processing instructions.
     */
    private static class CommandProcessingTask
    implements Runnable {
        private final long priority;
        private final CompletableFuture<CommandResponse> result;
        private final CommandBus localSegment;
        private final Command command;
        private final CommandSerializer serializer;

        /**
         * @param command      the incoming command to process
         * @param serializer   converts the command to a message and the result message to a response
         * @param result       the future to complete with the response, or exceptionally on failure
         * @param localSegment the command bus on which the command is dispatched
         */
        public CommandProcessingTask(Command command, CommandSerializer serializer, CompletableFuture<CommandResponse> result, CommandBus localSegment) {
            this.command = command;
            this.serializer = serializer;
            this.priority = ProcessingInstructionHelper.priority(command.getProcessingInstructionsList());
            this.result = result;
            this.localSegment = localSegment;
        }

        /**
         * @return the priority taken from the command's processing instructions
         */
        public long getPriority() {
            return this.priority;
        }

        /**
         * Deserializes the command and dispatches it on the local segment. The callback completes the result
         * future with the serialized result message; an exception thrown while deserializing or dispatching
         * completes the future exceptionally instead.
         */
        @Override
        public void run() {
            try {
                this.localSegment.dispatch(this.serializer.deserialize(this.command), (CommandCallback)new CommandCallback<Object, Object>(){

                    public void onResult(CommandMessage<?> commandMessage, CommandResultMessage<?> commandResultMessage) {
                        result.complete(serializer.serialize(commandResultMessage, command.getMessageIdentifier()));
                    }
                });
            }
            catch (Exception e) {
                this.result.completeExceptionally(e);
            }
        }
    }
}

