/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.events;

import io.a2a.server.events.EventEnqueueHook;
import io.a2a.server.events.EventQueueClosedException;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.LocalEventQueueItem;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.tasks.TaskStateProvider;
import io.a2a.spec.Event;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class EventQueue
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventQueue.class);
    public static final int DEFAULT_QUEUE_SIZE = 1000;
    private final int queueSize;
    private volatile boolean closed = false;

    protected EventQueue() {
        this(1000);
    }

    protected EventQueue(int queueSize) {
        if (queueSize <= 0) {
            throw new IllegalArgumentException("Queue size must be greater than 0");
        }
        this.queueSize = queueSize;
        LOGGER.trace("Creating {} with queue size: {}", (Object)this, (Object)queueSize);
    }

    protected EventQueue(EventQueue parent) {
        this(1000);
        LOGGER.trace("Creating {}, parent: {}", (Object)this, (Object)parent);
    }

    static EventQueueBuilder builder(MainEventBus mainEventBus) {
        return new EventQueueBuilder().mainEventBus(mainEventBus);
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public abstract void awaitQueuePollerStart() throws InterruptedException;

    public abstract void signalQueuePollerStarted();

    public void enqueueEvent(Event event) {
        this.enqueueItem(new LocalEventQueueItem(event));
    }

    public abstract void enqueueItem(EventQueueItem var1);

    public void enqueueLocalOnly(EventQueueItem item) {
        throw new UnsupportedOperationException("enqueueLocalOnly is only supported on ChildQueue for subscribe scenarios");
    }

    public void enqueueEventLocalOnly(Event event) {
        this.enqueueLocalOnly(new LocalEventQueueItem(event));
    }

    public abstract EventQueue tap();

    public abstract @Nullable EventQueueItem dequeueEventItem(int var1) throws EventQueueClosedException;

    public void taskDone() {
    }

    public abstract int size();

    @Override
    public abstract void close();

    public abstract void close(boolean var1);

    public abstract void close(boolean var1, boolean var2);

    public boolean isClosed() {
        return this.closed;
    }

    protected void doClose() {
        this.doClose(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doClose(boolean immediate) {
        EventQueue eventQueue = this;
        synchronized (eventQueue) {
            if (this.closed) {
                return;
            }
            LOGGER.debug("Closing {} (immediate={})", (Object)this, (Object)immediate);
            this.closed = true;
        }
    }

    public static class EventQueueBuilder {
        private int queueSize = 1000;
        private @Nullable EventEnqueueHook hook;
        private @Nullable String taskId;
        private List<Runnable> onCloseCallbacks = new ArrayList<Runnable>();
        private @Nullable TaskStateProvider taskStateProvider;
        private @Nullable MainEventBus mainEventBus;

        public EventQueueBuilder queueSize(int queueSize) {
            this.queueSize = queueSize;
            return this;
        }

        public EventQueueBuilder hook(EventEnqueueHook hook) {
            this.hook = hook;
            return this;
        }

        public EventQueueBuilder taskId(String taskId) {
            this.taskId = taskId;
            return this;
        }

        public EventQueueBuilder addOnCloseCallback(Runnable onCloseCallback) {
            if (onCloseCallback != null) {
                this.onCloseCallbacks.add(onCloseCallback);
            }
            return this;
        }

        public EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider) {
            this.taskStateProvider = taskStateProvider;
            return this;
        }

        public EventQueueBuilder mainEventBus(MainEventBus mainEventBus) {
            this.mainEventBus = mainEventBus;
            return this;
        }

        public EventQueue build() {
            if (this.mainEventBus == null) {
                throw new IllegalStateException("MainEventBus is required for EventQueue creation");
            }
            if (this.taskId == null) {
                throw new IllegalStateException("taskId is required for EventQueue creation");
            }
            return new MainQueue(this.queueSize, this.hook, this.taskId, this.onCloseCallbacks, this.taskStateProvider, this.mainEventBus);
        }
    }

    static class ChildQueue
    extends EventQueue {
        private final MainQueue parent;
        private final BlockingQueue<EventQueueItem> queue = new LinkedBlockingDeque<EventQueueItem>();
        private volatile boolean immediateClose = false;
        private volatile boolean awaitingFinalEvent = false;

        public ChildQueue(MainQueue parent) {
            this.parent = parent;
        }

        @Override
        public void enqueueEvent(Event event) {
            this.parent.enqueueEvent(event);
        }

        @Override
        public void enqueueItem(EventQueueItem item) {
            this.parent.enqueueItem(item);
        }

        private void internalEnqueueItem(EventQueueItem item) {
            Event event = item.getEvent();
            if (this.isClosed() && this.immediateClose) {
                LOGGER.warn("ChildQueue is immediately closed. Event will not be enqueued. {} {}", (Object)this, (Object)event);
                return;
            }
            if (!this.queue.offer(item)) {
                LOGGER.warn("ChildQueue {} is full. Closing immediately.", (Object)this);
                this.close(true);
            } else {
                LOGGER.debug("Enqueued event {} {}", (Object)(event instanceof Throwable ? event.toString() : event), (Object)this);
                if (this.awaitingFinalEvent && this.isFinalEvent(event)) {
                    this.awaitingFinalEvent = false;
                    LOGGER.debug("ChildQueue {} received awaited final event", (Object)System.identityHashCode(this));
                }
            }
        }

        private boolean isFinalEvent(Event event) {
            if (event instanceof Task) {
                Task task = (Task)event;
                return task.status() != null && task.status().state() != null && task.status().state().isFinal();
            }
            if (event instanceof TaskStatusUpdateEvent) {
                TaskStatusUpdateEvent statusUpdate = (TaskStatusUpdateEvent)event;
                return statusUpdate.isFinal();
            }
            return false;
        }

        @Override
        public void enqueueLocalOnly(EventQueueItem item) {
            this.internalEnqueueItem(item);
        }

        @Override
        public @Nullable EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
            this.signalQueuePollerStarted();
            if (this.isClosed() && (this.queue.isEmpty() || this.immediateClose) && !this.awaitingFinalEvent) {
                LOGGER.debug("ChildQueue is closed{}, sending termination message. {} (queueSize={})", new Object[]{this.immediateClose ? " (immediate)" : " and empty", this, this.queue.size()});
                throw new EventQueueClosedException();
            }
            if (waitMilliSeconds <= 0) {
                EventQueueItem item = (EventQueueItem)this.queue.poll();
                if (item != null) {
                    Event event = item.getEvent();
                    LOGGER.debug("Dequeued event item (no wait) {} {}", (Object)this, (Object)(event instanceof Throwable ? event.toString() : event));
                }
                return item;
            }
            try {
                LOGGER.trace("Polling ChildQueue {} (wait={}ms)", (Object)System.identityHashCode(this), (Object)waitMilliSeconds);
                EventQueueItem item = this.queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS);
                if (item != null) {
                    Event event = item.getEvent();
                    LOGGER.debug("Dequeued event item (waiting) {} {}", (Object)this, (Object)(event instanceof Throwable ? event.toString() : event));
                } else {
                    LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", (Object)System.identityHashCode(this));
                }
                return item;
            }
            catch (InterruptedException e) {
                LOGGER.debug("Interrupted dequeue (waiting) {}", (Object)this);
                Thread.currentThread().interrupt();
                return null;
            }
        }

        @Override
        public EventQueue tap() {
            throw new IllegalStateException("Can only tap the main queue");
        }

        @Override
        public int size() {
            return this.queue.size();
        }

        @Override
        public void awaitQueuePollerStart() throws InterruptedException {
            this.parent.awaitQueuePollerStart();
        }

        @Override
        public void signalQueuePollerStarted() {
            this.parent.signalQueuePollerStarted();
        }

        @Override
        protected void doClose(boolean immediate) {
            super.doClose(immediate);
            if (immediate) {
                this.immediateClose = true;
                int clearedCount = this.queue.size();
                this.queue.clear();
                LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", (Object)clearedCount, (Object)this);
            }
        }

        void expectFinalEvent() {
            this.awaitingFinalEvent = true;
            LOGGER.debug("ChildQueue {} now awaiting final event", (Object)System.identityHashCode(this));
        }

        @Override
        public void close() {
            this.close(false);
        }

        @Override
        public void close(boolean immediate) {
            this.close(immediate, true);
        }

        @Override
        public void close(boolean immediate, boolean notifyParent) {
            this.doClose(immediate);
            if (notifyParent) {
                this.parent.childClosing(this, immediate);
            } else {
                LOGGER.debug("Closing {} without notifying parent (keeping MainQueue alive)", (Object)this);
            }
        }
    }

    static class MainQueue
    extends EventQueue {
        private final List<ChildQueue> children = new CopyOnWriteArrayList<ChildQueue>();
        protected final Semaphore semaphore;
        private final CountDownLatch pollingStartedLatch = new CountDownLatch(1);
        private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
        private final @Nullable EventEnqueueHook enqueueHook;
        private final String taskId;
        private final List<Runnable> onCloseCallbacks;
        private final @Nullable TaskStateProvider taskStateProvider;
        private final MainEventBus mainEventBus;

        MainQueue(int queueSize, @Nullable EventEnqueueHook hook, String taskId, List<Runnable> onCloseCallbacks, @Nullable TaskStateProvider taskStateProvider, @Nullable MainEventBus mainEventBus) {
            super(queueSize);
            this.semaphore = new Semaphore(queueSize, true);
            this.enqueueHook = hook;
            this.taskId = taskId;
            this.onCloseCallbacks = List.copyOf(onCloseCallbacks);
            this.taskStateProvider = taskStateProvider;
            this.mainEventBus = Objects.requireNonNull(mainEventBus, "MainEventBus is required");
            LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks, TaskStateProvider: {}, MainEventBus configured", new Object[]{taskId, onCloseCallbacks.size(), taskStateProvider != null});
        }

        @Override
        public EventQueue tap() {
            ChildQueue child = new ChildQueue(this);
            this.children.add(child);
            return child;
        }

        public int getChildCount() {
            return this.children.size();
        }

        @Nullable EventEnqueueHook getEnqueueHook() {
            return this.enqueueHook;
        }

        @Override
        public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
            throw new UnsupportedOperationException("MainQueue cannot be consumed directly - use tap() to create a ChildQueue for consumption");
        }

        @Override
        public int size() {
            return this.getQueueSize() - this.semaphore.availablePermits();
        }

        @Override
        public void enqueueItem(EventQueueItem item) {
            Event event = item.getEvent();
            this.validateEventIds(event);
            if (this.isFinalEvent(event)) {
                LOGGER.debug("Final event detected, notifying {} children to expect it", (Object)this.children.size());
                for (ChildQueue child : this.children) {
                    child.expectFinalEvent();
                }
            }
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
            }
            LOGGER.debug("Enqueued event {} {}", (Object)(event instanceof Throwable ? event.toString() : event), (Object)this);
            this.mainEventBus.submit(this.taskId, this, item);
        }

        private void validateEventIds(Event event) {
            if (this.taskId == null) {
                return;
            }
            String eventTaskId = null;
            String eventType = null;
            if (event instanceof Task) {
                Task task = (Task)event;
                eventTaskId = task.id();
                eventType = "Task";
            } else if (event instanceof TaskStatusUpdateEvent) {
                TaskStatusUpdateEvent statusEvent = (TaskStatusUpdateEvent)event;
                eventTaskId = statusEvent.taskId();
                eventType = "TaskStatusUpdateEvent";
            } else if (event instanceof TaskArtifactUpdateEvent) {
                TaskArtifactUpdateEvent artifactEvent = (TaskArtifactUpdateEvent)event;
                eventTaskId = artifactEvent.taskId();
                eventType = "TaskArtifactUpdateEvent";
            }
            if (eventTaskId != null && !eventTaskId.equals(this.taskId)) {
                throw new IllegalArgumentException(String.format("Event taskId mismatch: queue=%s, event=%s, eventType=%s", this.taskId, eventTaskId, eventType));
            }
        }

        private boolean isFinalEvent(Event event) {
            if (event instanceof Task) {
                Task task = (Task)event;
                return task.status() != null && task.status().state() != null && task.status().state().isFinal();
            }
            if (event instanceof TaskStatusUpdateEvent) {
                TaskStatusUpdateEvent statusUpdate = (TaskStatusUpdateEvent)event;
                return statusUpdate.isFinal();
            }
            return false;
        }

        @Override
        public void awaitQueuePollerStart() throws InterruptedException {
            LOGGER.debug("Waiting for queue poller to start on {}", (Object)this);
            this.pollingStartedLatch.await(10L, TimeUnit.SECONDS);
            LOGGER.debug("Queue poller started on {}", (Object)this);
        }

        @Override
        public void signalQueuePollerStarted() {
            if (this.pollingStarted.get()) {
                return;
            }
            LOGGER.debug("Signalling that queue polling started {}", (Object)this);
            this.pollingStartedLatch.countDown();
            this.pollingStarted.set(true);
        }

        void childClosing(ChildQueue child, boolean immediate) {
            this.children.remove(child);
            if (!this.children.isEmpty()) {
                LOGGER.debug("MainQueue staying open: {} children remaining", (Object)this.children.size());
                return;
            }
            if (this.taskStateProvider != null && this.taskId != null) {
                boolean isFinalized = this.taskStateProvider.isTaskFinalized(this.taskId);
                if (!isFinalized) {
                    LOGGER.debug("MainQueue for task {} has no children, but task is not finalized - keeping queue open for potential resubscriptions", (Object)this.taskId);
                    return;
                }
                LOGGER.debug("MainQueue for task {} has no children and task is finalized - closing queue", (Object)this.taskId);
            } else {
                LOGGER.debug("MainQueue has no children and no TaskStateProvider - closing queue (legacy behavior)");
            }
            this.doClose(immediate);
        }

        void distributeToChildren(EventQueueItem item) {
            int childCount = this.children.size();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children", new Object[]{this.taskId, item.getEvent().getClass().getSimpleName(), childCount});
            }
            this.children.forEach(child -> {
                LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue", (Object)this.taskId, (Object)item.getEvent().getClass().getSimpleName());
                child.internalEnqueueItem(item);
            });
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children", new Object[]{this.taskId, item.getEvent().getClass().getSimpleName(), childCount});
            }
        }

        void releaseSemaphore() {
            this.semaphore.release();
        }

        public int getActiveChildCount() {
            return this.children.size();
        }

        @Override
        protected void doClose(boolean immediate) {
            if (!this.onCloseCallbacks.isEmpty()) {
                LOGGER.debug("Invoking {} onClose callbacks for task {} BEFORE closing", (Object)this.onCloseCallbacks.size(), (Object)this.taskId);
                for (Runnable callback : this.onCloseCallbacks) {
                    try {
                        callback.run();
                    }
                    catch (Exception e) {
                        LOGGER.error("Error in onClose callback for task {}", (Object)this.taskId, (Object)e);
                    }
                }
            }
            super.doClose(immediate);
        }

        @Override
        public void close() {
            this.close(false);
        }

        @Override
        public void close(boolean immediate) {
            this.doClose(immediate);
            if (immediate) {
                this.children.forEach(child -> child.doClose(immediate));
            }
            this.children.clear();
        }

        @Override
        public void close(boolean immediate, boolean notifyParent) {
            throw new UnsupportedOperationException("MainQueue does not support notifyParent parameter - use close(boolean) instead");
        }

        String getTaskId() {
            return this.taskId;
        }
    }
}

