/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.events;

import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.NoTaskQueueException;
import io.a2a.server.events.QueueManager;
import io.a2a.server.events.TaskQueueExistsException;
import io.a2a.server.tasks.TaskStateProvider;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class InMemoryQueueManager
implements QueueManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryQueueManager.class);
    private final ConcurrentMap<String, EventQueue> queues = new ConcurrentHashMap<String, EventQueue>();
    private EventQueueFactory factory;
    private TaskStateProvider taskStateProvider;
    MainEventBus mainEventBus;

    protected InMemoryQueueManager() {
        this.factory = null;
        this.taskStateProvider = null;
    }

    @Inject
    public InMemoryQueueManager(TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
        this.mainEventBus = mainEventBus;
        this.factory = new DefaultEventQueueFactory();
        this.taskStateProvider = taskStateProvider;
    }

    public InMemoryQueueManager(EventQueueFactory factory, TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
        this.factory = factory;
        this.taskStateProvider = taskStateProvider;
        this.mainEventBus = mainEventBus;
    }

    @Override
    public void add(String taskId, EventQueue queue) {
        EventQueue existing = this.queues.putIfAbsent(taskId, queue);
        if (existing != null) {
            throw new TaskQueueExistsException();
        }
    }

    @Override
    public @Nullable EventQueue get(String taskId) {
        return (EventQueue)this.queues.get(taskId);
    }

    @Override
    public @Nullable EventQueue tap(String taskId) {
        EventQueue queue = (EventQueue)this.queues.get(taskId);
        return queue == null ? null : queue.tap();
    }

    @Override
    public void close(String taskId) {
        EventQueue existing = (EventQueue)this.queues.remove(taskId);
        if (existing == null) {
            throw new NoTaskQueueException();
        }
        LOGGER.debug("Closing queue {} for task {}", (Object)System.identityHashCode(existing), (Object)taskId);
        existing.close();
    }

    @Override
    public EventQueue createOrTap(String taskId) {
        EventQueue main;
        LOGGER.debug("createOrTap called for task {}, current map size: {}", (Object)taskId, (Object)this.queues.size());
        EventQueue existing = (EventQueue)this.queues.get(taskId);
        if (existing != null && existing.isClosed()) {
            boolean isFinalized;
            boolean bl = isFinalized = this.taskStateProvider != null && this.taskStateProvider.isTaskFinalized(taskId);
            if (isFinalized) {
                LOGGER.debug("Removing closed queue {} for finalized task {}", (Object)System.identityHashCode(existing), (Object)taskId);
                this.queues.remove(taskId);
                existing = null;
            } else {
                LOGGER.debug("Queue {} for task {} is closed but task not finalized - keeping for late-arriving events", (Object)System.identityHashCode(existing), (Object)taskId);
            }
        }
        EventQueue newQueue = null;
        if (existing == null) {
            newQueue = this.factory.builder(taskId).build();
            existing = this.queues.putIfAbsent(taskId, newQueue);
        }
        EventQueue eventQueue = main = existing == null ? newQueue : existing;
        if (main == null) {
            throw new IllegalStateException("Failed to create or retrieve queue for task " + taskId);
        }
        EventQueue result = main.tap();
        if (existing == null) {
            LOGGER.debug("Created new MainQueue {} for task {}, returning ChildQueue {} (map size: {})", new Object[]{System.identityHashCode(main), taskId, System.identityHashCode(result), this.queues.size()});
        } else {
            LOGGER.debug("Tapped existing MainQueue {} -> ChildQueue {} for task {}", new Object[]{System.identityHashCode(main), System.identityHashCode(result), taskId});
        }
        return result;
    }

    @Override
    public void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException {
        eventQueue.awaitQueuePollerStart();
    }

    @Override
    public EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) {
        return this.factory.builder(taskId);
    }

    @Override
    public int getActiveChildQueueCount(String taskId) {
        EventQueue queue = (EventQueue)this.queues.get(taskId);
        if (queue == null || queue.isClosed()) {
            return -1;
        }
        if (queue instanceof EventQueue.MainQueue) {
            EventQueue.MainQueue mainQueue = (EventQueue.MainQueue)queue;
            return mainQueue.getActiveChildCount();
        }
        return -1;
    }

    @Override
    public EventQueue.EventQueueBuilder createBaseEventQueueBuilder(String taskId) {
        return EventQueue.builder(this.mainEventBus).taskId(taskId).addOnCloseCallback(this.getCleanupCallback(taskId)).taskStateProvider(this.taskStateProvider);
    }

    public Runnable getCleanupCallback(String taskId) {
        return () -> {
            boolean isFinalized;
            LOGGER.debug("Queue close callback invoked for task {}", (Object)taskId);
            boolean bl = isFinalized = this.taskStateProvider != null && this.taskStateProvider.isTaskFinalized(taskId);
            if (!isFinalized) {
                LOGGER.debug("Task {} is not finalized, keeping queue in map for late-arriving events", (Object)taskId);
                return;
            }
            LOGGER.debug("Task {} is finalized, removing queue from map", (Object)taskId);
            EventQueue removed = (EventQueue)this.queues.remove(taskId);
            if (removed != null) {
                LOGGER.debug("Removed closed queue for task {} from QueueManager (map size: {})", (Object)taskId, (Object)this.queues.size());
            } else {
                LOGGER.debug("Queue for task {} was already removed from map", (Object)taskId);
            }
        };
    }

    private class DefaultEventQueueFactory
    implements EventQueueFactory {
        private DefaultEventQueueFactory() {
        }

        @Override
        public EventQueue.EventQueueBuilder builder(String taskId) {
            return InMemoryQueueManager.this.createBaseEventQueueBuilder(taskId);
        }
    }
}

