/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.requesthandlers;

import io.a2a.jsonrpc.common.wrappers.ListTasksResult;
import io.a2a.server.ServerCallContext;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.agentexecution.SimpleRequestContextBuilder;
import io.a2a.server.config.A2AConfigProvider;
import io.a2a.server.events.EnhancedRunnable;
import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.events.QueueManager;
import io.a2a.server.requesthandlers.RequestHandler;
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.ResultAggregator;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskStore;
import io.a2a.server.util.async.AsyncUtils;
import io.a2a.server.util.async.EventConsumerExecutorProducer;
import io.a2a.server.util.async.Internal;
import io.a2a.spec.A2AError;
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.GetTaskPushNotificationConfigParams;
import io.a2a.spec.InternalError;
import io.a2a.spec.InvalidParamsError;
import io.a2a.spec.ListTaskPushNotificationConfigParams;
import io.a2a.spec.ListTaskPushNotificationConfigResult;
import io.a2a.spec.ListTasksParams;
import io.a2a.spec.Message;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskNotFoundError;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.UnsupportedOperationError;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class DefaultRequestHandler
implements RequestHandler {
    /** Logger for this handler's request-processing diagnostics. */
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
    /** Logger registered under the dedicated {@code io.a2a.server.diagnostics.ThreadStats} name. */
    private static final Logger THREAD_STATS_LOGGER = LoggerFactory.getLogger((String)"io.a2a.server.diagnostics.ThreadStats");
    /** Configuration key read by {@code initConfig()} to populate {@link #agentCompletionTimeoutSeconds}. */
    private static final String A2A_BLOCKING_AGENT_TIMEOUT_SECONDS = "a2a.blocking.agent.timeout.seconds";
    /** Configuration key read by {@code initConfig()} to populate {@link #consumptionCompletionTimeoutSeconds}. */
    private static final String A2A_BLOCKING_CONSUMPTION_TIMEOUT_SECONDS = "a2a.blocking.consumption.timeout.seconds";
    /** Injected source of the configuration values read in {@code initConfig()}. */
    @Inject
    A2AConfigProvider configProvider;
    /** Seconds a blocking {@code onMessageSend} waits on the agent future; set by {@code initConfig()} or {@code create(...)}. */
    int agentCompletionTimeoutSeconds;
    /** Seconds a blocking {@code onMessageSend} waits on the consumption future; set by {@code initConfig()} or {@code create(...)}. */
    int consumptionCompletionTimeoutSeconds;
    /** Agent implementation invoked for execute and cancel requests. */
    private AgentExecutor agentExecutor;
    /** Store queried for tasks by id and for task listings. */
    private TaskStore taskStore;
    /** Provides the per-task event queues (via {@code tap} / {@code createOrTap}). */
    private QueueManager queueManager;
    /** Push notification config store; the push-config handlers throw UnsupportedOperationError when this is null. */
    private PushNotificationConfigStore pushConfigStore;
    /** Stored from the constructor; not referenced in the portion of the class visible here. */
    private MainEventBusProcessor mainEventBusProcessor;
    /** Supplies a fresh {@code RequestContext.Builder} for each request context that is built. */
    private Supplier<RequestContext.Builder> requestContextBuilder;
    /** Task id mapped to the future of its agent execution; entries are added by {@code registerAndExecuteAgentAsync}. */
    private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<String, CompletableFuture<Void>>();
    /** Executor on which agent runnables are started; also handed to each ResultAggregator. */
    private Executor executor;
    /** Second executor handed to each ResultAggregator. */
    private Executor eventConsumerExecutor;

    /**
     * Creates a handler with every collaborator left unset ({@code null}).
     * NOTE(review): presumably exists so the container can proxy this
     * {@code @ApplicationScoped} bean - confirm before relying on it elsewhere.
     */
    protected DefaultRequestHandler() {
        // Executors
        executor = null;
        eventConsumerExecutor = null;
        // Agent, storage and queue collaborators
        agentExecutor = null;
        taskStore = null;
        queueManager = null;
        pushConfigStore = null;
        mainEventBusProcessor = null;
        // Request context factory
        requestContextBuilder = null;
    }

    /**
     * Injection constructor: stores the supplied collaborators and installs a request
     * context builder supplier backed by the given task store.
     *
     * @param agentExecutor agent invoked for execute and cancel requests
     * @param taskStore store used to look up and list tasks
     * @param queueManager source of per-task event queues
     * @param pushConfigStore push notification config store
     * @param mainEventBusProcessor main event bus processor kept by this handler
     * @param executor executor on which agent runnables are started
     * @param eventConsumerExecutor executor handed to result aggregators for event consumption
     */
    @Inject
    public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, MainEventBusProcessor mainEventBusProcessor, @Internal Executor executor, @EventConsumerExecutorProducer.EventConsumerExecutor Executor eventConsumerExecutor) {
        // Executors first, then the domain collaborators.
        this.executor = executor;
        this.eventConsumerExecutor = eventConsumerExecutor;
        this.agentExecutor = agentExecutor;
        this.queueManager = queueManager;
        this.pushConfigStore = pushConfigStore;
        this.mainEventBusProcessor = mainEventBusProcessor;
        this.taskStore = taskStore;
        // Each call to the supplier yields a new builder bound to the injected task store.
        this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
    }

    /**
     * Reads both blocking-mode timeouts (in seconds) from the injected config provider.
     *
     * @throws NumberFormatException if a configured value is not a parsable integer
     */
    @PostConstruct
    void initConfig() {
        String agentTimeoutValue = this.configProvider.getValue(A2A_BLOCKING_AGENT_TIMEOUT_SECONDS);
        this.agentCompletionTimeoutSeconds = Integer.parseInt(agentTimeoutValue);

        String consumptionTimeoutValue = this.configProvider.getValue(A2A_BLOCKING_CONSUMPTION_TIMEOUT_SECONDS);
        this.consumptionCompletionTimeoutSeconds = Integer.parseInt(consumptionTimeoutValue);
    }

    /**
     * Builds a handler directly from its collaborators, with fixed timeouts of
     * 5 seconds for agent completion and 2 seconds for consumption completion
     * instead of values read from configuration.
     *
     * @return the configured handler
     */
    public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, MainEventBusProcessor mainEventBusProcessor, Executor executor, Executor eventConsumerExecutor) {
        DefaultRequestHandler created = new DefaultRequestHandler(
                agentExecutor,
                taskStore,
                queueManager,
                pushConfigStore,
                mainEventBusProcessor,
                executor,
                eventConsumerExecutor);
        created.consumptionCompletionTimeoutSeconds = 2;
        created.agentCompletionTimeoutSeconds = 5;
        return created;
    }

    /**
     * Looks up a task by id and returns it with its history trimmed to the requested length.
     *
     * @param params query carrying the task id and the optional history length
     * @param context server call context (not used by this method)
     * @return the stored task, possibly with a shortened history
     * @throws TaskNotFoundError if the task store has no task for the id
     */
    @Override
    public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onGetTask {}", params.id());
        Task stored = this.taskStore.get(params.id());
        if (stored != null) {
            Task trimmed = DefaultRequestHandler.limitTaskHistory(stored, params.historyLength());
            LOGGER.debug("Task found {}", trimmed);
            return trimmed;
        }
        LOGGER.debug("No task found for {}. Throwing TaskNotFoundError", params.id());
        throw new TaskNotFoundError();
    }

    /**
     * Returns the task with only its last {@code historyLength} history entries.
     * The task is returned unchanged when it has no history, when no length is
     * given, or when the length already covers the whole history.
     *
     * @param task task whose history may be shortened
     * @param historyLength number of trailing history entries to keep, or {@code null} for no limit
     * @return the original task, or a rebuilt copy carrying the shortened history
     */
    private static Task limitTaskHistory(Task task, @Nullable Integer historyLength) {
        if (task.history() == null || historyLength == null) {
            return task;
        }
        int total = task.history().size();
        if (historyLength >= total) {
            return task;
        }
        // Keep the tail of the history: the last historyLength entries.
        return Task.builder(task)
                .history(task.history().subList(total - historyLength, total))
                .build();
    }

    /**
     * Lists tasks matching the given parameters after validating the optional
     * {@code statusTimestampAfter} filter.
     *
     * @param params listing filters and paging options
     * @param context server call context (not used by this method)
     * @return the listing produced by the task store
     * @throws InvalidParamsError if {@code statusTimestampAfter} lies in the future or
     *         before the epoch; the error data names the offending parameter and the reason
     */
    @Override
    public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}, statusTimestampAfter={}",
                params.contextId(), params.status(), params.pageSize(), params.pageToken(), params.statusTimestampAfter());
        Instant statusTimestampAfter = params.statusTimestampAfter();
        if (statusTimestampAfter != null) {
            if (statusTimestampAfter.isAfter(Instant.now())) {
                HashMap<String, Object> errorData = new HashMap<>();
                // Report the parameter under the name callers actually send, matching the check below.
                errorData.put("parameter", "statusTimestampAfter");
                errorData.put("reason", "Timestamp cannot be in the future");
                throw new InvalidParamsError(null, "Invalid params", errorData);
            }
            long millis = statusTimestampAfter.toEpochMilli();
            if (millis < 0L) {
                HashMap<String, Object> errorData = new HashMap<>();
                errorData.put("parameter", "statusTimestampAfter");
                errorData.put("reason", "Must be a non-negative timestamp value, got: " + millis);
                throw new InvalidParamsError(null, "Invalid params", errorData);
            }
        }
        ListTasksResult result = this.taskStore.list(params);
        LOGGER.debug("Found {} tasks (total: {})", result.pageSize(), result.totalSize());
        return result;
    }

    /**
     * Asks the agent to cancel a task and returns the task once it has reached the canceled state.
     *
     * @param params identifies the task to cancel
     * @param context server call context, attached to the cancel request context
     * @return the task in state {@code TASK_STATE_CANCELED}
     * @throws TaskNotFoundError if the task store has no task for the id
     * @throws TaskNotCancelableError if the task is already final, the agent refuses,
     *         or the task does not end up canceled
     * @throws InternalError if the agent fails unexpectedly or the consumed result is not a task
     */
    @Override
    public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws A2AError {
        Task existing = this.taskStore.get(params.id());
        if (existing == null) {
            throw new TaskNotFoundError();
        }
        if (existing.status().state().isFinal()) {
            throw new TaskNotCancelableError("Task cannot be canceled - current state: " + existing.status().state());
        }

        TaskManager taskManager = new TaskManager(existing.id(), existing.contextId(), this.taskStore, null);
        ResultAggregator aggregator = new ResultAggregator(taskManager, null, this.executor, this.eventConsumerExecutor);
        EventQueue queue = this.queueManager.createOrTap(existing.id());
        RequestContext cancelContext = this.requestContextBuilder.get()
                .setTaskId(existing.id())
                .setContextId(existing.contextId())
                .setTask(existing)
                .setServerCallContext(context)
                .build();
        AgentEmitter emitter = new AgentEmitter(cancelContext, queue);

        try {
            this.agentExecutor.cancel(cancelContext, emitter);
        }
        catch (TaskNotCancelableError notCancelable) {
            LOGGER.info("Task {} is not cancelable", existing.id());
            throw notCancelable;
        }
        catch (A2AError agentError) {
            LOGGER.warn("Agent cancellation threw A2AError for task {}: {} - {}",
                    existing.id(), agentError.getClass().getSimpleName(), agentError.getMessage(), agentError);
            throw agentError;
        }
        catch (Exception unexpected) {
            LOGGER.error("Agent cancellation threw unexpected exception for task {}", existing.id(), unexpected);
            throw new InternalError("Agent cancellation failed: " + unexpected.getMessage());
        }

        // Interrupt the agent execution for this task if one is still registered.
        CompletableFuture<Void> running = this.runningAgents.get(existing.id());
        if (running != null) {
            running.cancel(true);
        }

        EventKind outcome = aggregator.consumeAll(new EventConsumer(queue));
        if (outcome instanceof Task) {
            Task canceled = (Task) outcome;
            if (canceled.status().state() == TaskState.TASK_STATE_CANCELED) {
                return canceled;
            }
            throw new TaskNotCancelableError("Task cannot be canceled - current state: " + canceled.status().state());
        }
        throw new InternalError("Agent did not return valid response for cancel");
    }

    /**
     * Handles a non-streaming message send: starts the agent asynchronously, consumes
     * its events until a result or an interrupt, and returns the resulting event.
     *
     * <p>For a blocking request that was interrupted, this method waits (bounded by the
     * configured timeouts) for the agent and for event consumption to finish, then
     * returns the current task from the task store when one exists. Whatever the
     * outcome, the agent is unregistered from {@code runningAgents} and asynchronous
     * producer cleanup is scheduled before the method exits.
     *
     * @param params the message and optional send configuration
     * @param context server call context for this request
     * @return the event produced for the request
     * @throws InternalError if no result is produced, a wait fails or times out, or the
     *         returned task's id differs from the id the request was queued under
     */
    @Override
    public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onMessageSend - task: {}; context {}", params.message().taskId(), params.message().contextId());
        MessageSendSetup mss = this.initMessageSend(params, context);
        String queueTaskId = Objects.requireNonNull(mss.requestContext.getTaskId(), "TaskId must be non-null after RequestContext.build()");
        LOGGER.debug("Queue taskId: {}", queueTaskId);
        EventQueue queue = this.queueManager.createOrTap(queueTaskId);
        AtomicReference<String> taskId = new AtomicReference<>(queueTaskId);
        ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, this.executor, this.eventConsumerExecutor);

        // Blocking only when the client explicitly asked for it; absent configuration means non-blocking.
        boolean blocking = params.configuration() != null && Boolean.TRUE.equals(params.configuration().blocking());
        if (params.configuration() != null && params.configuration().blocking() != null) {
            LOGGER.debug("DefaultRequestHandler: Client requested blocking={} for task {}", params.configuration().blocking(), taskId.get());
        } else if (params.configuration() != null) {
            LOGGER.debug("DefaultRequestHandler: Client sent configuration but blocking=null, using default blocking={} for task {}", blocking, taskId.get());
        } else {
            LOGGER.debug("DefaultRequestHandler: Client sent no configuration, using default blocking={} for task {}", blocking, taskId.get());
        }
        LOGGER.debug("DefaultRequestHandler: Final blocking decision: {} for task {}", blocking, taskId.get());

        EventConsumer consumer = new EventConsumer(queue);
        this.registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());

        ResultAggregator.EventTypeAndInterrupt etai = null;
        EventKind kind = null;
        try {
            CompletableFuture<Void> agentFuture = this.runningAgents.get(queueTaskId);
            etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking);
            if (etai == null) {
                LOGGER.debug("No result, throwing InternalError");
                throw new InternalError("No result");
            }
            boolean interruptedOrNonBlocking = etai.interrupted();
            // Resolve the event before logging so the log line reports the actual event type.
            kind = etai.eventType();
            LOGGER.debug("DefaultRequestHandler: interruptedOrNonBlocking={} (blocking={}, eventType={})",
                    interruptedOrNonBlocking, blocking, kind != null ? kind.getClass().getSimpleName() : null);

            if (mss.task() == null && kind instanceof Task) {
                Task createdTask = (Task) kind;
                if (this.shouldAddPushInfo(params)) {
                    LOGGER.debug("Storing push notification config for new task {} (original taskId from params: {})", createdTask.id(), params.message().taskId());
                    this.pushConfigStore.setInfo(createdTask.id(), params.configuration().pushNotificationConfig());
                }
            }

            if (blocking && interruptedOrNonBlocking) {
                LOGGER.debug("DefaultRequestHandler: Entering blocking fire-and-forget handling for task {}", taskId.get());
                try {
                    if (agentFuture != null) {
                        try {
                            agentFuture.get(this.agentCompletionTimeoutSeconds, TimeUnit.SECONDS);
                            LOGGER.debug("DefaultRequestHandler: Step 1 - Agent completed for task {}", taskId.get());
                        }
                        catch (TimeoutException e) {
                            // A slow agent is tolerated here; only the consumption wait below is fatal on timeout.
                            LOGGER.debug("DefaultRequestHandler: Step 1 - Agent still running for task {} after {}s timeout", taskId.get(), this.agentCompletionTimeoutSeconds);
                        }
                    }
                    queue.close(false, false);
                    LOGGER.debug("DefaultRequestHandler: Step 2 - Closed queue for task {} to allow consumption completion", taskId.get());
                    if (etai.consumptionFuture() != null) {
                        etai.consumptionFuture().get(this.consumptionCompletionTimeoutSeconds, TimeUnit.SECONDS);
                        LOGGER.debug("DefaultRequestHandler: Step 3 - Consumption completed for task {}", taskId.get());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    String msg = String.format("Error waiting for task %s completion", taskId.get());
                    LOGGER.warn(msg, (Throwable) e);
                    throw new InternalError(msg);
                }
                catch (ExecutionException e) {
                    String msg = String.format("Error during task %s execution", taskId.get());
                    LOGGER.warn(msg, e.getCause());
                    throw new InternalError(msg);
                }
                catch (TimeoutException e) {
                    String msg = String.format("Timeout waiting for task %s consumption", taskId.get());
                    LOGGER.warn(msg, (Throwable) e);
                    throw new InternalError(msg);
                }
                String nonNullTaskId = Objects.requireNonNull(taskId.get(), "taskId cannot be null");
                Task updatedTask = this.taskStore.get(nonNullTaskId);
                if (updatedTask != null) {
                    kind = updatedTask;
                    LOGGER.debug("DefaultRequestHandler: Step 5 - Fetched current task for {} with state {} and {} artifacts",
                            taskId.get(), updatedTask.status().state(), updatedTask.artifacts().size());
                } else {
                    LOGGER.warn("DefaultRequestHandler: Step 5 - Task {} not found in TaskStore!", taskId.get());
                }
            }

            String finalTaskId = Objects.requireNonNull(taskId.get(), "taskId cannot be null");
            if (kind instanceof Task && !finalTaskId.equals(((Task) kind).id())) {
                throw new InternalError("Task ID mismatch in agent response");
            }
        }
        finally {
            // Runs on both the normal and the exceptional path.
            if (!blocking && etai != null && etai.interrupted()) {
                LOGGER.debug("DefaultRequestHandler: Non-blocking call in finally - closing ChildQueue IMMEDIATELY for task {} to free EventConsumer", taskId.get());
                queue.close(true);
            }
            CompletableFuture<Void> finishedAgent = this.runningAgents.remove(queueTaskId);
            String cleanupTaskId = Objects.requireNonNull(taskId.get(), "taskId cannot be null");
            LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", cleanupTaskId, this.runningAgents.size());
            this.cleanupProducer(finishedAgent, etai != null ? etai.consumptionFuture() : null, cleanupTaskId, queue, false).whenComplete((res, err) -> {
                if (err != null) {
                    LOGGER.error("Error during async cleanup for task {}", taskId.get(), err);
                }
            });
        }
        LOGGER.debug("Returning: {}", kind);
        return kind;
    }

    /**
     * Handles a streaming message send: starts the agent asynchronously and returns a
     * publisher of the events it produces.
     *
     * <p>The returned publisher wraps the downstream subscription so that a client
     * cancel closes the queue immediately before cancelling upstream. Before this
     * method returns, the agent is unregistered from {@code runningAgents} and
     * asynchronous producer cleanup is scheduled in streaming mode.
     *
     * @param params the message and optional send configuration
     * @param context server call context; receives the consumer's cancel callback
     * @return publisher of streaming events for the task
     */
    @Override
    public Flow.Publisher<StreamingEventKind> onMessageSendStream(MessageSendParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}",
                params.message().taskId(), params.message().contextId(), this.runningAgents.size());
        MessageSendSetup mss = this.initMessageSend(params, context);
        String queueTaskId = Objects.requireNonNull(mss.requestContext.getTaskId(), "TaskId must be non-null after RequestContext.build()");
        final AtomicReference<@NonNull String> taskId = new AtomicReference<String>(queueTaskId);
        final EventQueue queue = this.queueManager.createOrTap(queueTaskId);
        LOGGER.debug("Created/tapped queue for task {}: {}", taskId.get(), queue);
        if (mss.task() == null && this.shouldAddPushInfo(params)) {
            Objects.requireNonNull(taskId.get(), "taskId was null");
            LOGGER.debug("Storing push notification config for new streaming task {} EARLY (original taskId from params: {})", taskId.get(), params.message().taskId());
            this.pushConfigStore.setInfo(taskId.get(), params.configuration().pushNotificationConfig());
        }
        ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, this.executor, this.eventConsumerExecutor);
        EventConsumer consumer = new EventConsumer(queue);
        this.registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());
        context.setEventConsumerCancelCallback(consumer::cancel);
        try {
            Flow.Publisher<EventQueueItem> results = resultAggregator.consumeAndEmit(consumer);
            // Report a task whose id differs from the id this request was queued under.
            Flow.Publisher<EventQueueItem> processed = AsyncUtils.processor(AsyncUtils.createTubeConfig(), results, (errorConsumer, item) -> {
                Event event = item.getEvent();
                if (event instanceof Task) {
                    Task createdTask = (Task) event;
                    String currentId = Objects.requireNonNull(taskId.get(), "taskId cannot be null");
                    if (!currentId.equals(createdTask.id())) {
                        errorConsumer.accept(new InternalError("Task ID mismatch: expected " + currentId + " but got " + createdTask.id()));
                    }
                }
                return true;
            });
            Flow.Publisher<Event> eventPublisher = AsyncUtils.convertingProcessor(processed, EventQueueItem::getEvent);
            Flow.Publisher<StreamingEventKind> finalPublisher = AsyncUtils.convertingProcessor(eventPublisher, event -> (StreamingEventKind) event);
            Flow.Publisher<StreamingEventKind> publisher = subscriber -> {
                String currentTaskId = taskId.get();
                LOGGER.debug("Creating subscription wrapper for task {}", currentTaskId);
                finalPublisher.subscribe(new Flow.Subscriber<StreamingEventKind>(){
                    private  @Nullable Flow.Subscription subscription;

                    @Override
                    public void onSubscribe(final Flow.Subscription subscription) {
                        LOGGER.debug("onSubscribe called for task {}", taskId.get());
                        this.subscription = subscription;
                        subscriber.onSubscribe(new Flow.Subscription(){

                            @Override
                            public void request(long n) {
                                LOGGER.debug("Subscription.request({}) for task {}", n, taskId.get());
                                subscription.request(n);
                            }

                            @Override
                            public void cancel() {
                                LOGGER.debug("Client cancelled subscription for task {}, closing ChildQueue immediately", taskId.get());
                                // Close the queue first, then cancel the upstream subscription.
                                queue.close(true);
                                subscription.cancel();
                            }
                        });
                    }

                    @Override
                    public void onNext(StreamingEventKind item) {
                        LOGGER.debug("onNext: {} for task {}", item.getClass().getSimpleName(), taskId.get());
                        subscriber.onNext(item);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        LOGGER.error("onError for task {}", taskId.get(), throwable);
                        subscriber.onError(throwable);
                    }

                    @Override
                    public void onComplete() {
                        LOGGER.debug("onComplete for task {}", taskId.get());
                        try {
                            subscriber.onComplete();
                        }
                        catch (IllegalStateException e) {
                            String message = e.getMessage();
                            if (message != null && message.contains("Response has already been written")) {
                                // The response was already closed, so there is nothing left to complete:
                                // log and swallow instead of failing the stream.
                                LOGGER.debug("Client disconnected before onComplete, response already closed for task {}", taskId.get());
                                return;
                            }
                            throw e;
                        }
                    }
                });
            };
            return publisher;
        }
        finally {
            String idOfTask = taskId.get();
            if (idOfTask != null) {
                LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}", idOfTask, this.runningAgents.size());
                CompletableFuture<Void> agentFuture = this.runningAgents.remove(idOfTask);
                LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", taskId.get(), this.runningAgents.size());
                this.cleanupProducer(agentFuture, null, idOfTask, queue, true).whenComplete((res, err) -> {
                    if (err != null) {
                        LOGGER.error("Error during async cleanup for streaming task {}", taskId.get(), err);
                    }
                });
            }
        }
    }

    /**
     * Stores a push notification config for an existing task.
     *
     * @param params task id, config to store and tenant
     * @param context server call context (not used by this method)
     * @return the config as returned by the store, paired with the task id and tenant
     * @throws UnsupportedOperationError if no push config store is available
     * @throws TaskNotFoundError if the task store has no task for the id
     */
    @Override
    public TaskPushNotificationConfig onCreateTaskPushNotificationConfig(TaskPushNotificationConfig params, ServerCallContext context) throws A2AError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        if (this.taskStore.get(params.taskId()) == null) {
            throw new TaskNotFoundError();
        }
        PushNotificationConfig stored = this.pushConfigStore.setInfo(params.taskId(), params.config());
        return new TaskPushNotificationConfig(params.taskId(), stored, params.tenant());
    }

    /**
     * Returns a push notification config of an existing task, selected by config id.
     *
     * @param params task id, config id and tenant
     * @param context server call context (not used by this method)
     * @return the selected config paired with the task id and tenant
     * @throws UnsupportedOperationError if no push config store is available
     * @throws TaskNotFoundError if the task store has no task for the id
     * @throws InternalError if the store holds no config for the task
     */
    @Override
    public TaskPushNotificationConfig onGetTaskPushNotificationConfig(GetTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        if (this.taskStore.get(params.taskId()) == null) {
            throw new TaskNotFoundError();
        }
        ListTaskPushNotificationConfigResult stored =
                this.pushConfigStore.getInfo(new ListTaskPushNotificationConfigParams(params.taskId()));
        boolean nothingStored = stored == null || stored.isEmpty();
        if (nothingStored) {
            throw new InternalError("No push notification config found");
        }
        PushNotificationConfig selected = this.getPushNotificationConfig(stored, params.id());
        return new TaskPushNotificationConfig(params.taskId(), selected, params.tenant());
    }

    /**
     * Picks the config whose id equals {@code configId}; when no id is given or none
     * matches, falls back to the first config in the list.
     *
     * @param notificationConfigList configs stored for the task; the caller must ensure it is not empty
     * @param configId id of the wanted config, or {@code null} to take the first one
     * @return the matching config, or the first config as a fallback
     */
    private PushNotificationConfig getPushNotificationConfig(ListTaskPushNotificationConfigResult notificationConfigList, String configId) {
        // A null id cannot match anything, so skip the search rather than dereference it.
        if (configId != null) {
            for (TaskPushNotificationConfig candidate : notificationConfigList.configs()) {
                if (configId.equals(candidate.config().id())) {
                    return candidate.config();
                }
            }
        }
        return notificationConfigList.configs().get(0).config();
    }

    /**
     * Subscribes to the event stream of an existing task. The current task state is
     * enqueued locally first, so it is the first event the subscriber receives.
     *
     * @param params identifies the task to subscribe to
     * @param context server call context (not used by this method)
     * @return publisher of streaming events for the task
     * @throws TaskNotFoundError if the task is unknown, or it has no queue and is already in a final state
     */
    @Override
    public Flow.Publisher<StreamingEventKind> onSubscribeToTask(TaskIdParams params, ServerCallContext context) throws A2AError {
        LOGGER.debug("onSubscribeToTask - taskId: {}", params.id());
        Task current = this.taskStore.get(params.id());
        if (current == null) {
            throw new TaskNotFoundError();
        }
        TaskManager taskManager = new TaskManager(current.id(), current.contextId(), this.taskStore, null);
        ResultAggregator aggregator = new ResultAggregator(taskManager, null, this.executor, this.eventConsumerExecutor);

        EventQueue tapped = this.queueManager.tap(current.id());
        Object queueIdentity = tapped == null ? "null" : Integer.valueOf(System.identityHashCode(tapped));
        LOGGER.debug("onSubscribeToTask - tapped queue: {}", queueIdentity);

        EventQueue queue = tapped;
        if (queue == null) {
            // No queue to tap: only a task that is still active gets a fresh one.
            if (current.status().state().isFinal()) {
                throw new TaskNotFoundError();
            }
            LOGGER.debug("Queue not found for active task {}, creating new queue for future events", current.id());
            queue = this.queueManager.createOrTap(current.id());
        }

        queue.enqueueEventLocalOnly((Event) current);
        LOGGER.debug("onSubscribeToTask - enqueued current task state as first event for taskId: {}", params.id());

        Flow.Publisher<EventQueueItem> emitted = aggregator.consumeAndEmit(new EventConsumer(queue));
        LOGGER.debug("onSubscribeToTask - returning publisher for taskId: {}", params.id());
        return AsyncUtils.convertingProcessor(emitted, queueItem -> (StreamingEventKind) queueItem.getEvent());
    }

    /**
     * Lists the push notification configs stored for an existing task.
     *
     * @param params identifies the task whose configs are listed
     * @param context server call context (not used by this method)
     * @return the configs reported by the push config store
     * @throws UnsupportedOperationError if no push config store is available
     * @throws TaskNotFoundError if the task store has no task for the id
     */
    @Override
    public ListTaskPushNotificationConfigResult onListTaskPushNotificationConfig(ListTaskPushNotificationConfigParams params, ServerCallContext context) throws A2AError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        boolean taskKnown = this.taskStore.get(params.id()) != null;
        if (!taskKnown) {
            throw new TaskNotFoundError();
        }
        return this.pushConfigStore.getInfo(params);
    }

    /**
     * Deletes one push notification config of an existing task.
     *
     * @param params task id and id of the config to delete
     * @param context server call context (not used by this method)
     * @throws UnsupportedOperationError if no push config store is available
     * @throws TaskNotFoundError if the task store has no task for the id
     */
    @Override
    public void onDeleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams params, ServerCallContext context) {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        boolean taskKnown = this.taskStore.get(params.taskId()) != null;
        if (!taskKnown) {
            throw new TaskNotFoundError();
        }
        this.pushConfigStore.deleteInfo(params.taskId(), params.id());
    }

    /**
     * Tells whether a push notification config should be stored for the request:
     * a push config store must be available and the request's configuration must
     * carry a push notification config.
     *
     * @param params the message send request
     * @return {@code true} when a push config should be stored
     */
    private boolean shouldAddPushInfo(MessageSendParams params) {
        if (this.pushConfigStore == null || params.configuration() == null) {
            return false;
        }
        return params.configuration().pushNotificationConfig() != null;
    }

    /**
     * Wraps the agent execution for a task in a runnable, starts it on the handler's
     * executor and registers the resulting future in {@code runningAgents} under the task id.
     *
     * <p>Failures thrown by the agent are logged and reported through the emitter's
     * {@code fail}. When the future completes, any error is recorded on the runnable
     * and its done callbacks are invoked.
     *
     * @param taskId id the execution is registered under
     * @param requestContext request context passed to the agent
     * @param queue event queue the agent emits into
     * @param doneCallback callback added to the runnable's done callbacks
     * @return the runnable that was submitted
     */
    private EnhancedRunnable registerAndExecuteAgentAsync(final String taskId, final RequestContext requestContext, final EventQueue queue, EnhancedRunnable.DoneCallback doneCallback) {
        LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, this.runningAgents.size());
        this.logThreadStats("AGENT START");

        EnhancedRunnable agentRunnable = new EnhancedRunnable(){

            @Override
            public void run() {
                LOGGER.debug("Agent execution starting for task {}", taskId);
                AgentEmitter agentEmitter = new AgentEmitter(requestContext, queue);
                try {
                    DefaultRequestHandler.this.agentExecutor.execute(requestContext, agentEmitter);
                }
                catch (A2AError agentError) {
                    LOGGER.warn("Agent execution threw A2AError for task {}: {} - {}",
                            taskId, agentError.getClass().getSimpleName(), agentError.getMessage(), agentError);
                    agentEmitter.fail(agentError);
                }
                catch (RuntimeException unexpected) {
                    LOGGER.error("Agent execution threw unexpected RuntimeException for task {}", taskId, unexpected);
                    agentEmitter.fail((A2AError) new InternalError("Agent execution failed: " + unexpected.getMessage()));
                }
                catch (Exception unexpected) {
                    LOGGER.error("Agent execution threw unexpected Exception for task {}", taskId, unexpected);
                    agentEmitter.fail((A2AError) new InternalError("Agent execution failed: " + unexpected.getMessage()));
                }
                LOGGER.debug("Agent execution completed for task {}", taskId);
            }
        };
        agentRunnable.addDoneCallback(doneCallback);
        agentRunnable.markStarted();

        CompletableFuture<Void> agentFuture = CompletableFuture
                .runAsync(agentRunnable, this.executor)
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        LOGGER.error("Agent execution failed for task {}", taskId, failure);
                        agentRunnable.setError(failure);
                    }
                    this.logThreadStats("AGENT COMPLETE END");
                    agentRunnable.invokeDoneCallbacks();
                });
        this.runningAgents.put(taskId, agentFuture);
        LOGGER.debug("Registered agent for task {}, runningAgents.size() after: {}", taskId, this.runningAgents.size());
        return agentRunnable;
    }

    /**
     * Builds the cleanup stage that runs once the agent (and, if given, the consumer) has finished.
     *
     * <p>For non-streaming calls the stage closes {@code queue} with {@code close(false, true)}; for
     * streaming calls the queue is left untouched and only a debug line is logged.
     *
     * @param agentFuture       future of the running agent, or {@code null} when none is registered
     * @param consumptionFuture future of the event consumption, or {@code null} when there is none
     * @param taskId            task id, used for logging only
     * @param queue             queue closed by the cleanup stage for non-streaming calls
     * @param isStreaming       whether the call being cleaned up is a streaming call
     * @return an already-completed future when {@code agentFuture} is {@code null}; otherwise the
     *         stage returned by {@code whenComplete} on the awaited future(s)
     */
    private CompletableFuture<Void> cleanupProducer(@Nullable CompletableFuture<Void> agentFuture, @Nullable CompletableFuture<Void> consumptionFuture, String taskId, EventQueue queue, boolean isStreaming) {
        LOGGER.debug("Starting cleanup for task {} (streaming={})", taskId, isStreaming);
        this.logThreadStats("CLEANUP START");
        if (agentFuture == null) {
            LOGGER.debug("No running agent found for task {}, cleanup complete", taskId);
            return CompletableFuture.completedFuture(null);
        }

        // Wait on the agent alone, or on agent + consumption when a consumption future was supplied.
        final CompletableFuture<Void> awaited;
        if (consumptionFuture == null) {
            awaited = agentFuture;
        } else {
            awaited = CompletableFuture.allOf(agentFuture, consumptionFuture);
            LOGGER.debug("Cleanup will wait for both agent and consumption to complete for task {}", taskId);
        }

        return awaited.whenComplete((ignored, failure) -> {
            if (failure == null) {
                LOGGER.debug("Agent and consumption both completed successfully for task {}", taskId);
            } else {
                LOGGER.debug("Agent/consumption completed with error for task {}", taskId, failure);
            }
            if (!isStreaming) {
                LOGGER.debug("Non-streaming call, closing ChildQueue for task {} (immediate=false, notifyParent=true)", taskId);
                queue.close(false, true);
            } else {
                LOGGER.debug("Streaming call for task {}: queue lifecycle managed by EventConsumer (agentCompleted flag)", taskId);
            }
            LOGGER.debug("Queue cleanup completed for task {}", taskId);
            this.logThreadStats("CLEANUP END");
        });
    }

    /**
     * Prepares everything needed to process a message send request.
     *
     * <p>A first request context is built from the incoming message to obtain the task id. A
     * {@link TaskManager} is then created for that id; if it already has a task, the task is updated
     * with the message and, when a push configuration is present and a store is set, the
     * configuration is saved. If a task exists or the task id differs from the one on the message,
     * the request context is rebuilt with the task and (when the id differs) a copy of the message
     * carrying that id.
     *
     * @param params  the message send parameters
     * @param context the server call context passed to the request context builder
     * @return the task manager, the task (may be {@code null}) and the request context to use
     */
    private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) {
        final Message incoming = params.message();
        RequestContext requestContext = this.requestContextBuilder.get()
                .setParams(params)
                .setTaskId(incoming.taskId())
                .setContextId(incoming.contextId())
                .setTask(null)
                .setServerCallContext(context)
                .build();
        final String taskId = Objects.requireNonNull(requestContext.getTaskId(), "TaskId must be non-null after RequestContext.build()");

        final TaskManager taskManager = new TaskManager(taskId, incoming.contextId(), this.taskStore, incoming);
        Task task = taskManager.getTask();
        if (task != null) {
            LOGGER.debug("Found task updating with message {}", incoming);
            task = taskManager.updateWithMessage(incoming, task);
            if (this.pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null) {
                LOGGER.debug("Adding push info");
                this.pushConfigStore.setInfo(task.id(), params.configuration().pushNotificationConfig());
            }
        }

        // The task id differs from the message's whenever the first build() did not keep it as-is.
        final boolean taskIdChanged = !taskId.equals(incoming.taskId());
        if (task != null || taskIdChanged) {
            final MessageSendParams effectiveParams = taskIdChanged
                    ? new MessageSendParams(Message.builder(incoming).taskId(taskId).build(), params.configuration(), params.metadata())
                    : params;
            requestContext = this.requestContextBuilder.get()
                    .setParams(effectiveParams)
                    .setTask(task)
                    .setServerCallContext(context)
                    .build();
        }
        return new MessageSendSetup(taskManager, task, requestContext);
    }

    /**
     * Writes a snapshot of thread and running-agent counts to {@code THREAD_STATS_LOGGER} at debug
     * level. Does nothing when debug logging is disabled for that logger.
     *
     * @param label text placed in the header line to identify the point of the snapshot
     */
    private void logThreadStats(String label) {
        if (!THREAD_STATS_LOGGER.isDebugEnabled()) {
            return;
        }

        // Climb to the top-most thread group so the enumeration covers every group below it.
        ThreadGroup root = Thread.currentThread().getThreadGroup();
        for (ThreadGroup parent = root.getParent(); parent != null; parent = root.getParent()) {
            root = parent;
        }

        final int activeThreads = root.activeCount();
        // Twice the reported count leaves headroom in case more threads exist at enumeration time.
        final Thread[] snapshot = new Thread[activeThreads * 2];
        final int enumerated = root.enumerate(snapshot);

        int eventConsumerThreads = 0;
        int agentExecutorThreads = 0;
        for (int idx = 0; idx < enumerated; idx++) {
            final Thread thread = snapshot[idx];
            if (thread == null) {
                continue;
            }
            final String threadName = thread.getName();
            if (threadName.startsWith("a2a-event-consumer-")) {
                eventConsumerThreads++;
            } else if (threadName.startsWith("a2a-agent-executor-")) {
                agentExecutorThreads++;
            }
        }

        THREAD_STATS_LOGGER.debug("=== THREAD STATS: {} ===", label);
        THREAD_STATS_LOGGER.debug("Total active threads: {}", activeThreads);
        THREAD_STATS_LOGGER.debug("EventConsumer threads: {}", eventConsumerThreads);
        THREAD_STATS_LOGGER.debug("AgentExecutor threads: {}", agentExecutorThreads);
        THREAD_STATS_LOGGER.debug("Running agents: {}", this.runningAgents.size());
        // NOTE(review): despite the label, this line logs the queue manager's class name, not a queue count.
        THREAD_STATS_LOGGER.debug("Queue manager active queues: {}", this.queueManager.getClass().getSimpleName());
        if (!this.runningAgents.isEmpty()) {
            THREAD_STATS_LOGGER.debug("Running agent tasks:");
            this.runningAgents.forEach((taskId, future) -> {
                final String state = future.isDone() ? "DONE" : "RUNNING";
                THREAD_STATS_LOGGER.debug("  - Task {}: {}", taskId, state);
            });
        }
        THREAD_STATS_LOGGER.debug("=== END THREAD STATS ===");
    }

    /**
     * Tells whether an event kind marks the end of a task.
     *
     * @param eventKind the value to inspect
     * @return {@code true} for a {@link Task} whose status and state are non-null and whose state
     *         reports {@code isFinal()}, or for a {@link TaskStatusUpdateEvent} that reports
     *         {@code isFinal()}; {@code false} for anything else, including non-{@link Event} values
     */
    private boolean isFinalEvent(EventKind eventKind) {
        if (eventKind instanceof Event event) {
            if (event instanceof Task task) {
                return task.status() != null && task.status().state() != null && task.status().state().isFinal();
            }
            if (event instanceof TaskStatusUpdateEvent statusUpdate) {
                return statusUpdate.isFinal();
            }
        }
        return false;
    }

    /**
     * Result of {@code initMessageSend}: the pieces needed to go on processing a message send request.
     *
     * @param taskManager    the task manager created for the request's task id
     * @param task           the task obtained from the task manager (updated with the incoming
     *                       message when one was found), or {@code null} when there is none
     * @param requestContext the request context to use for the call
     */
    private record MessageSendSetup(TaskManager taskManager, @Nullable Task task, RequestContext requestContext) {
    }
}

