/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.server.requesthandlers;

import io.a2a.server.ServerCallContext;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.agentexecution.SimpleRequestContextBuilder;
import io.a2a.server.events.EnhancedRunnable;
import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.QueueManager;
import io.a2a.server.events.TaskQueueExistsException;
import io.a2a.server.requesthandlers.RequestHandler;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.ResultAggregator;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskStore;
import io.a2a.server.util.async.AsyncUtils;
import io.a2a.server.util.async.Internal;
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.GetTaskPushNotificationConfigParams;
import io.a2a.spec.InternalError;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.ListTaskPushNotificationConfigParams;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskNotFoundError;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
import io.a2a.spec.TaskState;
import io.a2a.spec.UnsupportedOperationError;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class DefaultRequestHandler
implements RequestHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRequestHandler.class);
    private final AgentExecutor agentExecutor;
    private final TaskStore taskStore;
    private final QueueManager queueManager;
    private final PushNotificationConfigStore pushConfigStore;
    private final PushNotificationSender pushSender;
    private final Supplier<RequestContext.Builder> requestContextBuilder;
    private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<String, CompletableFuture<Void>>();
    private final Set<CompletableFuture<Void>> backgroundTasks = ConcurrentHashMap.newKeySet();
    private final Executor executor;

    @Inject
    public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, PushNotificationSender pushSender, @Internal Executor executor) {
        this.agentExecutor = agentExecutor;
        this.taskStore = taskStore;
        this.queueManager = queueManager;
        this.pushConfigStore = pushConfigStore;
        this.pushSender = pushSender;
        this.executor = executor;
        this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
    }

    @Override
    public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError {
        LOGGER.debug("onGetTask {}", (Object)params.id());
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            LOGGER.debug("No task found for {}. Throwing TaskNotFoundError", (Object)params.id());
            throw new TaskNotFoundError();
        }
        if (params.historyLength() != null && task.getHistory() != null && params.historyLength() < task.getHistory().size()) {
            List history = params.historyLength() <= 0 ? new ArrayList() : task.getHistory().subList(task.getHistory().size() - params.historyLength(), task.getHistory().size() - 1);
            task = new Task.Builder(task).history(history).build();
        }
        LOGGER.debug("Task found {}", (Object)task);
        return task;
    }

    @Override
    public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws JSONRPCError {
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        if (task.getStatus().state().isFinal()) {
            throw new TaskNotCancelableError("Task cannot be canceled - current state: " + task.getStatus().state().asString());
        }
        TaskManager taskManager = new TaskManager(task.getId(), task.getContextId(), this.taskStore, null);
        ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, this.executor);
        EventQueue queue = this.queueManager.tap(task.getId());
        if (queue == null) {
            queue = this.queueManager.getEventQueueBuilder(task.getId()).build();
        }
        this.agentExecutor.cancel(this.requestContextBuilder.get().setTaskId(task.getId()).setContextId(task.getContextId()).setTask(task).setServerCallContext(context).build(), queue);
        Optional.ofNullable((CompletableFuture)this.runningAgents.get(task.getId())).ifPresent(cf -> cf.cancel(true));
        EventConsumer consumer = new EventConsumer(queue);
        EventKind type = resultAggregator.consumeAll(consumer);
        if (!(type instanceof Task)) {
            throw new InternalError("Agent did not return valid response for cancel");
        }
        Task tempTask = (Task)type;
        if (tempTask.getStatus().state() != TaskState.CANCELED) {
            throw new TaskNotCancelableError("Task cannot be canceled - current state: " + tempTask.getStatus().state().asString());
        }
        return tempTask;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws JSONRPCError {
        LOGGER.debug("onMessageSend - task: {}; context {}", (Object)params.message().getTaskId(), (Object)params.message().getContextId());
        MessageSendSetup mss = this.initMessageSend(params, context);
        String taskId = mss.requestContext.getTaskId();
        LOGGER.debug("Request context taskId: {}", (Object)taskId);
        EventQueue queue = this.queueManager.createOrTap(taskId);
        ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, this.executor);
        boolean blocking = true;
        if (params.configuration() != null && Boolean.FALSE.equals(params.configuration().blocking())) {
            blocking = false;
        }
        boolean interruptedOrNonBlocking = false;
        EnhancedRunnable producerRunnable = this.registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
        ResultAggregator.EventTypeAndInterrupt etai = null;
        try {
            Task taskResult;
            Runnable pushNotificationCallback = () -> this.sendPushNotification(taskId, resultAggregator);
            EventConsumer consumer = new EventConsumer(queue);
            producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
            etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback);
            if (etai == null) {
                LOGGER.debug("No result, throwing InternalError");
                throw new InternalError("No result");
            }
            interruptedOrNonBlocking = etai.interrupted();
            LOGGER.debug("Was interrupted or non-blocking: {}", (Object)interruptedOrNonBlocking);
            EventKind kind = etai.eventType();
            if (kind instanceof Task && !taskId.equals((taskResult = (Task)kind).getId())) {
                throw new InternalError("Task ID mismatch in agent response");
            }
            pushNotificationCallback.run();
        }
        catch (Throwable throwable) {
            CompletableFuture agentFuture = (CompletableFuture)this.runningAgents.remove(taskId);
            LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", (Object)taskId, (Object)this.runningAgents.size());
            this.trackBackgroundTask(this.cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
            throw throwable;
        }
        CompletableFuture agentFuture = (CompletableFuture)this.runningAgents.remove(taskId);
        LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", (Object)taskId, (Object)this.runningAgents.size());
        this.trackBackgroundTask(this.cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
        LOGGER.debug("Returning: {}", (Object)etai.eventType());
        return etai.eventType();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Flow.Publisher<StreamingEventKind> onMessageSendStream(MessageSendParams params, ServerCallContext context) throws JSONRPCError {
        Flow.Publisher<StreamingEventKind> publisher;
        LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}", new Object[]{params.message().getTaskId(), params.message().getContextId(), this.runningAgents.size(), this.backgroundTasks.size()});
        MessageSendSetup mss = this.initMessageSend(params, context);
        final AtomicReference<String> taskId = new AtomicReference<String>(mss.requestContext.getTaskId());
        EventQueue queue = this.queueManager.createOrTap(taskId.get());
        LOGGER.debug("Created/tapped queue for task {}: {}", (Object)taskId.get(), (Object)queue);
        final ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, this.executor);
        EnhancedRunnable producerRunnable = this.registerAndExecuteAgentAsync(taskId.get(), mss.requestContext, queue);
        final EventConsumer consumer = new EventConsumer(queue);
        producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
        final AtomicBoolean backgroundConsumeStarted = new AtomicBoolean(false);
        try {
            Flow.Publisher<EventQueueItem> results = resultAggregator.consumeAndEmit(consumer);
            Flow.Publisher<EventQueueItem> processed = AsyncUtils.processor(AsyncUtils.createTubeConfig(), results, (errorConsumer, item) -> {
                EventKind latest;
                Event event = item.getEvent();
                if (event instanceof Task) {
                    Task createdTask = (Task)event;
                    if (!Objects.equals(taskId.get(), createdTask.getId())) {
                        errorConsumer.accept(new InternalError("Task ID mismatch in agent response"));
                    }
                    try {
                        this.queueManager.add(createdTask.getId(), queue);
                        taskId.set(createdTask.getId());
                    }
                    catch (TaskQueueExistsException taskQueueExistsException) {
                        // empty catch block
                    }
                    if (this.pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null) {
                        this.pushConfigStore.setInfo(createdTask.getId(), params.configuration().pushNotificationConfig());
                    }
                }
                if (this.pushSender != null && taskId.get() != null && (latest = resultAggregator.getCurrentResult()) instanceof Task) {
                    Task latestTask = (Task)latest;
                    this.pushSender.sendNotification(latestTask);
                }
                return true;
            });
            Flow.Publisher<Event> eventPublisher = AsyncUtils.convertingProcessor(processed, EventQueueItem::getEvent);
            Flow.Publisher<StreamingEventKind> finalPublisher = AsyncUtils.convertingProcessor(eventPublisher, event -> (StreamingEventKind)event);
            publisher = subscriber -> {
                LOGGER.debug("Creating subscription wrapper for task {}", taskId.get());
                finalPublisher.subscribe(new Flow.Subscriber<StreamingEventKind>(){
                    private Flow.Subscription subscription;

                    @Override
                    public void onSubscribe(final Flow.Subscription subscription) {
                        LOGGER.debug("onSubscribe called for task {}", taskId.get());
                        this.subscription = subscription;
                        subscriber.onSubscribe(new Flow.Subscription(){

                            @Override
                            public void request(long n) {
                                LOGGER.debug("Subscription.request({}) for task {}", (Object)n, taskId.get());
                                subscription.request(n);
                            }

                            @Override
                            public void cancel() {
                                LOGGER.debug("Client cancelled subscription for task {}, starting background consumption", taskId.get());
                                this.startBackgroundConsumption();
                                subscription.cancel();
                            }
                        });
                    }

                    @Override
                    public void onNext(StreamingEventKind item) {
                        LOGGER.debug("onNext: {} for task {}", (Object)item.getClass().getSimpleName(), taskId.get());
                        subscriber.onNext(item);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        LOGGER.error("onError for task {}", taskId.get(), (Object)throwable);
                        subscriber.onError(throwable);
                    }

                    @Override
                    public void onComplete() {
                        LOGGER.debug("onComplete for task {}", taskId.get());
                        try {
                            subscriber.onComplete();
                        }
                        catch (IllegalStateException e) {
                            if (e.getMessage() != null && e.getMessage().contains("Response has already been written")) {
                                LOGGER.debug("Client disconnected before onComplete, response already closed for task {}", taskId.get());
                            }
                            throw e;
                        }
                    }

                    private void startBackgroundConsumption() {
                        if (backgroundConsumeStarted.compareAndSet(false, true)) {
                            LOGGER.debug("Starting background consumption for task {}", taskId.get());
                            CompletableFuture<Void> bgTask = CompletableFuture.runAsync(() -> {
                                try {
                                    LOGGER.debug("Background consumption thread started for task {}", taskId.get());
                                    resultAggregator.consumeAll(consumer);
                                    LOGGER.debug("Background consumption completed for task {}", taskId.get());
                                }
                                catch (Exception e) {
                                    LOGGER.error("Error during background consumption for task {}", taskId.get(), (Object)e);
                                }
                            }, DefaultRequestHandler.this.executor);
                            DefaultRequestHandler.this.trackBackgroundTask(bgTask);
                        } else {
                            LOGGER.debug("Background consumption already started for task {}", taskId.get());
                        }
                    }
                });
            };
        }
        catch (Throwable throwable) {
            LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}; backgroundTasks: {}", new Object[]{taskId.get(), this.runningAgents.size(), this.backgroundTasks.size()});
            CompletableFuture agentFuture = (CompletableFuture)this.runningAgents.remove(taskId.get());
            LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", (Object)taskId.get(), (Object)this.runningAgents.size());
            this.trackBackgroundTask(this.cleanupProducer(agentFuture, null, taskId.get(), queue, true));
            throw throwable;
        }
        LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}; backgroundTasks: {}", new Object[]{taskId.get(), this.runningAgents.size(), this.backgroundTasks.size()});
        CompletableFuture agentFuture = (CompletableFuture)this.runningAgents.remove(taskId.get());
        LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", (Object)taskId.get(), (Object)this.runningAgents.size());
        this.trackBackgroundTask(this.cleanupProducer(agentFuture, null, taskId.get(), queue, true));
        return publisher;
    }

    @Override
    public TaskPushNotificationConfig onSetTaskPushNotificationConfig(TaskPushNotificationConfig params, ServerCallContext context) throws JSONRPCError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        Task task = this.taskStore.get(params.taskId());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        PushNotificationConfig pushNotificationConfig = this.pushConfigStore.setInfo(params.taskId(), params.pushNotificationConfig());
        return new TaskPushNotificationConfig(params.taskId(), pushNotificationConfig);
    }

    @Override
    public TaskPushNotificationConfig onGetTaskPushNotificationConfig(GetTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        List<PushNotificationConfig> pushNotificationConfigList = this.pushConfigStore.getInfo(params.id());
        if (pushNotificationConfigList == null || pushNotificationConfigList.isEmpty()) {
            throw new InternalError("No push notification config found");
        }
        return new TaskPushNotificationConfig(params.id(), this.getPushNotificationConfig(pushNotificationConfigList, params.pushNotificationConfigId()));
    }

    private PushNotificationConfig getPushNotificationConfig(List<PushNotificationConfig> notificationConfigList, String configId) {
        if (configId != null) {
            for (PushNotificationConfig notificationConfig : notificationConfigList) {
                if (!configId.equals(notificationConfig.id())) continue;
                return notificationConfig;
            }
        }
        return notificationConfigList.get(0);
    }

    @Override
    public Flow.Publisher<StreamingEventKind> onResubscribeToTask(TaskIdParams params, ServerCallContext context) throws JSONRPCError {
        LOGGER.debug("onResubscribeToTask - taskId: {}", (Object)params.id());
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        TaskManager taskManager = new TaskManager(task.getId(), task.getContextId(), this.taskStore, null);
        ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, this.executor);
        EventQueue queue = this.queueManager.tap(task.getId());
        LOGGER.debug("onResubscribeToTask - tapped queue: {}", queue != null ? Integer.valueOf(System.identityHashCode(queue)) : "null");
        if (queue == null) {
            if (task.getStatus().state().isFinal()) {
                throw new TaskNotFoundError();
            }
            LOGGER.debug("Queue not found for active task {}, creating new queue for future events", (Object)task.getId());
            queue = this.queueManager.createOrTap(task.getId());
        }
        EventConsumer consumer = new EventConsumer(queue);
        Flow.Publisher<EventQueueItem> results = resultAggregator.consumeAndEmit(consumer);
        LOGGER.debug("onResubscribeToTask - returning publisher for taskId: {}", (Object)params.id());
        return AsyncUtils.convertingProcessor(results, item -> (StreamingEventKind)item.getEvent());
    }

    @Override
    public List<TaskPushNotificationConfig> onListTaskPushNotificationConfig(ListTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        List<PushNotificationConfig> pushNotificationConfigList = this.pushConfigStore.getInfo(params.id());
        ArrayList<TaskPushNotificationConfig> taskPushNotificationConfigList = new ArrayList<TaskPushNotificationConfig>();
        if (pushNotificationConfigList != null) {
            for (PushNotificationConfig pushNotificationConfig : pushNotificationConfigList) {
                TaskPushNotificationConfig taskPushNotificationConfig = new TaskPushNotificationConfig(params.id(), pushNotificationConfig);
                taskPushNotificationConfigList.add(taskPushNotificationConfig);
            }
        }
        return taskPushNotificationConfigList;
    }

    @Override
    public void onDeleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams params, ServerCallContext context) {
        if (this.pushConfigStore == null) {
            throw new UnsupportedOperationError();
        }
        Task task = this.taskStore.get(params.id());
        if (task == null) {
            throw new TaskNotFoundError();
        }
        this.pushConfigStore.deleteInfo(params.id(), params.pushNotificationConfigId());
    }

    private boolean shouldAddPushInfo(MessageSendParams params) {
        return this.pushConfigStore != null && params.configuration() != null && params.configuration().pushNotificationConfig() != null;
    }

    private EnhancedRunnable registerAndExecuteAgentAsync(final String taskId, final RequestContext requestContext, final EventQueue queue) {
        LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", (Object)taskId, (Object)this.runningAgents.size());
        this.logThreadStats("AGENT START");
        EnhancedRunnable runnable = new EnhancedRunnable(){

            @Override
            public void run() {
                LOGGER.debug("Agent execution starting for task {}", (Object)taskId);
                DefaultRequestHandler.this.agentExecutor.execute(requestContext, queue);
                LOGGER.debug("Agent execution completed for task {}", (Object)taskId);
            }
        };
        CompletionStage cf = CompletableFuture.runAsync(runnable, this.executor).whenComplete((v, err) -> {
            if (err != null) {
                LOGGER.error("Agent execution failed for task {}", (Object)taskId, err);
                runnable.setError((Throwable)err);
            }
            this.logThreadStats("AGENT COMPLETE END");
            runnable.invokeDoneCallbacks();
        });
        this.runningAgents.put(taskId, (CompletableFuture<Void>)cf);
        LOGGER.debug("Registered agent for task {}, runningAgents.size() after: {}", (Object)taskId, (Object)this.runningAgents.size());
        return runnable;
    }

    private void trackBackgroundTask(CompletableFuture<Void> task) {
        this.backgroundTasks.add(task);
        LOGGER.debug("Tracking background task (total: {}): {}", (Object)this.backgroundTasks.size(), task);
        task.whenComplete((result, throwable) -> {
            try {
                if (throwable != null) {
                    Throwable cause = throwable;
                    if (throwable instanceof CompletionException && throwable.getCause() != null) {
                        cause = throwable.getCause();
                    }
                    if (cause instanceof CancellationException) {
                        LOGGER.debug("Background task cancelled: {}", (Object)task);
                    } else {
                        LOGGER.error("Background task failed", throwable);
                    }
                }
            }
            finally {
                this.backgroundTasks.remove(task);
                LOGGER.debug("Removed background task (remaining: {}): {}", (Object)this.backgroundTasks.size(), (Object)task);
            }
        });
    }

    public CompletableFuture<Void> waitForBackgroundTasks() {
        CompletableFuture[] tasks = this.backgroundTasks.toArray(new CompletableFuture[0]);
        if (tasks.length == 0) {
            return CompletableFuture.completedFuture(null);
        }
        LOGGER.debug("Waiting for {} background tasks to complete", (Object)tasks.length);
        return CompletableFuture.allOf(tasks);
    }

    private CompletableFuture<Void> cleanupProducer(CompletableFuture<Void> agentFuture, CompletableFuture<Void> consumptionFuture, String taskId, EventQueue queue, boolean isStreaming) {
        LOGGER.debug("Starting cleanup for task {} (streaming={})", (Object)taskId, (Object)isStreaming);
        this.logThreadStats("CLEANUP START");
        if (agentFuture == null) {
            LOGGER.debug("No running agent found for task {}, cleanup complete", (Object)taskId);
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> bothComplete = agentFuture;
        if (consumptionFuture != null) {
            bothComplete = CompletableFuture.allOf(agentFuture, consumptionFuture);
            LOGGER.debug("Cleanup will wait for both agent and consumption to complete for task {}", (Object)taskId);
        }
        return bothComplete.whenComplete((v, t) -> {
            if (t != null) {
                LOGGER.debug("Agent/consumption completed with error for task {}", (Object)taskId, t);
            } else {
                LOGGER.debug("Agent and consumption both completed successfully for task {}", (Object)taskId);
            }
            LOGGER.debug("{} call, closing ChildQueue for task {} (immediate=false, notifyParent=true)", (Object)(isStreaming ? "Streaming" : "Non-streaming"), (Object)taskId);
            queue.close(false, true);
            LOGGER.debug("Queue cleanup completed for task {}", (Object)taskId);
            this.logThreadStats("CLEANUP END");
        });
    }

    private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallContext context) {
        TaskManager taskManager = new TaskManager(params.message().getTaskId(), params.message().getContextId(), this.taskStore, params.message());
        Task task = taskManager.getTask();
        if (task != null) {
            LOGGER.debug("Found task updating with message {}", (Object)params.message());
            task = taskManager.updateWithMessage(params.message(), task);
            if (this.shouldAddPushInfo(params)) {
                LOGGER.debug("Adding push info");
                this.pushConfigStore.setInfo(task.getId(), params.configuration().pushNotificationConfig());
            }
        }
        RequestContext requestContext = this.requestContextBuilder.get().setParams(params).setTaskId(task == null ? null : task.getId()).setContextId(params.message().getContextId()).setTask(task).setServerCallContext(context).build();
        return new MessageSendSetup(taskManager, task, requestContext);
    }

    private void sendPushNotification(String taskId, ResultAggregator resultAggregator) {
        EventKind latest;
        if (this.pushSender != null && taskId != null && (latest = resultAggregator.getCurrentResult()) instanceof Task) {
            Task latestTask = (Task)latest;
            this.pushSender.sendNotification(latestTask);
        }
    }

    private void logThreadStats(String label) {
        if (!LOGGER.isDebugEnabled()) {
            return;
        }
        ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
        while (rootGroup.getParent() != null) {
            rootGroup = rootGroup.getParent();
        }
        int activeThreads = rootGroup.activeCount();
        LOGGER.debug("=== THREAD STATS: {} ===", (Object)label);
        LOGGER.debug("Active threads: {}", (Object)activeThreads);
        LOGGER.debug("Running agents: {}", (Object)this.runningAgents.size());
        LOGGER.debug("Background tasks: {}", (Object)this.backgroundTasks.size());
        LOGGER.debug("Queue manager active queues: {}", (Object)this.queueManager.getClass().getSimpleName());
        if (!this.runningAgents.isEmpty()) {
            LOGGER.debug("Running agent tasks:");
            this.runningAgents.forEach((taskId, future) -> LOGGER.debug("  - Task {}: {}", taskId, (Object)(future.isDone() ? "DONE" : "RUNNING")));
        }
        if (!this.backgroundTasks.isEmpty()) {
            LOGGER.debug("Background tasks:");
            this.backgroundTasks.forEach(task -> LOGGER.debug("  - {}: {}", task, (Object)(task.isDone() ? "DONE" : "RUNNING")));
        }
        LOGGER.debug("=== END THREAD STATS ===");
    }

    private record MessageSendSetup(TaskManager taskManager, Task task, RequestContext requestContext) {
    }
}

