/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling.distributed;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.DelayedMessageStream;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.distributed.DistributedQueryBusConfiguration;
import org.axonframework.queryhandling.distributed.QueryBusConnector;
import org.axonframework.util.PriorityRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link QueryBus} implementation that pairs a local {@link QueryBus} segment with a
 * {@link QueryBusConnector}.
 * <p>
 * Query handlers are subscribed on the local segment and then announced through the connector.
 * Outgoing queries and subscription queries are dispatched through the connector. Queries arriving
 * from the connector are executed against the local segment on a dedicated executor, wrapped in a
 * {@link PriorityRunnable} carrying the query's priority and an arrival sequence number.
 * <p>
 * Update callbacks registered by the connector are kept in an in-memory registry keyed by the
 * subscription query message; {@link #emitUpdate}, {@link #completeSubscriptions} and
 * {@link #completeSubscriptionsExceptionally} fan out to the callbacks whose query matches the filter.
 */
public class DistributedQueryBus implements QueryBus {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Initial capacity of the priority queue handed to the query executor factory. */
    private static final int QUERY_AND_RESPONSE_QUEUE_CAPACITY = 1000;

    private final QueryBus localSegment;
    private final QueryBusConnector connector;
    private final ExecutorService queryingExecutor;

    /** Update callbacks of the currently registered subscription queries, keyed by their query message. */
    private final Map<QueryMessage, QueryBusConnector.UpdateCallback> updateRegistry = new ConcurrentHashMap<>();

    /**
     * Creates a distributed query bus and registers its incoming-query handler with the connector.
     *
     * @param localSegment  the bus on which handlers are subscribed and incoming queries are executed
     * @param connector     the connector used to dispatch queries and to receive incoming ones
     * @param configuration supplies the factory that creates the executor for incoming queries
     * @throws NullPointerException if any argument is {@code null}
     */
    public DistributedQueryBus(@Nonnull QueryBus localSegment, @Nonnull QueryBusConnector connector, @Nonnull DistributedQueryBusConfiguration configuration) {
        this.localSegment = Objects.requireNonNull(localSegment, "The localSegment must not be null.");
        this.connector = Objects.requireNonNull(connector, "The connector must not be null.");
        Objects.requireNonNull(configuration, "The configuration must not be null.");
        this.queryingExecutor = configuration.queryExecutorServiceFactory()
                .createExecutorService(configuration, new PriorityBlockingQueue<Runnable>(QUERY_AND_RESPONSE_QUEUE_CAPACITY));
        connector.onIncomingQuery(new DistributedHandler());
    }

    /**
     * Subscribes the handler on the local segment and then blocks until the connector has completed
     * its own subscription for the same query name.
     *
     * @param queryName    the name of the query the handler can handle
     * @param queryHandler the handler to subscribe locally
     * @return this bus, for chaining
     */
    @Override
    public QueryBus subscribe(@Nonnull QualifiedName queryName, @Nonnull QueryHandler queryHandler) {
        this.localSegment.subscribe(queryName, queryHandler);
        // Wait for the connector's subscription to finish; a failure is rethrown via joinAndUnwrap.
        FutureUtils.joinAndUnwrap(this.connector.subscribe(queryName));
        return this;
    }

    /**
     * Dispatches the query through the connector.
     *
     * @param query   the query to dispatch
     * @param context the processing context to pass along, may be {@code null}
     * @return the stream of responses as provided by the connector
     */
    @Override
    @Nonnull
    public MessageStream<QueryResponseMessage> query(@Nonnull QueryMessage query, @Nullable ProcessingContext context) {
        return this.connector.query(query, context);
    }

    /**
     * Dispatches the subscription query through the connector.
     *
     * @param query            the subscription query to dispatch
     * @param context          the processing context to pass along, may be {@code null}
     * @param updateBufferSize the update buffer size, passed to the connector unchanged
     * @return the stream of responses as provided by the connector
     */
    @Override
    @Nonnull
    public MessageStream<QueryResponseMessage> subscriptionQuery(@Nonnull QueryMessage query, @Nullable ProcessingContext context, int updateBufferSize) {
        return this.connector.subscriptionQuery(query, context, updateBufferSize);
    }

    /**
     * Issues a subscription query without a processing context and keeps only the entries whose
     * message is a {@link SubscriptionQueryUpdateMessage}.
     *
     * @param query            the subscription query to dispatch
     * @param updateBufferSize the update buffer size, passed on unchanged
     * @return the filtered stream, cast to a stream of update messages
     */
    @Override
    @Nonnull
    public MessageStream<SubscriptionQueryUpdateMessage> subscribeToUpdates(@Nonnull QueryMessage query, int updateBufferSize) {
        return this.subscriptionQuery(query, null, updateBufferSize)
                .filter(entry -> entry.message() instanceof SubscriptionQueryUpdateMessage)
                .cast();
    }

    /**
     * Sends an update to every registered subscription whose query matches the filter. The supplier
     * is invoked once per matching subscription. The {@code context} is not used by this implementation.
     *
     * @param filter         selects the subscription queries to update
     * @param updateSupplier supplies the update message to send
     * @param context        unused
     * @return a future that completes when all matched send operations have completed
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> emitUpdate(@Nonnull Predicate<QueryMessage> filter, @Nonnull Supplier<SubscriptionQueryUpdateMessage> updateSupplier, @Nullable ProcessingContext context) {
        return this.notifyMatchingSubscriptions(filter, callback -> callback.sendUpdate(updateSupplier.get()));
    }

    /**
     * Completes every registered subscription whose query matches the filter. The {@code context}
     * is not used by this implementation.
     *
     * @param filter  selects the subscription queries to complete
     * @param context unused
     * @return a future that completes when all matched completions have completed
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> completeSubscriptions(@Nonnull Predicate<QueryMessage> filter, @Nullable ProcessingContext context) {
        return this.notifyMatchingSubscriptions(filter, callback -> callback.complete());
    }

    /**
     * Completes every registered subscription whose query matches the filter with the given cause.
     * The {@code context} is not used by this implementation.
     *
     * @param filter  selects the subscription queries to complete exceptionally
     * @param cause   the failure to report to each matched subscription
     * @param context unused
     * @return a future that completes when all matched completions have completed
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> completeSubscriptionsExceptionally(@Nonnull Predicate<QueryMessage> filter, @Nonnull Throwable cause, @Nullable ProcessingContext context) {
        return this.notifyMatchingSubscriptions(filter, callback -> callback.completeExceptionally(cause));
    }

    /**
     * Describes this bus as a wrapper of the local segment and reports the connector as a property.
     *
     * @param descriptor the descriptor to describe this component to
     */
    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeWrapperOf(this.localSegment);
        descriptor.describeProperty("connector", this.connector);
    }

    /**
     * Applies the action to the update callback of every registered subscription query accepted by
     * the filter and combines the resulting futures.
     *
     * @param filter selects the subscription queries whose callback should be invoked
     * @param action the callback operation to perform; its returned future is awaited
     * @return a future that completes when every future returned by the action has completed
     */
    private CompletableFuture<Void> notifyMatchingSubscriptions(Predicate<QueryMessage> filter, Function<QueryBusConnector.UpdateCallback, CompletableFuture<?>> action) {
        List<CompletableFuture<?>> tasks = new ArrayList<>();
        this.updateRegistry.forEach((subscriptionQuery, callback) -> {
            if (filter.test(subscriptionQuery)) {
                tasks.add(action.apply(callback));
            }
        });
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Connector-facing handler: runs incoming queries on the local segment and records the update
     * callbacks of incoming subscription queries in the enclosing bus' registry.
     */
    private class DistributedHandler
    implements QueryBusConnector.Handler {
        /** Source of the sequence number given to each {@link PriorityRunnable}, in order of arrival. */
        private static final AtomicLong TASK_SEQUENCE = new AtomicLong(Long.MIN_VALUE);

        private DistributedHandler() {
        }

        /**
         * Schedules the query for execution on the local segment and returns a stream that is
         * backed by the eventual local result.
         *
         * @param query the incoming query; its priority defaults to {@code 0} when absent
         * @return a delayed stream that resolves once the scheduled task has produced a result or failed
         */
        @Override
        public MessageStream<QueryResponseMessage> query(@Nonnull QueryMessage query) {
            int priority = query.priority().orElse(0);
            if (logger.isDebugEnabled()) {
                logger.debug("Received query [{}] for processing with priority [{}].", query.type(), priority);
            }
            long sequence = TASK_SEQUENCE.incrementAndGet();
            CompletableFuture<MessageStream<QueryResponseMessage>> localResult = new CompletableFuture<>();
            DistributedQueryBus.this.queryingExecutor.execute(new PriorityRunnable(() -> {
                try {
                    MessageStream<QueryResponseMessage> result = DistributedQueryBus.this.localSegment.query(query, null);
                    // Wait for the first entry before handing over the stream, so that a failure
                    // surfaces as a failed future rather than as a stream.
                    // NOTE(review): confirm that first() does not consume that entry from `result`.
                    result.first().asCompletableFuture().whenComplete((firstMessage, error) -> {
                        if (error != null) {
                            localResult.completeExceptionally(error);
                        } else {
                            localResult.complete(result);
                        }
                    });
                }
                catch (Exception e) {
                    localResult.completeExceptionally(e);
                }
            }, priority, sequence));
            return DelayedMessageStream.create(localResult);
        }

        /**
         * Stores the callback under the given subscription query, replacing any callback already
         * stored for an equal query message.
         *
         * @param subscriptionQueryMessage the subscription query the callback belongs to
         * @param updateCallback           the callback through which updates and completion are sent
         * @return a registration that removes the entry only if it still maps to this callback
         */
        @Override
        @Nonnull
        public Registration registerUpdateHandler(@Nonnull QueryMessage subscriptionQueryMessage, @Nonnull QueryBusConnector.UpdateCallback updateCallback) {
            DistributedQueryBus.this.updateRegistry.put(subscriptionQueryMessage, updateCallback);
            return () -> DistributedQueryBus.this.updateRegistry.remove(subscriptionQueryMessage, updateCallback);
        }
    }
}

