/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.requesthandlers;

import io.a2a.jsonrpc.common.wrappers.ListTasksResult;
import io.a2a.server.ServerCallContext;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.agentexecution.SimpleRequestContextBuilder;
import io.a2a.server.config.A2AConfigProvider;
import io.a2a.server.events.EnhancedRunnable;
import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.QueueManager;
import io.a2a.server.events.TaskQueueExistsException;
import io.a2a.server.requesthandlers.RequestHandler;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.ResultAggregator;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskStore;
import io.a2a.server.util.async.AsyncUtils;
import io.a2a.server.util.async.Internal;
import io.a2a.spec.A2AError;
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.GetTaskPushNotificationConfigParams;
import io.a2a.spec.InternalError;
import io.a2a.spec.InvalidParamsError;
import io.a2a.spec.ListTaskPushNotificationConfigParams;
import io.a2a.spec.ListTaskPushNotificationConfigResult;
import io.a2a.spec.ListTasksParams;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskNotFoundError;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
import io.a2a.spec.TaskState;
import io.a2a.spec.UnsupportedOperationError;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class DefaultRequestHandler
implements RequestHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
    // Configuration keys looked up by initConfig(); each value is parsed there as an int.
    private static final String A2A_BLOCKING_AGENT_TIMEOUT_SECONDS = "a2a.blocking.agent.timeout.seconds";
    private static final String A2A_BLOCKING_CONSUMPTION_TIMEOUT_SECONDS = "a2a.blocking.consumption.timeout.seconds";
    // Injected configuration source read by initConfig().
    @Inject
    A2AConfigProvider configProvider;
    // Upper bound (seconds) for waiting on the agent future in a blocking onMessageSend call.
    int agentCompletionTimeoutSeconds;
    // Upper bound (seconds) for waiting on the consumption future in a blocking onMessageSend call.
    int consumptionCompletionTimeoutSeconds;
    // Collaborators supplied through the injected constructor (all null after the no-arg constructor).
    private AgentExecutor agentExecutor;
    private TaskStore taskStore;
    private QueueManager queueManager;
    // Handlers treat a null pushConfigStore as "push notification configuration unsupported".
    private PushNotificationConfigStore pushConfigStore;
    private PushNotificationSender pushSender;
    // Supplies a fresh RequestContext.Builder each time a request context has to be built.
    private Supplier<RequestContext.Builder> requestContextBuilder;
    // Agent executions registered by registerAndExecuteAgentAsync, keyed by the task id they were started under.
    private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<String, CompletableFuture<Void>>();
    // Futures registered by trackBackgroundTask; each one removes itself from the set when it completes.
    private final Set<CompletableFuture<Void>> backgroundTasks = ConcurrentHashMap.newKeySet();
    // Executor used to run agent executions and background consumption.
    private Executor executor;

    /**
     * No-argument constructor that leaves every collaborator unset ({@code null}).
     * NOTE(review): presumably present only so the container can proxy this
     * {@code @ApplicationScoped} bean — confirm before calling it from application code.
     */
    protected DefaultRequestHandler() {
        // Infrastructure first, then stores, then push-notification collaborators.
        this.executor = null;
        this.requestContextBuilder = null;
        this.agentExecutor = null;
        this.queueManager = null;
        this.taskStore = null;
        this.pushSender = null;
        this.pushConfigStore = null;
    }

    /**
     * Injection constructor wiring all collaborators of the handler.
     *
     * @param agentExecutor   executes and cancels agent work
     * @param taskStore       task persistence; also handed to the default request-context builder
     * @param queueManager    provides the per-task event queues
     * @param pushConfigStore push-notification configuration store (handlers treat {@code null} as unsupported)
     * @param pushSender      sender used to emit push notifications
     * @param executor        executor used for asynchronous agent runs and background consumption
     */
    @Inject
    public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, PushNotificationSender pushSender, @Internal Executor executor) {
        this.executor = executor;
        this.agentExecutor = agentExecutor;
        this.queueManager = queueManager;
        this.taskStore = taskStore;
        this.pushConfigStore = pushConfigStore;
        this.pushSender = pushSender;
        // Every request gets a fresh builder backed by the same task store.
        this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
    }

    /**
     * Reads both blocking-call timeouts from the injected configuration provider after construction.
     *
     * @throws NumberFormatException if either configured value is not a parsable integer
     */
    @PostConstruct
    void initConfig() {
        String agentTimeout = this.configProvider.getValue(A2A_BLOCKING_AGENT_TIMEOUT_SECONDS);
        this.agentCompletionTimeoutSeconds = Integer.parseInt(agentTimeout);

        String consumptionTimeout = this.configProvider.getValue(A2A_BLOCKING_CONSUMPTION_TIMEOUT_SECONDS);
        this.consumptionCompletionTimeoutSeconds = Integer.parseInt(consumptionTimeout);
    }

    /**
     * Builds a handler without container injection, using fixed timeouts
     * (5 seconds for agent completion, 2 seconds for consumption completion)
     * instead of values from a configuration provider.
     *
     * @return a handler wired with the given collaborators
     */
    public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, PushNotificationSender pushSender, Executor executor) {
        DefaultRequestHandler instance = new DefaultRequestHandler(
                agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor);
        instance.consumptionCompletionTimeoutSeconds = 2;
        instance.agentCompletionTimeoutSeconds = 5;
        return instance;
    }

    /**
     * Looks up a task by id and returns it with its history trimmed to the requested length.
     *
     * @param params  carries the task id and the optional history length
     * @param context server call context (not used by this method)
     * @return the stored task, possibly with a shortened history
     * @throws TaskNotFoundError if the task store has no task with that id
     */
    @Override
    public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onGetTask {}", params.id());
        Task stored = this.taskStore.get(params.id());
        if (stored != null) {
            Task trimmed = DefaultRequestHandler.limitTaskHistory(stored, params.historyLength());
            LOGGER.debug("Task found {}", trimmed);
            return trimmed;
        }
        LOGGER.debug("No task found for {}. Throwing TaskNotFoundError", params.id());
        throw new TaskNotFoundError();
    }

    /**
     * Returns {@code task} restricted to its last {@code historyLength} history entries.
     * The task is returned unchanged when it has no history, when no limit was
     * requested, or when the limit is at least as large as the history.
     *
     * @param task          the task whose history may be shortened
     * @param historyLength number of trailing history entries to keep, or {@code null} for no limit
     * @return the original task or a rebuilt copy with the shortened history
     */
    private static Task limitTaskHistory(Task task, @Nullable Integer historyLength) {
        boolean nothingToTrim = task.history() == null
                || historyLength == null
                || historyLength >= task.history().size();
        if (nothingToTrim) {
            return task;
        }
        // Keep the tail of the history: the last historyLength entries.
        int total = task.history().size();
        return Task.builder(task)
                .history(task.history().subList(total - historyLength, total))
                .build();
    }

    /**
     * Lists tasks from the task store after validating the {@code lastUpdatedAfter} filter.
     *
     * @param params  listing filters and paging parameters
     * @param context server call context (not used by this method)
     * @return the result produced by the task store
     * @throws InvalidParamsError if {@code lastUpdatedAfter} lies after the current instant
     */
    @Override
    public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}, lastUpdatedAfter={}",
                params.contextId(), params.status(), params.pageSize(), params.pageToken(), params.lastUpdatedAfter());
        boolean hasLowerBound = params.lastUpdatedAfter() != null;
        if (hasLowerBound) {
            Instant currentTime = Instant.now();
            boolean inFuture = params.lastUpdatedAfter().isAfter(currentTime);
            if (inFuture) {
                // Structured details telling the caller which parameter was rejected and why.
                HashMap<String, String> details = new HashMap<String, String>();
                details.put("parameter", "lastUpdatedAfter");
                details.put("reason", "Timestamp cannot be in the future");
                throw new InvalidParamsError(null, "Invalid params", details);
            }
        }
        ListTasksResult listing = this.taskStore.list(params);
        LOGGER.debug("Found {} tasks (total: {})", listing.pageSize(), listing.totalSize());
        return listing;
    }

    /**
     * Cancels a task: asks the agent executor to cancel, cancels the registered
     * agent future (if any), then consumes the queue to obtain the resulting task.
     *
     * @param params  identifies the task to cancel
     * @param context server call context, forwarded into the request context given to the agent
     * @return the task in {@code CANCELED} state
     * @throws TaskNotFoundError      if the task is unknown
     * @throws TaskNotCancelableError if the task is already final, or is not {@code CANCELED} after the attempt
     * @throws InternalError          if consuming the queue did not yield a {@link Task}
     */
    @Override
    public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws A2AError {
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        if (task.status().state().isFinal()) {
            throw new TaskNotCancelableError("Task cannot be canceled - current state: " + task.status().state().asString());
        }

        TaskManager taskManager = new TaskManager(task.id(), task.contextId(), this.taskStore, null);
        ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, this.executor);

        // Reuse the task's existing queue when there is one; otherwise build a fresh one.
        EventQueue queue = this.queueManager.tap(task.id());
        if (queue == null) {
            queue = this.queueManager.getEventQueueBuilder(task.id()).build();
        }

        this.agentExecutor.cancel(
                this.requestContextBuilder.get()
                        .setTaskId(task.id())
                        .setContextId(task.contextId())
                        .setTask(task)
                        .setServerCallContext(context)
                        .build(),
                queue);

        CompletableFuture<Void> running = this.runningAgents.get(task.id());
        if (running != null) {
            running.cancel(true);
        }

        EventKind outcome = resultAggregator.consumeAll(new EventConsumer(queue));
        if (outcome instanceof Task) {
            Task canceled = (Task) outcome;
            if (canceled.status().state() == TaskState.CANCELED) {
                return canceled;
            }
            throw new TaskNotCancelableError("Task cannot be canceled - current state: " + canceled.status().state().asString());
        }
        throw new InternalError("Agent did not return valid response for cancel");
    }

    /**
     * Handles a non-streaming message send.
     *
     * <p>Prepares the request via {@code initMessageSend}, creates or taps the event queue for the
     * task, starts the agent asynchronously and consumes events until the aggregator reports a
     * result or an interrupt. For a blocking request that was interrupted, it additionally waits
     * (bounded by the two configured timeouts) for the agent and for event consumption, then
     * re-reads the task from the task store.
     *
     * <p>The cleanup sequence (remove the agent from {@code runningAgents}, track the
     * {@code cleanupProducer} future) appears twice below: once in the catch-all handler, which
     * rethrows, and once on the normal path. Both copies must be kept in sync.
     *
     * @param params  the message and optional send configuration
     * @param context server call context passed on to {@code initMessageSend}
     * @return the resulting event kind; for the blocking/interrupted case, the task re-read from the store when present
     * @throws InternalError if the task id is null, no result is produced, waiting fails or times
     *                       out, or the returned task's id differs from the request's task id
     */
    @Override
    public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onMessageSend - task: {}; context {}", (Object)params.message().taskId(), (Object)params.message().contextId());
        MessageSendSetup mss = this.initMessageSend(params, context);
        String taskId = mss.requestContext.getTaskId();
        LOGGER.debug("Request context taskId: {}", (Object)taskId);
        if (taskId == null) {
            throw new InternalError("Task ID is null in onMessageSend");
        }
        EventQueue queue = this.queueManager.createOrTap(taskId);
        ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, this.executor);
        // Blocking only when a configuration is present and its blocking flag is exactly TRUE.
        boolean blocking = params.configuration() != null && Boolean.TRUE.equals(params.configuration().blocking());
        boolean interruptedOrNonBlocking = false;
        // Starts the agent on the executor and registers its future in runningAgents under taskId.
        EnhancedRunnable producerRunnable = this.registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
        ResultAggregator.EventTypeAndInterrupt etai = null;
        EventKind kind = null;
        try {
            Task taskResult;
            Runnable pushNotificationCallback = () -> this.sendPushNotification(taskId, resultAggregator);
            EventConsumer consumer = new EventConsumer(queue);
            // The consumer is notified through this callback when the agent runnable is done.
            producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
            CompletableFuture agentFuture = (CompletableFuture)this.runningAgents.get(taskId);
            etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking);
            if (etai == null) {
                LOGGER.debug("No result, throwing InternalError");
                throw new InternalError("No result");
            }
            interruptedOrNonBlocking = etai.interrupted();
            LOGGER.debug("Was interrupted or non-blocking: {}", (Object)interruptedOrNonBlocking);
            kind = etai.eventType();
            // A task created by this request (none existed at setup time) gets the requested push config stored.
            if (mss.task() == null && kind instanceof Task) {
                Task createdTask = (Task)kind;
                if (this.shouldAddPushInfo(params)) {
                    LOGGER.debug("Storing push notification config for new task {}", (Object)createdTask.id());
                    this.pushConfigStore.setInfo(createdTask.id(), params.configuration().pushNotificationConfig());
                }
            }
            if (blocking && interruptedOrNonBlocking) {
                try {
                    if (agentFuture != null) {
                        try {
                            // Bounded wait for the agent; a timeout here is only logged, not treated as an error.
                            agentFuture.get(this.agentCompletionTimeoutSeconds, TimeUnit.SECONDS);
                            LOGGER.debug("Agent completed for task {}", (Object)taskId);
                        }
                        catch (TimeoutException e) {
                            LOGGER.debug("Agent still running for task {} after {}s", (Object)taskId, (Object)this.agentCompletionTimeoutSeconds);
                        }
                    }
                    // NOTE(review): cleanupProducer's log labels close(false, true) as
                    // (immediate=false, notifyParent=true), so this is presumably a non-immediate
                    // close without notifying the parent — confirm against EventQueue.
                    queue.close(false, false);
                    LOGGER.debug("Closed queue for task {} to allow consumption completion", (Object)taskId);
                    if (etai.consumptionFuture() != null) {
                        // Unlike the agent wait above, a timeout here is escalated to InternalError (outer catch).
                        etai.consumptionFuture().get(this.consumptionCompletionTimeoutSeconds, TimeUnit.SECONDS);
                        LOGGER.debug("Consumption completed for task {}", (Object)taskId);
                    }
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag before converting to InternalError.
                    Thread.currentThread().interrupt();
                    String msg = String.format("Error waiting for task %s completion", taskId);
                    LOGGER.warn(msg, (Throwable)e);
                    throw new InternalError(msg);
                }
                catch (ExecutionException e) {
                    String msg = String.format("Error during task %s execution", taskId);
                    LOGGER.warn(msg, e.getCause());
                    throw new InternalError(msg);
                }
                catch (TimeoutException e) {
                    String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId);
                    // msg is already fully formatted and has no {} placeholder; the extra argument is unused.
                    LOGGER.warn(msg, (Object)taskId);
                    throw new InternalError(msg);
                }
                String nonNullTaskId = taskId;
                // Prefer the stored task over the event captured at interrupt time.
                Task updatedTask = this.taskStore.get(nonNullTaskId);
                if (updatedTask != null) {
                    kind = updatedTask;
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Fetched final task for {} with state {} and {} artifacts", new Object[]{nonNullTaskId, updatedTask.status().state(), updatedTask.artifacts().size()});
                    }
                }
            }
            if (kind instanceof Task && !taskId.equals((taskResult = (Task)kind).id())) {
                throw new InternalError("Task ID mismatch in agent response");
            }
            pushNotificationCallback.run();
        }
        catch (Throwable throwable) {
            // Failure path: same cleanup as the normal path below, then rethrow unchanged.
            CompletableFuture agentFuture = (CompletableFuture)this.runningAgents.remove(taskId);
            LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", (Object)taskId, (Object)this.runningAgents.size());
            this.trackBackgroundTask(this.cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
            throw throwable;
        }
        // Normal path: unregister the agent and track the future returned by cleanupProducer.
        CompletableFuture agentFuture = (CompletableFuture)this.runningAgents.remove(taskId);
        LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", (Object)taskId, (Object)this.runningAgents.size());
        this.trackBackgroundTask(this.cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
        LOGGER.debug("Returning: {}", (Object)kind);
        return kind;
    }

    /**
     * Handles a streaming message send and returns a publisher of the resulting events.
     *
     * <p>Prepares the request via {@code initMessageSend}, creates or taps the event queue
     * (under a temporary {@code "temp-<uuid>"} key when the request context has no task id yet),
     * starts the agent asynchronously and assembles a processing pipeline that:
     * <ul>
     *   <li>reports an {@link InternalError} through the pipeline's error consumer when a
     *       {@link Task} event carries an id different from the current task id;</li>
     *   <li>registers the queue under the task's id and stores the requested push
     *       notification configuration when a {@link Task} event arrives;</li>
     *   <li>sends a push notification for the aggregator's current result when it is a task.</li>
     * </ul>
     *
     * <p>The returned publisher wraps each subscriber so that a client-side {@code cancel()}
     * starts (at most once) a background consumption of the remaining events on the handler's
     * executor before cancelling the upstream subscription.
     *
     * <p>Whether pipeline assembly succeeds or throws, the agent is removed from
     * {@code runningAgents} and the {@code cleanupProducer} future is tracked as a background task.
     *
     * @param params  the message and optional send configuration
     * @param context server call context passed on to {@code initMessageSend}
     * @return a publisher emitting the streaming events of this request
     * @throws A2AError if request setup or pipeline assembly fails
     */
    @Override
    public Flow.Publisher<StreamingEventKind> onMessageSendStream(MessageSendParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}", new Object[]{params.message().taskId(), params.message().contextId(), this.runningAgents.size(), this.backgroundTasks.size()});
        MessageSendSetup mss = this.initMessageSend(params, context);
        @Nullable String initialTaskId = mss.requestContext.getTaskId();
        // Without a task id, the queue and the agent are registered under a temporary key.
        String queueTaskId = initialTaskId != null ? initialTaskId : "temp-" + String.valueOf(UUID.randomUUID());
        final AtomicReference<@NonNull String> taskId = new AtomicReference<String>(queueTaskId);
        EventQueue queue = this.queueManager.createOrTap(taskId.get());
        LOGGER.debug("Created/tapped queue for task {}: {}", (Object)taskId.get(), (Object)queue);
        final ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, this.executor);
        EnhancedRunnable producerRunnable = this.registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue);
        final EventConsumer consumer = new EventConsumer(queue);
        producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
        final AtomicBoolean backgroundConsumeStarted = new AtomicBoolean(false);
        Flow.Publisher<StreamingEventKind> publisher;
        try {
            Flow.Publisher<EventQueueItem> results = resultAggregator.consumeAndEmit(consumer);
            Flow.Publisher<EventQueueItem> processed = AsyncUtils.processor(AsyncUtils.createTubeConfig(), results, (errorConsumer, item) -> {
                Event event = item.getEvent();
                if (event instanceof Task) {
                    Task createdTask = (Task)event;
                    if (!Objects.equals(taskId.get(), createdTask.id())) {
                        errorConsumer.accept(new InternalError("Task ID mismatch in agent response"));
                    }
                    try {
                        this.queueManager.add(createdTask.id(), queue);
                        // Only reached when add() succeeded; otherwise the current id is kept.
                        taskId.set(createdTask.id());
                    }
                    catch (TaskQueueExistsException ignored) {
                        // Deliberately ignored: a queue is already registered under this task id.
                        // NOTE(review): presumably this is the queue created/tapped above — confirm.
                    }
                    if (this.pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null) {
                        this.pushConfigStore.setInfo(createdTask.id(), params.configuration().pushNotificationConfig());
                    }
                }
                String currentTaskId = taskId.get();
                if (this.pushSender != null && currentTaskId != null) {
                    EventKind latest = resultAggregator.getCurrentResult();
                    if (latest instanceof Task) {
                        this.pushSender.sendNotification((Task)latest);
                    }
                }
                return true;
            });
            Flow.Publisher<Event> eventPublisher = AsyncUtils.convertingProcessor(processed, EventQueueItem::getEvent);
            Flow.Publisher<StreamingEventKind> finalPublisher = AsyncUtils.convertingProcessor(eventPublisher, event -> (StreamingEventKind)event);
            publisher = subscriber -> {
                String currentTaskId = taskId.get();
                LOGGER.debug("Creating subscription wrapper for task {}", (Object)currentTaskId);
                finalPublisher.subscribe(new Flow.Subscriber<StreamingEventKind>(){
                    private @Nullable Flow.Subscription subscription;

                    @Override
                    public void onSubscribe(final Flow.Subscription subscription) {
                        LOGGER.debug("onSubscribe called for task {}", taskId.get());
                        this.subscription = subscription;
                        subscriber.onSubscribe(new Flow.Subscription(){

                            @Override
                            public void request(long n) {
                                LOGGER.debug("Subscription.request({}) for task {}", (Object)n, taskId.get());
                                subscription.request(n);
                            }

                            @Override
                            public void cancel() {
                                LOGGER.debug("Client cancelled subscription for task {}, starting background consumption", taskId.get());
                                // Unqualified on purpose: the method lives on the enclosing
                                // subscriber, not on this Subscription ("this." would not compile).
                                startBackgroundConsumption();
                                subscription.cancel();
                            }
                        });
                    }

                    @Override
                    public void onNext(StreamingEventKind item) {
                        LOGGER.debug("onNext: {} for task {}", (Object)item.getClass().getSimpleName(), taskId.get());
                        subscriber.onNext(item);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        LOGGER.error("onError for task {}", taskId.get(), (Object)throwable);
                        subscriber.onError(throwable);
                    }

                    @Override
                    public void onComplete() {
                        LOGGER.debug("onComplete for task {}", taskId.get());
                        try {
                            subscriber.onComplete();
                        }
                        catch (IllegalStateException e) {
                            // The debug message below describes an already-closed response after
                            // a client disconnect; that specific failure is logged and swallowed.
                            // Any other IllegalStateException still propagates.
                            String message = e.getMessage();
                            if (message == null || !message.contains("Response has already been written")) {
                                throw e;
                            }
                            LOGGER.debug("Client disconnected before onComplete, response already closed for task {}", taskId.get());
                        }
                    }

                    /** Starts draining the remaining events on the executor; runs at most once. */
                    private void startBackgroundConsumption() {
                        if (backgroundConsumeStarted.compareAndSet(false, true)) {
                            LOGGER.debug("Starting background consumption for task {}", taskId.get());
                            CompletableFuture<Void> bgTask = CompletableFuture.runAsync(() -> {
                                try {
                                    LOGGER.debug("Background consumption thread started for task {}", taskId.get());
                                    resultAggregator.consumeAll(consumer);
                                    LOGGER.debug("Background consumption completed for task {}", taskId.get());
                                }
                                catch (Exception e) {
                                    LOGGER.error("Error during background consumption for task {}", taskId.get(), (Object)e);
                                }
                            }, DefaultRequestHandler.this.executor);
                            DefaultRequestHandler.this.trackBackgroundTask(bgTask);
                        } else {
                            LOGGER.debug("Background consumption already started for task {}", taskId.get());
                        }
                    }
                });
            };
        }
        finally {
            // Runs on success and on failure: unregister the agent and track queue cleanup.
            LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}; backgroundTasks: {}", new Object[]{taskId.get(), this.runningAgents.size(), this.backgroundTasks.size()});
            CompletableFuture<Void> agentFuture = this.runningAgents.remove(taskId.get());
            LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", (Object)taskId.get(), (Object)this.runningAgents.size());
            this.trackBackgroundTask(this.cleanupProducer(agentFuture, null, Objects.requireNonNull(taskId.get()), queue, true));
        }
        return publisher;
    }

    /**
     * Stores a push notification configuration for an existing task.
     *
     * @param params  task id, configuration to store, and tenant
     * @param context server call context (not used by this method)
     * @return a config object built from the task id, the value returned by the store, and the tenant
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError         if the task is unknown
     */
    @Override
    public TaskPushNotificationConfig onSetTaskPushNotificationConfig(TaskPushNotificationConfig params, ServerCallContext context) throws A2AError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        boolean taskExists = this.taskStore.get(params.taskId()) != null;
        if (!taskExists) {
            throw new TaskNotFoundError();
        }
        PushNotificationConfig stored = this.pushConfigStore.setInfo(params.taskId(), params.pushNotificationConfig());
        return new TaskPushNotificationConfig(params.taskId(), stored, params.tenant());
    }

    /**
     * Returns one push notification configuration of a task: the one matching the
     * requested config id, or the first stored one (see {@code getPushNotificationConfig}).
     *
     * @param params  task id, optional config id, and tenant
     * @param context server call context (not used by this method)
     * @return the selected configuration wrapped with the task id and tenant
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError         if the task is unknown
     * @throws InternalError             if the store holds no configuration for the task
     */
    @Override
    public TaskPushNotificationConfig onGetTaskPushNotificationConfig(GetTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        if (this.taskStore.get(params.id()) == null) {
            throw new TaskNotFoundError();
        }
        ListTaskPushNotificationConfigParams lookup = new ListTaskPushNotificationConfigParams(params.id());
        ListTaskPushNotificationConfigResult found = this.pushConfigStore.getInfo(lookup);
        boolean hasConfigs = found != null && !found.isEmpty();
        if (!hasConfigs) {
            throw new InternalError("No push notification config found");
        }
        @Nullable String requestedId = params.pushNotificationConfigId();
        PushNotificationConfig selected = this.getPushNotificationConfig(found, requestedId);
        return new TaskPushNotificationConfig(params.id(), selected, params.tenant());
    }

    /**
     * Picks a configuration from the list: the first entry whose config id equals
     * {@code configId}, otherwise (no id given, or no match) the first entry of the list.
     *
     * @param notificationConfigList configurations to choose from; callers pass a non-empty list
     * @param configId               id to look for, or {@code null} to take the first entry
     * @return the selected push notification configuration
     */
    private PushNotificationConfig getPushNotificationConfig(ListTaskPushNotificationConfigResult notificationConfigList, @Nullable String configId) {
        if (configId != null) {
            for (TaskPushNotificationConfig candidate : notificationConfigList.configs()) {
                if (configId.equals(candidate.pushNotificationConfig().id())) {
                    return candidate.pushNotificationConfig();
                }
            }
        }
        // Fallback for "no id requested" and "requested id not found" alike.
        return notificationConfigList.configs().get(0).pushNotificationConfig();
    }

    /**
     * Re-attaches a client to the event stream of an existing task.
     *
     * <p>Taps the task's queue; when there is none, a queue is created for a task that is
     * not yet in a final state, while a final task is reported as not found.
     *
     * @param params  identifies the task
     * @param context server call context (not used by this method)
     * @return a publisher of the task's streaming events
     * @throws TaskNotFoundError if the task is unknown, or is final and has no queue
     */
    @Override
    public Flow.Publisher<StreamingEventKind> onResubscribeToTask(TaskIdParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onResubscribeToTask - taskId: {}", params.id());
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }

        TaskManager taskManager = new TaskManager(task.id(), task.contextId(), this.taskStore, null);
        ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, this.executor);

        EventQueue queue = this.queueManager.tap(task.id());
        Object queueIdentity = queue != null ? Integer.valueOf(System.identityHashCode(queue)) : "null";
        LOGGER.debug("onResubscribeToTask - tapped queue: {}", queueIdentity);
        if (queue == null) {
            boolean alreadyFinal = task.status().state().isFinal();
            if (alreadyFinal) {
                throw new TaskNotFoundError();
            }
            LOGGER.debug("Queue not found for active task {}, creating new queue for future events", task.id());
            queue = this.queueManager.createOrTap(task.id());
        }

        Flow.Publisher<EventQueueItem> results = resultAggregator.consumeAndEmit(new EventConsumer(queue));
        LOGGER.debug("onResubscribeToTask - returning publisher for taskId: {}", params.id());
        return AsyncUtils.convertingProcessor(results, item -> (StreamingEventKind)item.getEvent());
    }

    /**
     * Lists the push notification configurations stored for a task.
     *
     * @param params  identifies the task (passed unchanged to the store)
     * @param context server call context (not used by this method)
     * @return whatever the push configuration store returns for {@code params}
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError         if the task is unknown
     */
    @Override
    public ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig(ListTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        boolean taskExists = this.taskStore.get(params.id()) != null;
        if (taskExists) {
            return this.pushConfigStore.getInfo(params);
        }
        throw new TaskNotFoundError();
    }

    /**
     * Deletes one push notification configuration of a task.
     *
     * @param params  task id and id of the configuration to delete
     * @param context server call context (not used by this method)
     * @throws UnsupportedOperationError if no push configuration store is available
     * @throws TaskNotFoundError         if the task is unknown
     */
    @Override
    public void onDeleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams params, ServerCallContext context) {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        boolean taskMissing = this.taskStore.get(params.id()) == null;
        if (taskMissing) {
            throw new TaskNotFoundError();
        }
        this.pushConfigStore.deleteInfo(params.id(), params.pushNotificationConfigId());
    }

    /**
     * Tells whether a push notification configuration should be stored for this request:
     * a store must exist and the request must carry a configuration with a push config.
     *
     * @param params the send parameters to inspect
     * @return {@code true} only when all three conditions hold
     */
    private boolean shouldAddPushInfo(MessageSendParams params) {
        if (this.pushConfigStore == null) {
            return false;
        }
        if (params.configuration() == null) {
            return false;
        }
        return params.configuration().pushNotificationConfig() != null;
    }

    /**
     * Starts the agent for a task on the handler's executor and registers the resulting
     * future in {@code runningAgents} under {@code taskId}.
     *
     * <p>When the run finishes, a failure (if any) is logged and recorded on the runnable,
     * and the runnable's done-callbacks are invoked in either case.
     *
     * @param taskId         key under which the future is registered
     * @param requestContext context handed to the agent executor
     * @param queue          event queue handed to the agent executor
     * @return the runnable wrapping the agent execution, so callers can add done-callbacks
     */
    private EnhancedRunnable registerAndExecuteAgentAsync(final String taskId, final RequestContext requestContext, final EventQueue queue) {
        LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, this.runningAgents.size());
        this.logThreadStats("AGENT START");

        EnhancedRunnable agentRun = new EnhancedRunnable(){

            @Override
            public void run() {
                LOGGER.debug("Agent execution starting for task {}", taskId);
                DefaultRequestHandler.this.agentExecutor.execute(requestContext, queue);
                LOGGER.debug("Agent execution completed for task {}", taskId);
            }
        };

        CompletableFuture<Void> agentFuture = CompletableFuture
                .runAsync(agentRun, this.executor)
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        LOGGER.error("Agent execution failed for task {}", (Object)taskId, failure);
                        agentRun.setError(failure);
                    }
                    this.logThreadStats("AGENT COMPLETE END");
                    agentRun.invokeDoneCallbacks();
                });

        this.runningAgents.put(taskId, agentFuture);
        LOGGER.debug("Registered agent for task {}, runningAgents.size() after: {}", taskId, this.runningAgents.size());
        return agentRun;
    }

    /**
     * Adds a future to {@code backgroundTasks} and arranges for it to be removed again
     * once it completes. Cancellation is logged at debug level, any other failure at error level.
     *
     * @param task the background future to track
     */
    private void trackBackgroundTask(CompletableFuture<Void> task) {
        this.backgroundTasks.add(task);
        LOGGER.debug("Tracking background task (total: {}): {}", (Object)this.backgroundTasks.size(), task);
        task.whenComplete((ignored, failure) -> {
            try {
                if (failure == null) {
                    return;
                }
                // Look through a CompletionException wrapper to classify the outcome.
                boolean wrapped = failure instanceof CompletionException && failure.getCause() != null;
                Throwable rootCause = wrapped ? failure.getCause() : failure;
                if (rootCause instanceof CancellationException) {
                    LOGGER.debug("Background task cancelled: {}", (Object)task);
                } else {
                    LOGGER.error("Background task failed", failure);
                }
            }
            finally {
                // Runs for success, failure and cancellation alike.
                this.backgroundTasks.remove(task);
                LOGGER.debug("Removed background task (remaining: {}): {}", (Object)this.backgroundTasks.size(), (Object)task);
            }
        });
    }

    /**
     * Returns a future that completes when every background task tracked at the moment
     * of the call has completed. Tasks registered afterwards are not included.
     *
     * @return an already-completed future when nothing is tracked, otherwise the combined future
     */
    public CompletableFuture<Void> waitForBackgroundTasks() {
        // Snapshot of the tracked set at call time.
        CompletableFuture<?>[] pending = this.backgroundTasks.toArray(new CompletableFuture<?>[0]);
        if (pending.length > 0) {
            LOGGER.debug("Waiting for {} background tasks to complete", pending.length);
            return CompletableFuture.allOf(pending);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Builds the future that closes the task's queue once the agent (and, when given,
     * event consumption) has completed, successfully or not.
     *
     * @param agentFuture       future of the agent run, or {@code null} when none was registered
     * @param consumptionFuture future of event consumption, or {@code null} to wait for the agent only
     * @param taskId            task id, used for logging
     * @param queue             queue to close via {@code close(false, true)}
     * @param isStreaming       selects the wording of the log message only
     * @return an already-completed future when there is no agent future, otherwise the cleanup future
     */
    private CompletableFuture<Void> cleanupProducer(@Nullable CompletableFuture<Void> agentFuture, @Nullable CompletableFuture<Void> consumptionFuture, String taskId, EventQueue queue, boolean isStreaming) {
        LOGGER.debug("Starting cleanup for task {} (streaming={})", taskId, isStreaming);
        this.logThreadStats("CLEANUP START");
        if (agentFuture == null) {
            LOGGER.debug("No running agent found for task {}, cleanup complete", taskId);
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> awaited;
        if (consumptionFuture == null) {
            awaited = agentFuture;
        } else {
            awaited = CompletableFuture.allOf(agentFuture, consumptionFuture);
            LOGGER.debug("Cleanup will wait for both agent and consumption to complete for task {}", taskId);
        }

        return awaited.whenComplete((ignored, failure) -> {
            if (failure == null) {
                LOGGER.debug("Agent and consumption both completed successfully for task {}", taskId);
            } else {
                LOGGER.debug("Agent/consumption completed with error for task {}", (Object)taskId, failure);
            }
            String callKind = isStreaming ? "Streaming" : "Non-streaming";
            LOGGER.debug("{} call, closing ChildQueue for task {} (immediate=false, notifyParent=true)", callKind, taskId);
            queue.close(false, true);
            LOGGER.debug("Queue cleanup completed for task {}", taskId);
            this.logThreadStats("CLEANUP END");
        });
    }

    /**
     * Prepares everything needed to process an incoming message-send request.
     *
     * <p>A {@link TaskManager} is created from the message's task and context ids. If it yields an
     * existing task, that task is updated with the incoming message and, when
     * {@code shouldAddPushInfo} says so, the request's push notification config is stored for it.
     * Finally a {@link RequestContext} is built from the request parameters, the (possibly absent)
     * task and the server call context.
     *
     * @param params  the message-send request parameters
     * @param context the server call context to attach to the request context
     * @return the task manager, the updated task (or {@code null} if none existed) and the built
     *         request context
     */
    private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) {
        var message = params.message();
        TaskManager taskManager = new TaskManager(message.taskId(), message.contextId(), this.taskStore, message);

        Task task = taskManager.getTask();
        if (task != null) {
            LOGGER.debug("Found task updating with message {}", message);
            task = taskManager.updateWithMessage(message, task);
            if (this.shouldAddPushInfo(params)) {
                LOGGER.debug("Adding push info");
                this.pushConfigStore.setInfo(task.id(), params.configuration().pushNotificationConfig());
            }
        }

        RequestContext requestContext = this.requestContextBuilder.get()
                .setParams(params)
                .setTaskId(task == null ? null : task.id())
                .setContextId(message.contextId())
                .setTask(task)
                .setServerCallContext(context)
                .build();
        return new MessageSendSetup(taskManager, task, requestContext);
    }

    /**
     * Sends a push notification for the aggregator's current result, provided a push sender is
     * configured and that result is a {@link Task}. Does nothing otherwise.
     *
     * @param taskId           task identifier (not used by this method)
     * @param resultAggregator aggregator whose current result is inspected
     */
    private void sendPushNotification(String taskId, ResultAggregator resultAggregator) {
        if (this.pushSender == null) {
            // Without a sender the aggregator is not even queried.
            return;
        }
        if (resultAggregator.getCurrentResult() instanceof Task latestTask) {
            this.pushSender.sendNotification(latestTask);
        }
    }

    /**
     * Writes a debug-level snapshot of thread and task bookkeeping, framed by the given label.
     *
     * <p>Returns immediately when debug logging is disabled, so none of the figures are computed
     * in that case. The thread count comes from {@link ThreadGroup#activeCount()} on the root
     * thread group.
     *
     * @param label marker identifying the point at which the snapshot is taken
     */
    private void logThreadStats(String label) {
        if (!LOGGER.isDebugEnabled()) {
            return;
        }

        // Climb from the current thread's group up to the root group.
        ThreadGroup topGroup = Thread.currentThread().getThreadGroup();
        for (ThreadGroup parent = topGroup.getParent(); parent != null; parent = topGroup.getParent()) {
            topGroup = parent;
        }
        int liveThreadEstimate = topGroup.activeCount();

        LOGGER.debug("=== THREAD STATS: {} ===", label);
        LOGGER.debug("Active threads: {}", liveThreadEstimate);
        LOGGER.debug("Running agents: {}", this.runningAgents.size());
        LOGGER.debug("Background tasks: {}", this.backgroundTasks.size());
        // NOTE(review): despite the "active queues" wording, the value logged here is the queue
        // manager's implementation class name, not a queue count.
        LOGGER.debug("Queue manager active queues: {}", this.queueManager.getClass().getSimpleName());

        if (!this.runningAgents.isEmpty()) {
            LOGGER.debug("Running agent tasks:");
            this.runningAgents.forEach((id, agentFuture) -> {
                String state = agentFuture.isDone() ? "DONE" : "RUNNING";
                LOGGER.debug("  - Task {}: {}", id, state);
            });
        }
        if (!this.backgroundTasks.isEmpty()) {
            LOGGER.debug("Background tasks:");
            this.backgroundTasks.forEach(tracked -> {
                String state = tracked.isDone() ? "DONE" : "RUNNING";
                LOGGER.debug("  - {}: {}", tracked, state);
            });
        }
        LOGGER.debug("=== END THREAD STATS ===");
    }

    /**
     * Immutable bundle of the values that {@code initMessageSend} produces for a message-send
     * request.
     *
     * @param taskManager    the task manager created for the incoming message
     * @param task           the existing task after it was updated with the incoming message, or
     *                       {@code null} when the task manager returned no task
     * @param requestContext the request context built for the request
     */
    private record MessageSendSetup(TaskManager taskManager, @Nullable Task task, RequestContext requestContext) {
    }
}

