/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventSubscribers;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.unitofwork.ProcessingContext;

public class SimpleEventBus
implements EventBus {
    private final Context.ResourceKey<List<EventMessage>> eventsKey = Context.ResourceKey.withLabel("EventBus_Events");
    private final EventSubscribers eventSubscribers = new EventSubscribers();

    @Override
    public Registration subscribe(@Nonnull BiFunction<List<? extends EventMessage>, ProcessingContext, CompletableFuture<?>> eventsBatchConsumer) {
        return this.eventSubscribers.subscribe(eventsBatchConsumer);
    }

    @Override
    public CompletableFuture<Void> publish(@Nullable ProcessingContext context, @Nonnull List<EventMessage> events) {
        if (context == null) {
            this.eventSubscribers.notifySubscribers(events, context);
            return FutureUtils.emptyCompletedFuture();
        }
        this.registerEventPublishingHooks(context, events);
        return FutureUtils.emptyCompletedFuture();
    }

    private void registerEventPublishingHooks(@Nonnull ProcessingContext context, @Nonnull List<EventMessage> events) {
        if (context.isCommitted()) {
            throw new IllegalStateException("It is not allowed to publish events when the ProcessingContext has already been committed. Please start a new ProcessingContext before publishing events.");
        }
        List eventQueue = context.computeResourceIfAbsent(this.eventsKey, () -> {
            ArrayList queue = new ArrayList();
            context.onPrepareCommit(ctx -> {
                List<EventMessage> queuedEvents = ctx.getResource(this.eventsKey);
                return this.processEventsInPhase(queuedEvents, (ProcessingContext)ctx, this.eventSubscribers::notifySubscribers);
            });
            context.doFinally(ctx -> ctx.removeResource(this.eventsKey));
            return queue;
        });
        eventQueue.addAll(events);
    }

    private CompletableFuture<Void> processEventsInPhase(List<EventMessage> queuedEvents, ProcessingContext context, BiFunction<List<? extends EventMessage>, ProcessingContext, CompletableFuture<Void>> processor) {
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
        int processedItems = queuedEvents.size();
        futures.add(processor.apply(new ArrayList<EventMessage>(queuedEvents), context));
        while (processedItems < queuedEvents.size()) {
            ArrayList<EventMessage> newMessages = new ArrayList<EventMessage>(queuedEvents.subList(processedItems, queuedEvents.size()));
            processedItems = queuedEvents.size();
            futures.add(processor.apply(newMessages, context));
        }
        return CompletableFuture.allOf((CompletableFuture[])futures.toArray(CompletableFuture[]::new));
    }

    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("eventsKey", this.eventsKey);
        descriptor.describeProperty("eventSubscribers", this.eventSubscribers);
    }
}

