/*
 * Decompiled with CFR 0.152.
 */
package io.a2a.client.transport.jsonrpc.sse;

import io.a2a.grpc.StreamResponse;
import io.a2a.grpc.StreamResponseOrBuilder;
import io.a2a.grpc.utils.JSONRPCUtils;
import io.a2a.grpc.utils.ProtoUtils;
import io.a2a.jsonrpc.common.json.JsonProcessingException;
import io.a2a.spec.A2AError;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.TaskStatusUpdateEvent;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.jspecify.annotations.Nullable;

/**
 * Listener for the messages of a server-sent-event stream carrying JSON-RPC streaming responses.
 *
 * <p>Each incoming message is parsed into a {@link StreamingEventKind} and handed to the event
 * handler. Failures are reported to the optional error handler. Successful completion of the
 * stream is also reported through the error handler, by invoking it with {@code null}.
 *
 * <p>Completion is signaled at most once per listener instance, even if {@link #onComplete()} is
 * invoked concurrently from several threads.
 */
public class SSEEventListener {
    private static final Logger log = Logger.getLogger(SSEEventListener.class.getName());
    private final Consumer<StreamingEventKind> eventHandler;
    private final @Nullable Consumer<Throwable> errorHandler;
    // Flipped exactly once by onComplete(); an atomic flag (rather than a volatile boolean) makes
    // the "first caller wins" check-and-set a single indivisible step.
    private final AtomicBoolean completed = new AtomicBoolean(false);

    /**
     * Creates a listener.
     *
     * @param eventHandler receives every successfully parsed streaming event
     * @param errorHandler receives errors, and {@code null} on successful completion; may be
     *     {@code null}, in which case errors and completion are not reported
     */
    public SSEEventListener(Consumer<StreamingEventKind> eventHandler, @Nullable Consumer<Throwable> errorHandler) {
        this.eventHandler = eventHandler;
        this.errorHandler = errorHandler;
    }

    /**
     * Processes one raw message received from the stream.
     *
     * @param message the message payload to parse
     * @param completableFuture future that is cancelled once a final task status update has been
     *     delivered; may be {@code null}
     * @throws RuntimeException wrapping a {@link JsonProcessingException} if the payload cannot be
     *     parsed
     */
    public void onMessage(String message, @Nullable Future<Void> completableFuture) {
        this.handleMessage(message, completableFuture);
    }

    /**
     * Reports a stream failure to the error handler (if any) and cancels the given future.
     *
     * <p>The future is cancelled even when the error handler itself throws; the handler's
     * exception still propagates to the caller.
     *
     * @param throwable the failure to report
     * @param future future to cancel; may be {@code null}
     */
    public void onError(Throwable throwable, @Nullable Future<Void> future) {
        try {
            if (this.errorHandler != null) {
                this.errorHandler.accept(throwable);
            }
        } finally {
            // Cancel regardless of how the handler returned so the future is never left pending.
            if (future != null) {
                future.cancel(true);
            }
        }
    }

    /**
     * Signals successful completion of the stream by invoking the error handler with
     * {@code null}. Only the first invocation has any effect; later (or concurrent) invocations
     * are logged and ignored.
     */
    public void onComplete() {
        // compareAndSet guarantees that exactly one caller observes the false -> true transition.
        if (!this.completed.compareAndSet(false, true)) {
            log.fine("SSEEventListener.onComplete() called again - ignoring (already completed)");
            return;
        }
        log.fine("SSEEventListener.onComplete() called - signaling successful stream completion");
        if (this.errorHandler != null) {
            log.fine("Calling errorHandler.accept(null) to signal successful completion");
            this.errorHandler.accept(null);
        } else {
            log.warning("errorHandler is null, cannot signal completion");
        }
    }

    /**
     * Parses {@code message}, forwards the resulting event to the event handler, and cancels
     * {@code future} when the event is a final {@link TaskStatusUpdateEvent}.
     *
     * <p>An {@link A2AError} raised while parsing or handling is routed to the error handler (and
     * dropped if there is none). A {@link JsonProcessingException} is rethrown wrapped in a
     * {@link RuntimeException} with the original exception as its cause.
     */
    private void handleMessage(String message, @Nullable Future<Void> future) {
        try {
            StreamResponse response = JSONRPCUtils.parseResponseEvent(message);
            StreamingEventKind event = ProtoUtils.FromProto.streamingEventKind((StreamResponseOrBuilder)response);
            this.eventHandler.accept(event);
            // A final status update is the last event of interest; cancel the future after the
            // handler has seen it. NOTE(review): presumably this ends the SSE connection - confirm
            // against the transport that supplies the future.
            if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent)event).isFinal() && future != null) {
                future.cancel(true);
            }
        }
        catch (A2AError error) {
            if (this.errorHandler != null) {
                this.errorHandler.accept(error);
            }
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

