/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.events;

import io.a2a.server.events.EventEnqueueHook;
import io.a2a.server.events.EventQueueClosedException;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.LocalEventQueueItem;
import io.a2a.server.tasks.TaskStateProvider;
import io.a2a.spec.Event;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A bounded, blocking queue of {@link EventQueueItem}s.
 *
 * <p>The backing {@link #queue} is unbounded; capacity is enforced by {@link #semaphore}:
 * enqueueing acquires one permit per item and dequeueing releases one permit per item removed.
 *
 * <p>Two implementations are nested in this class: {@link MainQueue}, which copies every
 * enqueued item to the {@link ChildQueue}s created through {@link #tap()}, and
 * {@link ChildQueue}, which forwards {@link #enqueueEvent(Event)} to its parent.
 */
public abstract class EventQueue implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventQueue.class);

    /** Capacity used when no explicit queue size is supplied. */
    public static final int DEFAULT_QUEUE_SIZE = 1000;

    private final int queueSize;

    /** Backing store. Unbounded on its own; {@link #semaphore} provides the capacity limit. */
    protected final BlockingQueue<EventQueueItem> queue = new LinkedBlockingDeque<>();

    /** Fair semaphore created with {@code queueSize} permits; one permit is held per queued item. */
    protected final Semaphore semaphore;

    private volatile boolean closed = false;

    /** Creates a queue with {@link #DEFAULT_QUEUE_SIZE} capacity. */
    protected EventQueue() {
        this(DEFAULT_QUEUE_SIZE);
    }

    /**
     * Creates a queue with the given capacity.
     *
     * @param queueSize maximum number of items held at once
     * @throws IllegalArgumentException if {@code queueSize} is not positive
     */
    protected EventQueue(int queueSize) {
        if (queueSize <= 0) {
            throw new IllegalArgumentException("Queue size must be greater than 0");
        }
        this.queueSize = queueSize;
        this.semaphore = new Semaphore(queueSize, true);
        LOGGER.trace("Creating {} with queue size: {}", this, queueSize);
    }

    /**
     * Creates a queue with {@link #DEFAULT_QUEUE_SIZE} capacity.
     *
     * @param parent only used for trace logging; it is not stored by this constructor
     */
    protected EventQueue(EventQueue parent) {
        this(DEFAULT_QUEUE_SIZE);
        LOGGER.trace("Creating {}, parent: {}", this, parent);
    }

    /** Returns a new builder for queues. */
    static EventQueueBuilder builder() {
        return new EventQueueBuilder();
    }

    /** Returns the capacity this queue was created with. */
    public int getQueueSize() {
        return queueSize;
    }

    /**
     * Waits for a poller to start consuming; the exact behaviour is defined by the subclass.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public abstract void awaitQueuePollerStart() throws InterruptedException;

    /** Records that a poller has started; invoked at the end of every {@link #dequeueEventItem(int)}. */
    abstract void signalQueuePollerStarted();

    /**
     * Wraps {@code event} in a {@link LocalEventQueueItem} and enqueues it.
     *
     * @param event the event to enqueue
     */
    public void enqueueEvent(Event event) {
        enqueueItem(new LocalEventQueueItem(event));
    }

    /**
     * Adds {@code item} to this queue, blocking until a permit (free slot) is available.
     * If the queue is already closed the item is dropped with a warning.
     *
     * @param item the item to enqueue
     * @throws RuntimeException if the thread is interrupted while waiting for a permit;
     *         the interrupt flag is restored before throwing
     */
    public void enqueueItem(EventQueueItem item) {
        Event event = item.getEvent();
        if (closed) {
            LOGGER.warn("Queue is closed. Event will not be enqueued. {} {}", this, event);
            return;
        }
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
        }
        queue.add(item);
        LOGGER.debug("Enqueued event {} {}", loggable(event), this);
    }

    /** Creates a child view of this queue, if the implementation supports it. */
    abstract EventQueue tap();

    /**
     * Removes and returns the next item, releasing its semaphore permit.
     *
     * <p>{@link #signalQueuePollerStarted()} is always invoked before this method returns
     * normally or exceptionally from the polling section.
     *
     * @param waitMilliSeconds how long to wait for an item; values {@code <= 0} poll without waiting
     * @return the next item, or {@code null} if none was available in time or the calling
     *         thread was interrupted while waiting (the interrupt flag is restored)
     * @throws EventQueueClosedException if the queue is closed and empty on entry
     */
    public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
        if (closed && queue.isEmpty()) {
            LOGGER.debug("Queue is closed, and empty. Sending termination message. {}", this);
            throw new EventQueueClosedException();
        }
        try {
            if (waitMilliSeconds <= 0) {
                EventQueueItem item = queue.poll();
                if (item != null) {
                    LOGGER.debug("Dequeued event item (no wait) {} {}", this, loggable(item.getEvent()));
                    semaphore.release();
                }
                return item;
            }

            LOGGER.trace("Polling queue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds);
            EventQueueItem item;
            try {
                item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // The timed poll is interruptible; keep the interrupt visible to the caller
                // and report "nothing dequeued" rather than leaking a checked exception.
                Thread.currentThread().interrupt();
                LOGGER.debug("Interrupted dequeue (waiting) {}", this);
                return null;
            }
            if (item != null) {
                LOGGER.debug("Dequeued event item (waiting) {} {}", this, loggable(item.getEvent()));
                semaphore.release();
            } else {
                LOGGER.trace("Dequeue timeout (null) from queue {}", System.identityHashCode(this));
            }
            return item;
        } finally {
            signalQueuePollerStarted();
        }
    }

    /** No-op in this base class. */
    public void taskDone() {
    }

    @Override
    public abstract void close();

    /**
     * Closes this queue.
     *
     * @param immediate when {@code true}, pending items are discarded
     */
    public abstract void close(boolean immediate);

    /**
     * Closes this queue.
     *
     * @param immediate when {@code true}, pending items are discarded
     * @param notifyParent whether a parent queue should be told about the close
     */
    public abstract void close(boolean immediate, boolean notifyParent);

    /** Returns whether {@link #doClose(boolean)} has marked this queue closed. */
    public boolean isClosed() {
        return closed;
    }

    /** Closes without discarding pending items; equivalent to {@code doClose(false)}. */
    protected void doClose() {
        doClose(false);
    }

    /**
     * Marks this queue closed. Only the first call has any effect.
     *
     * @param immediate when {@code true}, all pending items are discarded and the permits
     *        they held are returned to {@link #semaphore}
     */
    protected void doClose(boolean immediate) {
        synchronized (this) {
            if (closed) {
                return;
            }
            LOGGER.debug("Closing {} (immediate={})", this, immediate);
            closed = true;
        }
        if (immediate) {
            // Every queued item holds one permit. Drain (rather than clear) so we know exactly
            // how many items were removed and can hand the same number of permits back;
            // otherwise producers blocked in acquire() would never be woken.
            List<EventQueueItem> discarded = new ArrayList<>();
            int discardedCount = queue.drainTo(discarded);
            semaphore.release(discardedCount);
            LOGGER.debug("Cleared queue for immediate close: {}", this);
        }
    }

    /**
     * Returns the value to pass to the logger for {@code event}: its {@code toString()} when
     * the event is a {@link Throwable}, otherwise the event itself.
     * NOTE(review): presumably this keeps the logger from printing a stack trace - confirm.
     */
    private static Object loggable(Event event) {
        return event instanceof Throwable ? event.toString() : event;
    }

    /** Fluent builder that produces {@link MainQueue} instances. */
    public static class EventQueueBuilder {
        private int queueSize = DEFAULT_QUEUE_SIZE;
        private EventEnqueueHook hook;
        private String taskId;
        private final List<Runnable> onCloseCallbacks = new ArrayList<>();
        private TaskStateProvider taskStateProvider;

        /** Sets the queue capacity; validated when the queue is constructed. */
        public EventQueueBuilder queueSize(int queueSize) {
            this.queueSize = queueSize;
            return this;
        }

        /** Sets the hook invoked after each item is enqueued on the main queue. */
        public EventQueueBuilder hook(EventEnqueueHook hook) {
            this.hook = hook;
            return this;
        }

        /** Sets the task id associated with the queue. */
        public EventQueueBuilder taskId(String taskId) {
            this.taskId = taskId;
            return this;
        }

        /** Registers a callback to run when the main queue closes; {@code null} is ignored. */
        public EventQueueBuilder addOnCloseCallback(Runnable onCloseCallback) {
            if (onCloseCallback != null) {
                onCloseCallbacks.add(onCloseCallback);
            }
            return this;
        }

        /** Sets the provider consulted to decide whether the task is finalized. */
        public EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider) {
            this.taskStateProvider = taskStateProvider;
            return this;
        }

        /**
         * Builds a {@link MainQueue}. The fully configured constructor is used when a hook,
         * at least one close callback, or a task state provider was supplied; otherwise only
         * the queue size is passed on.
         */
        public EventQueue build() {
            boolean configured = hook != null || !onCloseCallbacks.isEmpty() || taskStateProvider != null;
            if (configured) {
                return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider);
            }
            return new MainQueue(queueSize);
        }
    }

    /** Queue created by {@link MainQueue#tap()}; it receives a copy of every item the parent enqueues. */
    static class ChildQueue extends EventQueue {
        private final MainQueue parent;

        public ChildQueue(MainQueue parent) {
            this.parent = parent;
        }

        /** Forwards the event to the parent queue. */
        @Override
        public void enqueueEvent(Event event) {
            parent.enqueueEvent(event);
        }

        /** Adds {@code item} to this child's own queue using the base-class logic. */
        private void internalEnqueueItem(EventQueueItem item) {
            super.enqueueItem(item);
        }

        @Override
        EventQueue tap() {
            throw new IllegalStateException("Can only tap the main queue");
        }

        @Override
        public void awaitQueuePollerStart() throws InterruptedException {
            parent.awaitQueuePollerStart();
        }

        @Override
        void signalQueuePollerStarted() {
            parent.signalQueuePollerStarted();
        }

        @Override
        public void close() {
            close(false);
        }

        @Override
        public void close(boolean immediate) {
            close(immediate, true);
        }

        /**
         * Closes this child and, when {@code notifyParent} is set, reports the close to the
         * parent via {@link MainQueue#childClosing(ChildQueue, boolean)}.
         */
        @Override
        public void close(boolean immediate, boolean notifyParent) {
            doClose(immediate);
            if (notifyParent) {
                parent.childClosing(this, immediate);
            } else {
                LOGGER.debug("Closing {} without notifying parent (keeping MainQueue alive)", this);
            }
        }
    }

    /** Root queue that fans items out to its children and runs close callbacks. */
    static class MainQueue extends EventQueue {
        /** Upper bound, in seconds, for {@link #awaitQueuePollerStart()}. */
        private static final long POLLER_START_TIMEOUT_SECONDS = 10L;

        private final List<ChildQueue> children = new CopyOnWriteArrayList<>();
        private final CountDownLatch pollingStartedLatch = new CountDownLatch(1);
        private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
        private final EventEnqueueHook enqueueHook;
        private final String taskId;
        private final List<Runnable> onCloseCallbacks;
        private final TaskStateProvider taskStateProvider;

        MainQueue() {
            this(DEFAULT_QUEUE_SIZE);
        }

        MainQueue(int queueSize) {
            this(queueSize, null);
        }

        MainQueue(EventEnqueueHook hook) {
            this(DEFAULT_QUEUE_SIZE, hook);
        }

        MainQueue(int queueSize, EventEnqueueHook hook) {
            super(queueSize);
            this.enqueueHook = hook;
            this.taskId = null;
            this.onCloseCallbacks = List.of();
            this.taskStateProvider = null;
        }

        MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List<Runnable> onCloseCallbacks, TaskStateProvider taskStateProvider) {
            super(queueSize);
            this.enqueueHook = hook;
            this.taskId = taskId;
            // Snapshot so later changes to the caller's list do not affect this queue.
            this.onCloseCallbacks = List.copyOf(onCloseCallbacks);
            this.taskStateProvider = taskStateProvider;
            LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks and TaskStateProvider: {}", taskId, onCloseCallbacks.size(), taskStateProvider != null);
        }

        /** Creates a new child queue and registers it to receive subsequently enqueued items. */
        @Override
        EventQueue tap() {
            ChildQueue child = new ChildQueue(this);
            children.add(child);
            return child;
        }

        /**
         * Adds {@code item} to this queue, then to every registered child, then invokes the
         * enqueue hook if one is set.
         * NOTE(review): unlike the base implementation this does not check the closed flag -
         * presumably intentional; confirm before changing.
         *
         * @throws RuntimeException if interrupted while waiting for a permit
         */
        @Override
        public void enqueueItem(EventQueueItem item) {
            Event event = item.getEvent();
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
            }
            queue.add(item);
            LOGGER.debug("Enqueued event {} {}", loggable(event), this);
            children.forEach(child -> child.internalEnqueueItem(item));
            if (enqueueHook != null) {
                enqueueHook.onEnqueue(item);
            }
        }

        /**
         * Waits up to {@value #POLLER_START_TIMEOUT_SECONDS} seconds for
         * {@link #signalQueuePollerStarted()} to be called. Returns normally on timeout,
         * after logging a warning.
         */
        @Override
        public void awaitQueuePollerStart() throws InterruptedException {
            LOGGER.debug("Waiting for queue poller to start on {}", this);
            boolean started = pollingStartedLatch.await(POLLER_START_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (started) {
                LOGGER.debug("Queue poller started on {}", this);
            } else {
                LOGGER.warn("Timed out after {}s waiting for queue poller to start on {}", POLLER_START_TIMEOUT_SECONDS, this);
            }
        }

        /** Releases threads blocked in {@link #awaitQueuePollerStart()}; only the first call acts. */
        @Override
        void signalQueuePollerStarted() {
            if (!pollingStarted.compareAndSet(false, true)) {
                return;
            }
            LOGGER.debug("Signalling that queue polling started {}", this);
            pollingStartedLatch.countDown();
        }

        /**
         * Called by a child that is closing. Removes the child, then closes this queue when
         * {@code immediate} is set, or when no children remain and either no task state
         * provider/task id is configured or the provider reports the task as finalized.
         */
        void childClosing(ChildQueue child, boolean immediate) {
            children.remove(child);
            if (immediate) {
                LOGGER.debug("MainQueue closing immediately (immediate=true)");
                doClose(immediate);
                return;
            }
            if (!children.isEmpty()) {
                LOGGER.debug("MainQueue staying open: {} children remaining", children.size());
                return;
            }
            if (taskStateProvider != null && taskId != null) {
                boolean isFinalized = taskStateProvider.isTaskFinalized(taskId);
                if (!isFinalized) {
                    LOGGER.debug("MainQueue for task {} has no children, but task is not finalized - keeping queue open for potential resubscriptions", taskId);
                    return;
                }
                LOGGER.debug("MainQueue for task {} has no children and task is finalized - closing queue", taskId);
            } else {
                LOGGER.debug("MainQueue has no children and no TaskStateProvider - closing queue (legacy behavior)");
            }
            doClose(immediate);
        }

        /** Returns the number of currently registered child queues. */
        public int getActiveChildCount() {
            return children.size();
        }

        /**
         * Runs the onClose callbacks and then closes the queue. An exception thrown by a
         * callback is logged and does not prevent the remaining callbacks or the close.
         */
        @Override
        protected void doClose(boolean immediate) {
            if (isClosed()) {
                // The base doClose is a no-op once closed, so a repeated close must not
                // fire the onClose callbacks a second time either.
                return;
            }
            if (!onCloseCallbacks.isEmpty()) {
                LOGGER.debug("Invoking {} onClose callbacks for task {} BEFORE closing", onCloseCallbacks.size(), taskId);
                for (Runnable callback : onCloseCallbacks) {
                    try {
                        callback.run();
                    } catch (Exception e) {
                        LOGGER.error("Error in onClose callback for task {}", taskId, e);
                    }
                }
            }
            super.doClose(immediate);
        }

        @Override
        public void close() {
            close(false);
        }

        /**
         * Closes this queue and forgets all children. Children are themselves closed only
         * when {@code immediate} is set.
         * NOTE(review): on a non-immediate close the children are detached without being
         * closed - presumably so they can drain; confirm.
         */
        @Override
        public void close(boolean immediate) {
            doClose(immediate);
            if (immediate) {
                children.forEach(child -> child.doClose(immediate));
            }
            children.clear();
        }

        /** Always throws; a main queue has no parent to notify. */
        @Override
        public void close(boolean immediate, boolean notifyParent) {
            throw new UnsupportedOperationException("MainQueue does not support notifyParent parameter - use close(boolean) instead");
        }
    }
}

