/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.extras.queuemanager.replicated.core;

import io.a2a.extras.common.events.TaskFinalizedEvent;
import io.a2a.extras.queuemanager.replicated.core.ReplicatedEventQueueItem;
import io.a2a.extras.queuemanager.replicated.core.ReplicationStrategy;
import io.a2a.server.events.EventEnqueueHook;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.QueueClosedEvent;
import io.a2a.server.events.QueueManager;
import io.a2a.server.tasks.TaskStateProvider;
import io.a2a.spec.Event;
import io.a2a.spec.Task;
import io.a2a.spec.TaskStatusUpdateEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.TransactionPhase;
import jakarta.enterprise.inject.Alternative;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link QueueManager} that keeps its queues in an {@link InMemoryQueueManager} and
 * forwards every locally enqueued, non-replicated event to a {@link ReplicationStrategy}.
 *
 * <p>Events arriving from the replication layer are observed as
 * {@link ReplicatedEventQueueItem}s and enqueued on the local queue of the task they
 * belong to. When a task is finalized, a final status update followed by a
 * {@link QueueClosedEvent} is sent through the replication strategy.
 *
 * <p>The bean is registered as an application-scoped CDI alternative with priority 50.
 */
@ApplicationScoped
@Alternative
@Priority(50)
public class ReplicatedQueueManager implements QueueManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicatedQueueManager.class);

    // NOTE(review): these are assigned exactly once per constructor but are left non-final;
    // presumably so the bean stays proxyable by the CDI container - confirm before changing.
    private InMemoryQueueManager delegate;
    private ReplicationStrategy replicationStrategy;
    private TaskStateProvider taskStateProvider;

    /**
     * No-argument constructor that leaves every collaborator {@code null}.
     *
     * <p>Presumably present only so the container can create a client proxy; an instance
     * built this way cannot service any call.
     */
    protected ReplicatedQueueManager() {
        this.delegate = null;
        this.replicationStrategy = null;
        this.taskStateProvider = null;
    }

    /**
     * Injection constructor.
     *
     * @param replicationStrategy strategy used to send local events out for replication
     * @param taskStateProvider   consulted to decide whether a task is still active
     * @param mainEventBus        passed through to the in-memory delegate
     */
    @Inject
    public ReplicatedQueueManager(ReplicationStrategy replicationStrategy,
                                  TaskStateProvider taskStateProvider,
                                  MainEventBus mainEventBus) {
        this.replicationStrategy = replicationStrategy;
        this.taskStateProvider = taskStateProvider;
        // The delegate builds its queues through our factory so each one carries a ReplicationHook.
        this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus);
    }

    /** Delegates to {@link InMemoryQueueManager#add}. */
    public void add(String taskId, EventQueue queue) {
        this.delegate.add(taskId, queue);
    }

    /** Delegates to {@link InMemoryQueueManager#get}. */
    public EventQueue get(String taskId) {
        return this.delegate.get(taskId);
    }

    /** Delegates to {@link InMemoryQueueManager#tap}. */
    public EventQueue tap(String taskId) {
        return this.delegate.tap(taskId);
    }

    /** Delegates to {@link InMemoryQueueManager#close}. */
    public void close(String taskId) {
        this.delegate.close(taskId);
    }

    /** Delegates to {@link InMemoryQueueManager#createOrTap}. */
    public EventQueue createOrTap(String taskId) {
        return this.delegate.createOrTap(taskId);
    }

    /**
     * Delegates to {@link InMemoryQueueManager#awaitQueuePollerStart}.
     *
     * @throws InterruptedException propagated from the delegate
     */
    public void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException {
        this.delegate.awaitQueuePollerStart(eventQueue);
    }

    /**
     * Enqueues an event received from the replication layer on the local queue of its task.
     *
     * <p>The event is dropped when it is neither a closed event nor a task event and the
     * task is no longer active according to the {@link TaskStateProvider}. If the delegate
     * has no queue for the task yet, {@code createOrTap} is invoked and the lookup is
     * repeated; the queue returned by {@code createOrTap} is only held temporarily and is
     * closed again once the event has been handed over.
     *
     * @param replicatedEvent the replicated item to deliver locally
     */
    public void onReplicatedEvent(@Observes ReplicatedEventQueueItem replicatedEvent) {
        String taskId = replicatedEvent.getTaskId();

        boolean alwaysDelivered = replicatedEvent.isClosedEvent() || replicatedEvent.isTaskEvent();
        if (!alwaysDelivered && !this.taskStateProvider.isTaskActive(taskId)) {
            LOGGER.debug("Skipping replicated event for inactive task {}", taskId);
            return;
        }

        EventQueue childQueue = null;
        EventQueue mainQueue = this.delegate.get(taskId);
        try {
            if (mainQueue == null) {
                LOGGER.debug("Creating MainQueue for replicated event on task {}", taskId);
                childQueue = this.delegate.createOrTap(taskId);
                mainQueue = this.delegate.get(taskId);
            }
            if (mainQueue != null) {
                mainQueue.enqueueItem((EventQueueItem) replicatedEvent);
            } else {
                LOGGER.warn("MainQueue not found for task {}, cannot enqueue replicated event. This may happen if the queue was already cleaned up.", taskId);
            }
        } finally {
            if (childQueue != null) {
                try {
                    childQueue.close();
                } catch (Exception e) {
                    // Best effort: a failed close must not mask the outcome of the enqueue above.
                    LOGGER.debug("Failed to close temporary child queue for task {}", taskId, e);
                }
            }
        }
    }

    /**
     * Announces a finalized task through the replication strategy.
     *
     * <p>Observed in the {@link TransactionPhase#AFTER_SUCCESS} phase. First a
     * {@link TaskStatusUpdateEvent} built from the final task's context id and status is
     * sent, then a {@link QueueClosedEvent} for the same task id.
     *
     * @param event carries the task id and the final task
     */
    public void onTaskFinalized(@Observes(during = TransactionPhase.AFTER_SUCCESS) TaskFinalizedEvent event) {
        String taskId = event.getTaskId();
        Task finalTask = (Task) event.getTask();
        LOGGER.debug("Task {} finalized - sending TaskStatusUpdateEvent then poison pill (QueueClosedEvent) after transaction commit", taskId);

        TaskStatusUpdateEvent finalStatusEvent = TaskStatusUpdateEvent.builder()
                .taskId(taskId)
                .contextId(finalTask.contextId())
                .status(finalTask.status())
                .build();
        this.replicationStrategy.send(taskId, (Event) finalStatusEvent);

        QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
        this.replicationStrategy.send(taskId, (Event) closedEvent);
    }

    /**
     * Returns the interface's default builder with a {@link ReplicationHook} for
     * {@code taskId} attached.
     *
     * @param taskId task the built queue belongs to
     * @return the builder supplied by {@link QueueManager}, with the replication hook set
     */
    @Override
    public EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) {
        // A default method of a direct superinterface must be invoked with the qualified
        // "Interface.super" form; an unqualified "super" would resolve against Object.
        return QueueManager.super.getEventQueueBuilder(taskId).hook(new ReplicationHook(taskId));
    }

    /** Delegates to {@link InMemoryQueueManager#getActiveChildQueueCount}. */
    public int getActiveChildQueueCount(String taskId) {
        return this.delegate.getActiveChildQueueCount(taskId);
    }

    /**
     * Factory handed to the in-memory delegate; every builder it returns is the delegate's
     * base builder with a {@link ReplicationHook} attached.
     */
    private class ReplicatingEventQueueFactory implements EventQueueFactory {
        private ReplicatingEventQueueFactory() {
        }

        /**
         * @param taskId task the queue is being built for
         * @return the delegate's base builder with the replication hook set
         */
        public EventQueue.EventQueueBuilder builder(String taskId) {
            return ReplicatedQueueManager.this.delegate
                    .createBaseEventQueueBuilder(taskId)
                    .hook(new ReplicationHook(taskId));
        }
    }

    /**
     * Enqueue hook that forwards locally produced events to the replication strategy.
     */
    private class ReplicationHook implements EventEnqueueHook {
        private final String taskId;

        /**
         * @param taskId task id to send events under; when {@code null} nothing is sent
         */
        public ReplicationHook(String taskId) {
            this.taskId = taskId;
        }

        /**
         * Sends the item's event to the replication strategy unless the item is itself
         * marked as replicated or no task id is known.
         *
         * @param item the item that was just enqueued
         */
        public void onEnqueue(EventQueueItem item) {
            if (this.taskId == null || item.isReplicated()) {
                return;
            }
            ReplicatedQueueManager.this.replicationStrategy.send(this.taskId, item.getEvent());
        }
    }
}

