/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.commandhandling.distributed;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.commandhandling.distributed.DistributedCommandBusConfiguration;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.util.PriorityRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CommandBus} that sends every dispatched command through a {@link CommandBusConnector} and executes
 * commands arriving from that connector on a local {@code CommandBus} segment.
 * <p>
 * Handlers subscribed to this bus are registered with the local segment and announced to the connector together
 * with the configured load factor. Commands received from the connector are scheduled on an
 * {@link ExecutorService} obtained from the {@link DistributedCommandBusConfiguration}; the outcome of each command
 * is reported through the {@link CommandBusConnector.ResultCallback} supplied with it.
 */
public class DistributedCommandBus implements CommandBus {

    private static final Logger logger = LoggerFactory.getLogger(DistributedCommandBus.class);

    /**
     * Initial capacity of the {@link PriorityBlockingQueue} handed to the executor service factory.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1000;

    private final CommandBus localSegment;
    private final CommandBusConnector connector;
    private final ExecutorService executorService;
    private final int loadFactor;

    /**
     * Creates a distributed command bus on top of the given {@code localSegment} and {@code connector}, and
     * registers a handler for incoming commands with the connector.
     *
     * @param localSegment  the bus on which handlers are registered and incoming commands are executed
     * @param connector     the connector used to dispatch commands and to receive commands for local handling
     * @param configuration supplies the load factor and the factory for the executor service that processes
     *                      incoming commands
     * @throws NullPointerException if any of the arguments is {@code null}
     */
    public DistributedCommandBus(@Nonnull CommandBus localSegment,
                                 @Nonnull CommandBusConnector connector,
                                 @Nonnull DistributedCommandBusConfiguration configuration) {
        this.localSegment = Objects.requireNonNull(localSegment, "The given CommandBus localSegment cannot be null.");
        this.connector = Objects.requireNonNull(connector, "The given Connector cannot be null.");
        Objects.requireNonNull(configuration, "The given DistributedCommandBusConfiguration cannot be null.");
        this.loadFactor = configuration.loadFactor();
        this.executorService = configuration.executorServiceFactory()
                                            .createExecutorService(
                                                    configuration,
                                                    new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_CAPACITY)
                                            );
        // Registered last, once all fields the handler reads have been assigned.
        connector.onIncomingCommand(new DistributedHandler());
    }

    /**
     * Subscribes the {@code handler} for commands of the given {@code name} on the local segment and announces
     * the subscription to the connector using the configured load factor.
     *
     * @param name    the name of the command the handler is subscribed to
     * @param handler the handler to subscribe
     * @return this instance, for chaining
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override
    public DistributedCommandBus subscribe(@Nonnull QualifiedName name, @Nonnull CommandHandler handler) {
        this.localSegment.subscribe(name, Objects.requireNonNull(handler, "The given handler cannot be null."));
        this.connector.subscribe(name, this.loadFactor);
        return this;
    }

    /**
     * Dispatches the {@code command} by delegating to the connector. The local segment is not consulted here.
     *
     * @param command           the command to dispatch
     * @param processingContext the context in which the command is dispatched, possibly {@code null}
     * @return the future returned by the connector for this command
     */
    @Override
    public CompletableFuture<CommandResultMessage> dispatch(@Nonnull CommandMessage command,
                                                            @Nullable ProcessingContext processingContext) {
        return this.connector.dispatch(command, processingContext);
    }

    /**
     * Describes this bus as a wrapper of its local segment and exposes the connector as a property.
     *
     * @param descriptor the descriptor to describe this component to
     */
    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeWrapperOf(this.localSegment);
        descriptor.describeProperty("connector", this.connector);
    }

    /**
     * Handler registered with the connector: schedules each incoming command on the executor service and relays
     * the outcome of the local dispatch to the supplied callback.
     */
    private class DistributedHandler implements CommandBusConnector.Handler {

        /**
         * Source of the sequence number given to each scheduled task. Shared by all handler instances.
         */
        private static final AtomicLong TASK_SEQUENCE = new AtomicLong(Long.MIN_VALUE);

        /**
         * Schedules the {@code commandMessage} for execution on the executor service, using the command's
         * priority (0 when none is set) and a freshly incremented sequence number.
         *
         * @param commandMessage the incoming command
         * @param callback       the callback to notify once the command has been processed
         */
        @Override
        public void handle(@Nonnull CommandMessage commandMessage,
                           @Nonnull CommandBusConnector.ResultCallback callback) {
            int priority = commandMessage.priority().orElse(0);
            if (logger.isDebugEnabled()) {
                logger.debug("Received command [{}] for processing with priority [{}] and routing key [{}]",
                             commandMessage.type(),
                             priority,
                             commandMessage.routingKey().orElse(null));
            }
            long sequence = TASK_SEQUENCE.incrementAndGet();
            // NOTE(review): PriorityRunnable presumably orders tasks by priority, then by sequence - confirm.
            DistributedCommandBus.this.executorService.execute(
                    new PriorityRunnable(() -> this.doHandleCommand(commandMessage, callback), priority, sequence)
            );
        }

        /**
         * Dispatches the command on the local segment (without a processing context) and reports the result or
         * failure to the {@code callback}.
         *
         * @param commandMessage the command to execute locally
         * @param callback       the callback to notify of the outcome
         */
        private void doHandleCommand(CommandMessage commandMessage, CommandBusConnector.ResultCallback callback) {
            if (logger.isDebugEnabled()) {
                logger.debug("Processing incoming command [{}] with priority [{}] and routing key [{}]",
                             commandMessage.type(),
                             commandMessage.priority().orElse(0),
                             commandMessage.routingKey().orElse(null));
            }

            CompletableFuture<CommandResultMessage> result;
            try {
                result = DistributedCommandBus.this.localSegment.dispatch(commandMessage, null);
            } catch (Exception ex) {
                // The local segment failed before returning a future. Without this, the exception would be
                // thrown out of the executor task and the callback would never be notified.
                this.handleError(commandMessage, callback, ex);
                return;
            }

            result.whenComplete((resultMessage, e) -> {
                try {
                    if (e == null) {
                        this.handleSuccess(commandMessage, callback, resultMessage);
                    } else {
                        this.handleError(commandMessage, callback, e);
                    }
                } catch (Throwable ex) {
                    // Reporting the outcome itself failed; log it and try to report that failure instead.
                    logger.error("Error handling response of command [{}]", commandMessage.type(), ex);
                    this.handleError(commandMessage, callback, ex);
                }
            });
        }

        /**
         * Logs the failure and passes it to {@code callback.onError}.
         */
        private void handleError(CommandMessage commandMessage,
                                 CommandBusConnector.ResultCallback callback,
                                 Throwable e) {
            logger.error("Error processing incoming command [{}]", commandMessage.type(), e);
            callback.onError(e);
        }

        /**
         * Logs the result at debug level and passes it to {@code callback.onSuccess}.
         */
        private void handleSuccess(CommandMessage commandMessage,
                                   CommandBusConnector.ResultCallback callback,
                                   CommandResultMessage resultMessage) {
            logger.debug("Successfully processed command [{}] with result [{}]", commandMessage.type(), resultMessage);
            callback.onSuccess(resultMessage);
        }
    }
}

