/*
 * Decompiled with CFR 0.152.
 */
package com.embabel.agent.a2a.server.support;

import com.embabel.agent.a2a.server.support.TaskStateManager;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.a2a.spec.SendStreamingMessageResponse;
import io.a2a.spec.StreamingEventKind;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
@Metadata(mv={2, 1, 0}, k=1, xi=48, d1={"\u0000D\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\b\u0017\u0018\u00002\u00020\u0001B\u0017\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u00a2\u0006\u0004\b\u0006\u0010\u0007J(\u0010\u0012\u001a\u00020\u000f2\u0006\u0010\u0013\u001a\u00020\u000e2\n\b\u0002\u0010\u0014\u001a\u0004\u0018\u00010\u000e2\n\b\u0002\u0010\u0015\u001a\u0004\u0018\u00010\u000eH\u0016J\u0018\u0010\u0016\u001a\u00020\u000f2\u0006\u0010\u0014\u001a\u00020\u000e2\u0006\u0010\u0017\u001a\u00020\u000eH\u0016J$\u0010\u0018\u001a\u00020\u00192\u0006\u0010\u0013\u001a\u00020\u000e2\u0006\u0010\u001a\u001a\u00020\u001b2\n\b\u0002\u0010\u0014\u001a\u0004\u0018\u00010\u000eH\u0016J\u0010\u0010\u001c\u001a\u00020\u00192\u0006\u0010\u0013\u001a\u00020\u000eH\u0016J\b\u0010\u001d\u001a\u00020\u0019H\u0016R\u000e\u0010\u0002\u001a\u00020\u0003X\u0092\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0092\u0004\u00a2\u0006\u0002\n\u0000R\u0018\u0010\b\u001a\n \n*\u0004\u0018\u00010\t0\tX\u0092\u0004\u00a2\u0006\u0004\n\u0002\u0010\u000bR\u001a\u0010\f\u001a\u000e\u0012\u0004\u0012\u00020\u000e\u0012\u0004\u0012\u00020\u000f0\rX\u0092\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0010\u001a\u00020\u0011X\u0092\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u001e"}, d2={"Lcom/embabel/agent/a2a/server/support/A2AStreamingHandler;", "", "objectMapper", "Lcom/fasterxml/jackson/databind/ObjectMapper;", "taskStateManager", "Lcom/embabel/agent/a2a/server/support/TaskStateManager;", "<init>", "(Lcom/fasterxml/jackson/databind/ObjectMapper;Lcom/embabel/agent/a2a/server/support/TaskStateManager;)V", "logger", "Lorg/slf4j/Logger;", 
"kotlin.jvm.PlatformType", "Lorg/slf4j/Logger;", "activeStreams", "Ljava/util/concurrent/ConcurrentHashMap;", "", "Lorg/springframework/web/servlet/mvc/method/annotation/SseEmitter;", "scheduler", "Ljava/util/concurrent/ScheduledExecutorService;", "createStream", "streamId", "taskId", "contextId", "resubscribeToTask", "newStreamId", "sendStreamEvent", "", "event", "Lio/a2a/spec/StreamingEventKind;", "closeStream", "shutdown", "embabel-agent-a2a"})
@SourceDebugExtension(value={"SMAP\nA2AStreamingHandler.kt\nKotlin\n*S Kotlin\n*F\n+ 1 A2AStreamingHandler.kt\ncom/embabel/agent/a2a/server/support/A2AStreamingHandler\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,165:1\n1863#2,2:166\n*S KotlinDebug\n*F\n+ 1 A2AStreamingHandler.kt\ncom/embabel/agent/a2a/server/support/A2AStreamingHandler\n*L\n94#1:166,2\n*E\n"})
public class A2AStreamingHandler {
    @NotNull
    private final ObjectMapper objectMapper;
    @NotNull
    private final TaskStateManager taskStateManager;
    private final Logger logger;
    @NotNull
    private final ConcurrentHashMap<String, SseEmitter> activeStreams;
    @NotNull
    private final ScheduledExecutorService scheduler;

    /**
     * Creates the handler and initialises its stream registry and scheduler.
     *
     * @param objectMapper     used to serialise streaming responses to JSON before they are sent
     * @param taskStateManager used to register tasks, record their events and look up task state
     */
    public A2AStreamingHandler(@NotNull ObjectMapper objectMapper, @NotNull TaskStateManager taskStateManager) {
        // Kotlin-generated parameter checks: reject null arguments up front.
        Intrinsics.checkNotNullParameter((Object)objectMapper, (String)"objectMapper");
        Intrinsics.checkNotNullParameter((Object)taskStateManager, (String)"taskStateManager");
        this.objectMapper = objectMapper;
        this.taskStateManager = taskStateManager;
        this.logger = LoggerFactory.getLogger(A2AStreamingHandler.class);
        // Diamond operator instead of a raw type so the assignment to
        // ConcurrentHashMap<String, SseEmitter> is checked by the compiler.
        this.activeStreams = new ConcurrentHashMap<>();
        // The factory never returns null, so no extra null check is needed here.
        // The executor is released again in shutdown().
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    /**
     * Creates a new SSE emitter and registers it as the active stream for {@code streamId}.
     *
     * <p>The emitter is created with a timeout of {@code Long.MAX_VALUE}. When both
     * {@code taskId} and {@code contextId} are non-null the task is also registered with the
     * task state manager so that it is associated with this stream.
     *
     * <p>If another emitter is already registered under the same id it is replaced and
     * completed, so that it does not stay open with no way of receiving further events.
     *
     * @param streamId  key under which the emitter is stored; must not be null
     * @param taskId    task to associate with the stream, or {@code null}
     * @param contextId context of the task, or {@code null}
     * @return the newly created emitter
     */
    @NotNull
    public SseEmitter createStream(@NotNull String streamId, @Nullable String taskId, @Nullable String contextId) {
        Intrinsics.checkNotNullParameter((Object)streamId, (String)"streamId");
        this.logger.info("Creating SSE stream for streamId: {}, taskId: {}", (Object)streamId, (Object)taskId);
        SseEmitter emitter = new SseEmitter(Long.valueOf(Long.MAX_VALUE));

        // Deregister with remove(key, value): the entry is only dropped while it still maps to
        // THIS emitter. A plain remove(key) would let a late completion/timeout of an older
        // emitter evict a newer stream that reused the same id.
        emitter.onCompletion(() -> {
            this.logger.info("Stream completed for streamId: {}", (Object)streamId);
            this.activeStreams.remove(streamId, emitter);
        });
        emitter.onTimeout(() -> {
            this.logger.info("Stream timed out for streamId: {}", (Object)streamId);
            this.activeStreams.remove(streamId, emitter);
        });

        SseEmitter displaced = this.activeStreams.put(streamId, emitter);
        if (displaced != null) {
            // The old emitter can no longer be reached through the registry; complete it
            // instead of leaving it open indefinitely.
            displaced.complete();
        }

        if (taskId != null && contextId != null) {
            this.taskStateManager.registerTask(taskId, contextId, streamId);
        }
        return emitter;
    }

    /**
     * Compiler-generated bridge for calling {@code createStream} with omitted optional arguments.
     *
     * <p>Bit {@code 2} of {@code n} replaces the second string argument with {@code null};
     * bit {@code 4} does the same for the third. A non-null {@code object} is rejected.
     *
     * @throws UnsupportedOperationException if {@code object} is not {@code null}
     */
    public static /* synthetic */ SseEmitter createStream$default(A2AStreamingHandler a2AStreamingHandler, String string, String string2, String string3, int n, Object object) {
        if (object != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: createStream");
        }
        boolean taskIdOmitted = (n & 2) != 0;
        boolean contextIdOmitted = (n & 4) != 0;
        String taskId = taskIdOmitted ? null : string2;
        String contextId = contextIdOmitted ? null : string3;
        return a2AStreamingHandler.createStream(string, taskId, contextId);
    }

    /**
     * Attaches a new SSE stream to an existing task and replays the task's stored events on it.
     *
     * <p>A stream is created under {@code newStreamId} using the task's context id, the task is
     * pointed at the new stream id, and the replay itself is started on a virtual thread so this
     * method returns the emitter without waiting for the replay.
     *
     * @param taskId      id of the task to resubscribe to; must not be null
     * @param newStreamId id for the stream that will receive the replayed events; must not be null
     * @return the emitter created for {@code newStreamId}
     * @throws IllegalArgumentException if the task does not exist or its info cannot be obtained
     */
    @NotNull
    public SseEmitter resubscribeToTask(@NotNull String taskId, @NotNull String newStreamId) {
        Intrinsics.checkNotNullParameter((Object)taskId, (String)"taskId");
        Intrinsics.checkNotNullParameter((Object)newStreamId, (String)"newStreamId");
        this.logger.info("Resubscribing to task: taskId={}, newStreamId={}", (Object)taskId, (Object)newStreamId);
        if (!this.taskStateManager.taskExists(taskId)) {
            throw new IllegalArgumentException("Task not found: " + taskId);
        }
        TaskStateManager.TaskInfo taskInfo = this.taskStateManager.getTaskInfo(taskId);
        if (taskInfo == null) {
            // The lookup can still come back empty after the existence check (e.g. the task was
            // removed in between); report it like any other missing task rather than failing
            // with a bare null-check error.
            throw new IllegalArgumentException("Task not found: " + taskId);
        }
        SseEmitter emitter = this.createStream(newStreamId, taskId, taskInfo.getContextId());
        this.taskStateManager.updateStreamId(taskId, newStreamId);
        // Replay runs asynchronously; the caller gets the emitter immediately.
        Thread.startVirtualThread(() -> A2AStreamingHandler.resubscribeToTask$lambda$3(taskInfo, this, taskId, newStreamId, emitter));
        return emitter;
    }

    /**
     * Sends one event to the active stream registered under {@code streamId}.
     *
     * <p>If no stream is registered a warning is logged and nothing else happens (the event is
     * not recorded either). Otherwise, when {@code taskId} is non-null the event is first
     * recorded with the task state manager; it is then wrapped in a
     * {@link SendStreamingMessageResponse}, serialised to JSON and sent as an SSE data item.
     *
     * <p>Any exception raised while recording, serialising or sending is logged, the emitter is
     * removed from the registry and completed with that error. The exception is not rethrown.
     *
     * @param streamId id of the target stream; must not be null
     * @param event    event to deliver; must not be null
     * @param taskId   task the event belongs to, or {@code null} to skip recording
     */
    public void sendStreamEvent(@NotNull String streamId, @NotNull StreamingEventKind event, @Nullable String taskId) {
        Intrinsics.checkNotNullParameter((Object)streamId, (String)"streamId");
        Intrinsics.checkNotNullParameter((Object)event, (String)"event");
        SseEmitter emitter = this.activeStreams.get(streamId);
        if (emitter == null) {
            this.logger.warn("No active stream found for streamId: {}", (Object)streamId);
            return;
        }
        try {
            if (taskId != null) {
                this.taskStateManager.recordEvent(taskId, event);
            }
            SendStreamingMessageResponse response = new SendStreamingMessageResponse("2.0", (Object)streamId, event, null);
            String payload = this.objectMapper.writeValueAsString((Object)response);
            emitter.send(SseEmitter.event().data((Object)payload, MediaType.APPLICATION_JSON));
        }
        catch (Exception e) {
            this.logger.error("Error sending stream event", (Throwable)e);
            // Deregister the failed emitter right away so later events are not pushed at a
            // stream that is already being torn down. remove(key, value) leaves the entry
            // alone if a different emitter has since been registered under the same id.
            this.activeStreams.remove(streamId, emitter);
            emitter.completeWithError((Throwable)e);
        }
    }

    /**
     * Compiler-generated bridge for calling {@code sendStreamEvent} with the optional task id
     * omitted.
     *
     * <p>When bit {@code 4} of {@code n} is set, the task id argument is replaced with
     * {@code null}. A non-null {@code object} is rejected.
     *
     * @throws UnsupportedOperationException if {@code object} is not {@code null}
     */
    public static /* synthetic */ void sendStreamEvent$default(A2AStreamingHandler a2AStreamingHandler, String string, StreamingEventKind streamingEventKind, String string2, int n, Object object) {
        if (object != null) {
            throw new UnsupportedOperationException("Super calls with default arguments not supported in this target, function: sendStreamEvent");
        }
        boolean taskIdOmitted = (n & 4) != 0;
        String taskId = taskIdOmitted ? null : string2;
        a2AStreamingHandler.sendStreamEvent(string, streamingEventKind, taskId);
    }

    /**
     * Removes the stream registered under {@code streamId} and completes its emitter.
     *
     * <p>Does nothing when no stream is registered under that id.
     *
     * @param streamId id of the stream to close; must not be null
     */
    public void closeStream(@NotNull String streamId) {
        Intrinsics.checkNotNullParameter((Object)streamId, (String)"streamId");
        SseEmitter removed = this.activeStreams.remove(streamId);
        if (removed != null) {
            removed.complete();
        }
    }

    /**
     * Shuts down the internal scheduler.
     *
     * <p>Requests an orderly shutdown and waits up to five seconds for it to finish; if the
     * scheduler has not terminated by then, or the wait is interrupted, a forced shutdown is
     * issued. On interruption the calling thread's interrupt status is restored so callers
     * further up the stack can still observe it.
     */
    public void shutdown() {
        this.scheduler.shutdown();
        try {
            if (!this.scheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.scheduler.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            this.scheduler.shutdownNow();
            // Catching InterruptedException clears the flag; set it again instead of
            // silently swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Completion callback body: logs the completion and removes whatever entry is registered
     * under the given stream id.
     */
    private static final void createStream$lambda$0(A2AStreamingHandler this$0, String $streamId) {
        final A2AStreamingHandler handler = this$0;
        handler.logger.info("Stream completed for streamId: {}", (Object)$streamId);
        handler.activeStreams.remove($streamId);
    }

    /**
     * Timeout callback body: logs the timeout and removes whatever entry is registered under
     * the given stream id.
     */
    private static final void createStream$lambda$1(A2AStreamingHandler this$0, String $streamId) {
        final A2AStreamingHandler handler = this$0;
        handler.logger.info("Stream timed out for streamId: {}", (Object)$streamId);
        handler.activeStreams.remove($streamId);
    }

    /**
     * Replay task body: sends every event held by the given task info to the new stream, then
     * closes that stream if the task is no longer active.
     *
     * <p>Any exception raised during the replay is logged and the emitter is completed with it.
     *
     * <p>NOTE(review): each event is sent with the task id, and {@code sendStreamEvent} records
     * events whose task id is non-null. Confirm against {@code TaskStateManager} that this does
     * not duplicate the stored history or modify the collection being iterated.
     */
    private static final void resubscribeToTask$lambda$3(TaskStateManager.TaskInfo $taskInfo, A2AStreamingHandler this$0, String $taskId, String $newStreamId, SseEmitter $emitter) {
        try {
            Iterable<?> storedEvents = $taskInfo.getEvents();
            for (Object stored : storedEvents) {
                this$0.sendStreamEvent($newStreamId, (StreamingEventKind)stored, $taskId);
            }
            boolean taskStillActive = this$0.taskStateManager.isTaskActive($taskId);
            if (!taskStillActive) {
                // Nothing more will arrive for a finished task, so end the stream now.
                this$0.closeStream($newStreamId);
            }
        }
        catch (Exception e) {
            this$0.logger.error("Error replaying events for task {}", (Object)$taskId, (Object)e);
            $emitter.completeWithError((Throwable)e);
        }
    }
}

