/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.async;

import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingStrategy;
import org.axonframework.eventhandling.async.EventProcessorTask;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.LegacyMessageSupportingContext;
import org.axonframework.messaging.unitofwork.LegacyUnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventProcessingStrategy} that does not invoke the processor directly but hands the events to
 * {@link EventProcessorTask} instances created with the configured {@link Executor}.
 * <p>
 * Events are grouped by the sequence identifier returned by the configured {@link SequencingPolicy}. Events whose
 * policy yields no identifier are each batch handed to a fresh task; events that share an identifier are handed
 * to the single task registered for that identifier in {@code currentTasks}.
 */
public class AsynchronousEventProcessingStrategy
implements EventProcessingStrategy {
    private static final Logger logger = LoggerFactory.getLogger(AsynchronousEventProcessingStrategy.class);

    /** Resource key under which events are buffered on the root unit of work; unique per strategy instance. */
    private final String scheduledEventsKey = this + "_SCHEDULED_EVENTS";
    private final Executor executor;
    private final SequencingPolicy sequencingPolicy;
    /** Task currently registered for each non-null sequence identifier. */
    private final ConcurrentMap<Object, EventProcessorTask> currentTasks = new ConcurrentHashMap<>();

    /**
     * Creates a strategy that schedules event processing through the given executor.
     *
     * @param executor         the executor passed to every {@link EventProcessorTask} this strategy creates
     * @param sequencingPolicy the policy used to determine the sequence identifier of each event
     * @throws NullPointerException if either argument is {@code null}
     */
    public AsynchronousEventProcessingStrategy(Executor executor, SequencingPolicy sequencingPolicy) {
        this.executor = Objects.requireNonNull(executor, "executor must not be null");
        this.sequencingPolicy = Objects.requireNonNull(sequencingPolicy, "sequencingPolicy must not be null");
    }

    /**
     * Schedules the given events for processing.
     * <p>
     * When a unit of work is active, the events are appended to a list stored as a resource on the root unit of
     * work, and scheduling is deferred to an after-commit handler registered when that list is created. Without an
     * active unit of work the events are scheduled immediately.
     *
     * @param events    the events to process
     * @param processor the callback that will eventually receive the events
     */
    @Override
    public void handle(@Nonnull List<? extends EventMessage> events, @Nonnull Consumer<List<? extends EventMessage>> processor) {
        if (!CurrentUnitOfWork.isStarted()) {
            this.schedule(events, processor);
            return;
        }
        LegacyUnitOfWork<?> unitOfWorkRoot = CurrentUnitOfWork.get().root();
        // NOTE(review): the after-commit handler captures the processor of the call that creates the buffer;
        // presumably the factory only runs when no resource exists yet, so later calls within the same root
        // unit of work contribute events but not their processor - confirm against getOrComputeResource.
        List<EventMessage> bufferedEvents = unitOfWorkRoot.getOrComputeResource(this.scheduledEventsKey, key -> {
            List<EventMessage> allEvents = new ArrayList<>();
            unitOfWorkRoot.afterCommit(uow -> this.schedule(allEvents, processor));
            return allEvents;
        });
        bufferedEvents.addAll(events);
    }

    /**
     * Groups the given events by sequence identifier and hands each group to a processing task.
     * <p>
     * Events for which the sequencing policy returns no identifier are collected under the {@code null} key and
     * given to a newly created task with a no-op shutdown callback. Every other group is assigned to the task
     * registered for its identifier.
     *
     * @param events    the events to schedule
     * @param processor the callback passed along to the task together with each group of events
     */
    protected void schedule(List<? extends EventMessage> events, Consumer<List<? extends EventMessage>> processor) {
        // HashMap is used deliberately: it accepts the null key that represents "no sequence identifier".
        Map<Object, List<EventMessage>> groupedEvents = new HashMap<>();
        for (EventMessage eventMessage : events) {
            Object sequenceIdentifier = this.sequencingPolicy
                    .getSequenceIdentifierFor(eventMessage, new LegacyMessageSupportingContext(eventMessage))
                    .orElse(null);
            groupedEvents.computeIfAbsent(sequenceIdentifier, key -> new ArrayList<>()).add(eventMessage);
        }
        groupedEvents.forEach((sequenceIdentifier, eventGroup) -> {
            if (sequenceIdentifier == null) {
                if (logger.isDebugEnabled()) {
                    // Logs the simple class name of the incoming list, not of the individual events.
                    logger.debug("Scheduling Event for full concurrent processing {}", events.getClass().getSimpleName());
                }
                this.newProcessingScheduler(NoActionCallback.INSTANCE).scheduleEvents(eventGroup, processor);
            } else {
                if (logger.isDebugEnabled()) {
                    // Guard avoids computing the identifier's toString() when debug logging is off.
                    logger.debug("Scheduling task of type [{}] for sequential processing in group [{}]", events.getClass().getSimpleName(), sequenceIdentifier.toString());
                }
                this.assignEventsToScheduler(eventGroup, sequenceIdentifier, processor);
            }
        });
    }

    /**
     * Hands the events to the task registered for the given sequence identifier, registering a new task when none
     * is present. Loops until a task reports that it accepted the events.
     */
    private void assignEventsToScheduler(List<? extends EventMessage> events, Object sequenceIdentifier, Consumer<List<? extends EventMessage>> processor) {
        boolean taskScheduled = false;
        while (!taskScheduled) {
            EventProcessorTask currentScheduler = this.currentTasks.get(sequenceIdentifier);
            if (currentScheduler == null) {
                // putIfAbsent keeps whichever task was registered first; the next iteration picks it up.
                this.currentTasks.putIfAbsent(sequenceIdentifier, this.newProcessingScheduler(new SchedulerCleanUp(sequenceIdentifier)));
            } else {
                taskScheduled = currentScheduler.scheduleEvents(events, processor);
                if (!taskScheduled) {
                    // The task refused the events; drop exactly this instance so the retry registers a new one.
                    this.currentTasks.remove(sequenceIdentifier, currentScheduler);
                }
            }
        }
    }

    /**
     * Creates a new {@link EventProcessorTask} backed by this strategy's executor.
     *
     * @param shutDownCallback the callback handed to the new task
     * @return a newly created task
     */
    protected EventProcessorTask newProcessingScheduler(EventProcessorTask.ShutdownCallback shutDownCallback) {
        logger.debug("Initializing new processing scheduler.");
        return new EventProcessorTask(this.executor, shutDownCallback);
    }

    /**
     * Shutdown callback that removes a task from {@code currentTasks} for the sequence identifier it was
     * created with.
     */
    private final class SchedulerCleanUp
    implements EventProcessorTask.ShutdownCallback {
        private final Object sequenceIdentifier;

        private SchedulerCleanUp(Object sequenceIdentifier) {
            this.sequenceIdentifier = sequenceIdentifier;
        }

        @Override
        public void afterShutdown(EventProcessorTask scheduler) {
            if (logger.isDebugEnabled()) {
                logger.debug("Cleaning up processing scheduler for sequence [{}]", this.sequenceIdentifier.toString());
            }
            // Two-argument remove: only unregisters when the mapping still points at this very task.
            AsynchronousEventProcessingStrategy.this.currentTasks.remove(this.sequenceIdentifier, scheduler);
        }
    }

    /** Shutdown callback that does nothing; used for tasks that are never registered in {@code currentTasks}. */
    private enum NoActionCallback implements EventProcessorTask.ShutdownCallback
    {
        INSTANCE;

        @Override
        public void afterShutdown(EventProcessorTask scheduler) {
            // Intentionally empty: there is nothing to clean up.
        }
    }
}

