/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.extras.queuemanager.replicated.core;

import io.a2a.extras.queuemanager.replicated.core.ReplicatedEvent;
import io.a2a.extras.queuemanager.replicated.core.ReplicationStrategy;
import io.a2a.server.events.EventEnqueueHook;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.events.QueueManager;
import io.a2a.spec.Event;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Alternative;
import jakarta.inject.Inject;

/**
 * {@link QueueManager} that keeps its queues in an {@link InMemoryQueueManager} and attaches a
 * {@link ReplicationHook} to the queue builders it hands out, so that events enqueued on those
 * queues are forwarded to the configured {@link ReplicationStrategy}.
 *
 * <p>Events arriving through {@link #onReplicatedEvent(ReplicatedEvent)} are enqueued locally
 * without being forwarded again; a per-thread flag is used to tell the two cases apart.
 */
@ApplicationScoped
@Alternative
@Priority(50)
public class ReplicatedQueueManager
implements QueueManager {
    /** Holds the queues; built with a factory that adds the replication hook to every builder. */
    private final InMemoryQueueManager delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory());

    /**
     * Set to {@code TRUE} on the current thread while a replicated event is being enqueued, so
     * the hook can skip re-sending it. Unset (null) at all other times.
     * NOTE(review): this relies on the enqueue hook being invoked on the thread that calls
     * {@code enqueueEvent} - confirm against the EventQueue implementation.
     */
    private final ThreadLocal<Boolean> isHandlingReplicatedEvent = new ThreadLocal<>();

    /** Outgoing channel for locally produced events; when null, nothing is sent. */
    @Inject
    private ReplicationStrategy replicationStrategy;

    /** Creates the manager; the replication strategy is left to field injection. */
    public ReplicatedQueueManager() {
    }

    /**
     * Creates the manager with an explicitly supplied replication strategy.
     *
     * @param replicationStrategy strategy that receives locally enqueued events; may be null,
     *                            in which case events are not sent anywhere
     */
    ReplicatedQueueManager(ReplicationStrategy replicationStrategy) {
        this.replicationStrategy = replicationStrategy;
    }

    /** Registers {@code queue} under {@code taskId} in the underlying in-memory manager. */
    public void add(String taskId, EventQueue queue) {
        this.delegate.add(taskId, queue);
    }

    /** Returns whatever the underlying in-memory manager returns for {@code taskId}. */
    public EventQueue get(String taskId) {
        return this.delegate.get(taskId);
    }

    /** Delegates to the underlying in-memory manager's {@code tap} for {@code taskId}. */
    public EventQueue tap(String taskId) {
        return this.delegate.tap(taskId);
    }

    /** Delegates to the underlying in-memory manager's {@code close} for {@code taskId}. */
    public void close(String taskId) {
        this.delegate.close(taskId);
    }

    /** Delegates to the underlying in-memory manager's {@code createOrTap} for {@code taskId}. */
    public EventQueue createOrTap(String taskId) {
        return this.delegate.createOrTap(taskId);
    }

    /**
     * Delegates to the underlying in-memory manager's {@code awaitQueuePollerStart}.
     *
     * @throws InterruptedException propagated from the delegate
     */
    public void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException {
        this.delegate.awaitQueuePollerStart(eventQueue);
    }

    /**
     * CDI observer for {@link ReplicatedEvent}: enqueues the carried event on the local queue
     * for its task, obtaining the queue via {@code createOrTap} when {@code get} finds none.
     * The event is dropped if no queue can be obtained or if the carried event is null.
     *
     * @param replicatedEvent the observed event; supplies the task id and the event to enqueue
     */
    public void onReplicatedEvent(@Observes ReplicatedEvent replicatedEvent) {
        // Flag this thread so ReplicationHook does not send the event back out.
        this.isHandlingReplicatedEvent.set(Boolean.TRUE);
        try {
            String taskId = replicatedEvent.getTaskId();
            EventQueue queue = this.delegate.get(taskId);
            if (queue == null) {
                queue = this.delegate.createOrTap(taskId);
            }
            if (queue == null) {
                return;
            }
            Event event = replicatedEvent.getEventAsGeneric();
            if (event != null) {
                queue.enqueueEvent(event);
            }
        } finally {
            // Always clear the flag, even if enqueueing throws, so later enqueues on this
            // thread are replicated normally.
            this.isHandlingReplicatedEvent.remove();
        }
    }

    /**
     * Returns the interface's default builder for {@code taskId} with a {@link ReplicationHook}
     * attached.
     */
    public EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) {
        // A default method inherited from an interface must be invoked as Interface.super.m();
        // a bare super.m() would resolve against Object and fail to compile.
        return QueueManager.super.getEventQueueBuilder(taskId).hook(new ReplicationHook(taskId));
    }

    /** Factory given to the delegate so that the builders it creates carry a replication hook. */
    private class ReplicatingEventQueueFactory
    implements EventQueueFactory {
        private ReplicatingEventQueueFactory() {
        }

        /** Returns the delegate's builder for {@code taskId} with a {@link ReplicationHook} attached. */
        public EventQueue.EventQueueBuilder builder(String taskId) {
            return ReplicatedQueueManager.this.delegate.getEventQueueBuilder(taskId).hook(new ReplicationHook(taskId));
        }
    }

    /** Enqueue hook that forwards events for one task to the replication strategy. */
    private class ReplicationHook
    implements EventEnqueueHook {
        private final String taskId;

        public ReplicationHook(String taskId) {
            this.taskId = taskId;
        }

        /**
         * Sends {@code event} to the replication strategy unless the current thread is inside
         * {@link ReplicatedQueueManager#onReplicatedEvent(ReplicatedEvent)}, no strategy is set,
         * or this hook has no task id.
         */
        public void onEnqueue(Event event) {
            // equals() instead of reference comparison: correct for any Boolean instance and for null.
            if (Boolean.TRUE.equals(ReplicatedQueueManager.this.isHandlingReplicatedEvent.get())) {
                return;
            }
            if (ReplicatedQueueManager.this.replicationStrategy != null && this.taskId != null) {
                ReplicatedQueueManager.this.replicationStrategy.send(this.taskId, event);
            }
        }
    }
}

