/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.impl.AsyncRegistration;
import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.command.CommandConverter;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.common.Assert;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerCommandBusConnector
implements CommandBusConnector {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerCommandBusConnector.class);
    private final AxonServerConnection connection;
    private final String clientId;
    private final String componentName;
    private CommandBusConnector.Handler incomingHandler;
    private final Map<QualifiedName, Registration> subscriptions = new ConcurrentHashMap<QualifiedName, Registration>();
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();

    public AxonServerCommandBusConnector(@Nonnull AxonServerConnection connection, @Nonnull AxonServerConfiguration configuration) {
        this.connection = Objects.requireNonNull(connection, "The AxonServerConnection must not be null.");
        Objects.requireNonNull(configuration, "The AxonServerConfiguration must not be null.");
        this.clientId = configuration.getClientId();
        this.componentName = configuration.getComponentName();
    }

    public void start() {
        this.shutdownLatch.initialize();
        logger.trace("The AxonServerCommandBusConnector started.");
    }

    @Nonnull
    public CompletableFuture<CommandResultMessage<?>> dispatch(@Nonnull CommandMessage command, @Nullable ProcessingContext processingContext) {
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new commands as this bus is being shutdown");
        try (ShutdownLatch.ActivityHandle commandInTransit = this.shutdownLatch.registerActivity();){
            CompletionStage completionStage = ((CompletableFuture)this.connection.commandChannel().sendCommand(CommandConverter.convertCommandMessage(command, this.clientId, this.componentName)).thenCompose(CommandConverter::convertCommandResponse)).whenComplete((commandResponse, throwable) -> commandInTransit.end());
            return completionStage;
        }
    }

    public void subscribe(@Nonnull QualifiedName commandName, int loadFactor) {
        Assert.isTrue((loadFactor >= 0 ? 1 : 0) != 0, () -> "Load factor must be greater than 0.");
        logger.debug("Subscribing to command [{}] with load factor [{}]", (Object)commandName, (Object)loadFactor);
        Registration registration = this.connection.commandChannel().registerCommandHandler(this::handle, loadFactor, new String[]{commandName.name()});
        if (registration instanceof AsyncRegistration) {
            AsyncRegistration asyncRegistration = (AsyncRegistration)registration;
            try {
                asyncRegistration.awaitAck(2000L, TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                throw new RuntimeException("Timed out waiting for subscription acknowledgment for command: " + String.valueOf(commandName), e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Thread interrupted while waiting for subscription acknowledgment", e);
            }
        }
        this.subscriptions.put(commandName, registration);
    }

    private CompletableFuture<CommandResponse> handle(Command command) {
        logger.debug("Received incoming command [{}]", (Object)command.getName());
        try {
            CompletableFuture<CommandResponse> result = new CompletableFuture<CommandResponse>();
            this.incomingHandler.handle(CommandConverter.convertCommand(command), (CommandBusConnector.ResultCallback)new FutureResultCallback(result, command));
            return result;
        }
        catch (Exception e) {
            logger.error("Error processing incoming command: {}", (Object)command.getName(), (Object)e);
            CompletableFuture<CommandResponse> errorResult = new CompletableFuture<CommandResponse>();
            errorResult.completeExceptionally(e);
            return errorResult;
        }
    }

    public boolean unsubscribe(@Nonnull QualifiedName commandName) {
        Registration subscription = this.subscriptions.remove(commandName);
        if (subscription != null) {
            subscription.cancel();
            return true;
        }
        return false;
    }

    public void onIncomingCommand(@Nonnull CommandBusConnector.Handler handler) {
        this.incomingHandler = handler;
    }

    public CompletableFuture<Void> disconnect() {
        if (!this.connection.isConnected()) {
            return CompletableFuture.completedFuture(null);
        }
        logger.trace("Disconnecting the AxonServerCommandBusConnector.");
        return this.connection.commandChannel().prepareDisconnect();
    }

    public CompletableFuture<Void> shutdownDispatching() {
        logger.trace("Shutting down dispatching of AxonServerCommandBusConnector.");
        return this.shutdownLatch.initiateShutdown();
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("connection", (Object)this.connection);
        descriptor.describeProperty("clientId", this.clientId);
        descriptor.describeProperty("componentName", this.componentName);
    }

    private record FutureResultCallback(@Nonnull CompletableFuture<CommandResponse> result, @Nonnull Command command) implements CommandBusConnector.ResultCallback
    {
        public void onSuccess(CommandResultMessage<?> resultMessage) {
            logger.debug("Command [{}] completed successfully with result [{}]", (Object)this.command.getName(), resultMessage);
            this.result.complete(CommandConverter.convertResultMessage(resultMessage, this.command.getMessageIdentifier()));
        }

        public void onError(@Nonnull Throwable cause) {
            logger.info("Command [{}] raised an exception [{}]", (Object)this.command.getName(), (Object)cause.getMessage());
            this.result.completeExceptionally(cause);
        }
    }
}

