/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.arthas.mcp.server.protocol.spec;

import com.fasterxml.jackson.core.type.TypeReference;
import com.taobao.arthas.mcp.server.CommandExecutor;
import com.taobao.arthas.mcp.server.protocol.server.McpNettyServerExchange;
import com.taobao.arthas.mcp.server.protocol.server.McpNotificationHandler;
import com.taobao.arthas.mcp.server.protocol.server.McpRequestHandler;
import com.taobao.arthas.mcp.server.protocol.server.McpTransportContext;
import com.taobao.arthas.mcp.server.protocol.spec.EventStore;
import com.taobao.arthas.mcp.server.protocol.spec.McpError;
import com.taobao.arthas.mcp.server.protocol.spec.McpSchema;
import com.taobao.arthas.mcp.server.protocol.spec.McpSession;
import com.taobao.arthas.mcp.server.protocol.spec.McpStreamableServerTransport;
import com.taobao.arthas.mcp.server.protocol.spec.MissingMcpTransportSession;
import com.taobao.arthas.mcp.server.session.ArthasCommandContext;
import com.taobao.arthas.mcp.server.session.ArthasCommandSessionManager;
import com.taobao.arthas.mcp.server.util.Assert;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class McpStreamableServerSession
implements McpSession {
    private static final Logger logger = LoggerFactory.getLogger(McpStreamableServerSession.class);
    private final ConcurrentHashMap<Object, McpStreamableServerSessionStream> requestIdToStream = new ConcurrentHashMap();
    private final String id;
    private final Duration requestTimeout;
    private final AtomicLong requestCounter = new AtomicLong(0L);
    private final Map<String, McpRequestHandler<?>> requestHandlers;
    private final Map<String, McpNotificationHandler> notificationHandlers;
    private final AtomicReference<McpSchema.ClientCapabilities> clientCapabilities = new AtomicReference();
    private final AtomicReference<McpSchema.Implementation> clientInfo = new AtomicReference();
    private final AtomicReference<McpSession> listeningStreamRef;
    private final MissingMcpTransportSession missingMcpTransportSession;
    private volatile McpSchema.LoggingLevel minLoggingLevel = McpSchema.LoggingLevel.INFO;
    private final CommandExecutor commandExecutor;
    private final ArthasCommandSessionManager commandSessionManager;
    private final EventStore eventStore;

    public McpStreamableServerSession(String id, McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo, Duration requestTimeout, Map<String, McpRequestHandler<?>> requestHandlers, Map<String, McpNotificationHandler> notificationHandlers, CommandExecutor commandExecutor, EventStore eventStore) {
        this.id = id;
        this.missingMcpTransportSession = new MissingMcpTransportSession(id);
        this.listeningStreamRef = new AtomicReference<MissingMcpTransportSession>(this.missingMcpTransportSession);
        this.clientCapabilities.lazySet(clientCapabilities);
        this.clientInfo.lazySet(clientInfo);
        this.requestTimeout = requestTimeout;
        this.requestHandlers = requestHandlers;
        this.notificationHandlers = notificationHandlers;
        this.commandExecutor = commandExecutor;
        this.commandSessionManager = new ArthasCommandSessionManager(commandExecutor);
        this.eventStore = eventStore;
    }

    public void setMinLoggingLevel(McpSchema.LoggingLevel minLoggingLevel) {
        Assert.notNull((Object)minLoggingLevel, "minLoggingLevel must not be null");
        this.minLoggingLevel = minLoggingLevel;
    }

    public boolean isNotificationForLevelAllowed(McpSchema.LoggingLevel loggingLevel) {
        return loggingLevel.level() >= this.minLoggingLevel.level();
    }

    public String getId() {
        return this.id;
    }

    private String generateRequestId() {
        return this.id + "-" + this.requestCounter.getAndIncrement();
    }

    @Override
    public <T> CompletableFuture<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
        McpSession listeningStream = this.listeningStreamRef.get();
        return listeningStream.sendRequest(method, requestParams, typeRef);
    }

    @Override
    public CompletableFuture<Void> sendNotification(String method, Object params) {
        McpSession listeningStream = this.listeningStreamRef.get();
        return listeningStream.sendNotification(method, params);
    }

    public CompletableFuture<Void> delete() {
        return this.closeGracefully().thenRun(() -> {
            try {
                this.eventStore.removeSessionEvents(this.id);
                this.commandSessionManager.closeCommandSession(this.id);
            }
            catch (Exception e) {
                logger.warn("Failed to clear session during deletion: {}", (Object)e.getMessage());
            }
        });
    }

    public McpStreamableServerSessionStream listeningStream(McpStreamableServerTransport transport) {
        McpStreamableServerSessionStream listeningStream = new McpStreamableServerSessionStream(transport);
        this.listeningStreamRef.set(listeningStream);
        return listeningStream;
    }

    public Stream<McpSchema.JSONRPCMessage> replay(Object lastEventId) {
        String lastEventIdStr = lastEventId != null ? lastEventId.toString() : null;
        return this.eventStore.getEventsForSession(this.id, lastEventIdStr).map(EventStore.StoredEvent::getMessage);
    }

    public CompletableFuture<Void> responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStreamableServerTransport transport, McpTransportContext transportContext) {
        McpStreamableServerSessionStream stream = new McpStreamableServerSessionStream(transport);
        McpRequestHandler<?> requestHandler = this.requestHandlers.get(jsonrpcRequest.getMethod());
        if (requestHandler == null) {
            MethodNotFoundError error = this.getMethodNotFoundError(jsonrpcRequest.getMethod());
            McpSchema.JSONRPCResponse errorResponse = new McpSchema.JSONRPCResponse("2.0", jsonrpcRequest.getId(), null, new McpSchema.JSONRPCResponse.JSONRPCError(-32601, error.getMessage(), error.getData()));
            try {
                this.eventStore.storeEvent(this.id, errorResponse);
            }
            catch (Exception e) {
                logger.warn("Failed to store error response event: {}", (Object)e.getMessage());
            }
            return transport.sendMessage(errorResponse, null).thenCompose(v -> transport.closeGracefully());
        }
        ArthasCommandContext commandContext = this.createCommandContext(transportContext.get("mcp.auth.subject"));
        return ((CompletableFuture)((CompletableFuture)requestHandler.handle(new McpNettyServerExchange(this.id, stream, this.clientCapabilities.get(), this.clientInfo.get(), transportContext), commandContext, jsonrpcRequest.getParams()).thenApply(result -> new McpSchema.JSONRPCResponse("2.0", jsonrpcRequest.getId(), result, null))).thenCompose(response -> transport.sendMessage((McpSchema.JSONRPCMessage)response, null))).thenCompose(v -> transport.closeGracefully());
    }

    public CompletableFuture<Void> accept(McpSchema.JSONRPCNotification notification, McpTransportContext transportContext) {
        McpNotificationHandler notificationHandler = this.notificationHandlers.get(notification.getMethod());
        if (notificationHandler == null) {
            logger.error("No handler registered for notification method: {}", (Object)notification.getMethod());
            return CompletableFuture.completedFuture(null);
        }
        ArthasCommandContext commandContext = this.createCommandContext(transportContext.get("mcp.auth.subject"));
        McpSession listeningStream = this.listeningStreamRef.get();
        return notificationHandler.handle(new McpNettyServerExchange(this.id, listeningStream, this.clientCapabilities.get(), this.clientInfo.get(), transportContext), commandContext, notification.getParams());
    }

    public CompletableFuture<Void> accept(McpSchema.JSONRPCResponse response) {
        McpStreamableServerSessionStream stream = this.requestIdToStream.get(response.getId());
        if (stream == null) {
            CompletableFuture<Object> f = CompletableFuture.completedFuture(null);
            f.completeExceptionally(new McpError((Object)("Unexpected response for unknown id " + response.getId())));
            return f;
        }
        CompletableFuture future = (CompletableFuture)stream.pendingResponses.remove(response.getId());
        if (future == null) {
            CompletableFuture<Object> f = CompletableFuture.completedFuture(null);
            f.completeExceptionally(new McpError((Object)("Unexpected response for unknown id " + response.getId())));
            return f;
        }
        future.complete(response);
        return CompletableFuture.completedFuture(null);
    }

    private MethodNotFoundError getMethodNotFoundError(String method) {
        return new MethodNotFoundError(method, "Method not found: " + method, null);
    }

    @Override
    public CompletableFuture<Void> closeGracefully() {
        McpSession listeningStream = this.listeningStreamRef.getAndSet(this.missingMcpTransportSession);
        try {
            this.commandSessionManager.closeCommandSession(this.id);
            logger.debug("Successfully closed command session during graceful shutdown: {}", (Object)this.id);
        }
        catch (Exception e) {
            logger.warn("Failed to close command session during graceful shutdown: {}", (Object)e.getMessage());
        }
        return listeningStream.closeGracefully();
    }

    @Override
    public void close() {
        McpSession listeningStream = this.listeningStreamRef.getAndSet(this.missingMcpTransportSession);
        try {
            this.commandSessionManager.closeCommandSession(this.id);
            logger.debug("Successfully closed command session during close: {}", (Object)this.id);
        }
        catch (Exception e) {
            logger.warn("Failed to close command session during close: {}", (Object)e.getMessage());
        }
        if (listeningStream != null) {
            listeningStream.close();
        }
    }

    private ArthasCommandContext createCommandContext(Object authSubject) {
        ArthasCommandSessionManager.CommandSessionBinding binding = this.commandSessionManager.getCommandSession(this.id, authSubject);
        return new ArthasCommandContext(this.commandExecutor, binding);
    }

    public final class McpStreamableServerSessionStream
    implements McpSession {
        private final ConcurrentHashMap<Object, CompletableFuture<McpSchema.JSONRPCResponse>> pendingResponses = new ConcurrentHashMap();
        private final McpStreamableServerTransport transport;
        private final String transportId;
        private final Supplier<String> uuidGenerator;

        public McpStreamableServerSessionStream(McpStreamableServerTransport transport) {
            this.transport = transport;
            this.transportId = UUID.randomUUID().toString();
            this.uuidGenerator = () -> this.transportId + "_" + UUID.randomUUID();
        }

        @Override
        public <T> CompletableFuture<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
            String requestId = McpStreamableServerSession.this.generateRequestId();
            McpStreamableServerSession.this.requestIdToStream.put(requestId, this);
            CompletableFuture responseFuture = new CompletableFuture();
            this.pendingResponses.put(requestId, responseFuture);
            McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest("2.0", method, requestId, requestParams);
            String messageId = null;
            try {
                messageId = McpStreamableServerSession.this.eventStore.storeEvent(McpStreamableServerSession.this.id, jsonrpcRequest);
            }
            catch (Exception e) {
                logger.warn("Failed to store outbound request event: {}", (Object)e.getMessage());
            }
            this.transport.sendMessage(jsonrpcRequest, messageId).exceptionally(ex -> {
                responseFuture.completeExceptionally((Throwable)ex);
                return null;
            });
            return responseFuture.handle((jsonRpcResponse, throwable) -> {
                this.pendingResponses.remove(requestId);
                McpStreamableServerSession.this.requestIdToStream.remove(requestId);
                if (throwable != null) {
                    if (throwable instanceof RuntimeException) {
                        throw (RuntimeException)throwable;
                    }
                    throw new RuntimeException((Throwable)throwable);
                }
                if (jsonRpcResponse.getError() != null) {
                    throw new RuntimeException(new McpError(jsonRpcResponse.getError()));
                }
                if (typeRef.getType().equals(Void.class)) {
                    return null;
                }
                return this.transport.unmarshalFrom(jsonRpcResponse.getResult(), typeRef);
            });
        }

        @Override
        public CompletableFuture<Void> sendNotification(String method, Object params) {
            McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification("2.0", method, params);
            String messageId = null;
            try {
                messageId = McpStreamableServerSession.this.eventStore.storeEvent(McpStreamableServerSession.this.id, jsonrpcNotification);
            }
            catch (Exception e) {
                logger.warn("Failed to store outbound notification event: {}", (Object)e.getMessage());
            }
            return this.transport.sendMessage(jsonrpcNotification, messageId);
        }

        @Override
        public CompletableFuture<Void> closeGracefully() {
            this.pendingResponses.values().forEach(future -> future.completeExceptionally(new RuntimeException("Stream closed")));
            this.pendingResponses.clear();
            McpStreamableServerSession.this.listeningStreamRef.compareAndSet(this, McpStreamableServerSession.this.missingMcpTransportSession);
            McpStreamableServerSession.this.requestIdToStream.values().removeIf(this::equals);
            return this.transport.closeGracefully();
        }

        @Override
        public void close() {
            this.pendingResponses.values().forEach(future -> future.completeExceptionally(new RuntimeException("Stream closed")));
            this.pendingResponses.clear();
            McpStreamableServerSession.this.listeningStreamRef.compareAndSet(this, McpStreamableServerSession.this.missingMcpTransportSession);
            McpStreamableServerSession.this.requestIdToStream.values().removeIf(this::equals);
            this.transport.close();
        }
    }

    public static class McpStreamableServerSessionInit {
        private final McpStreamableServerSession session;
        private final CompletableFuture<McpSchema.InitializeResult> initResult;

        public McpStreamableServerSessionInit(McpStreamableServerSession session, CompletableFuture<McpSchema.InitializeResult> initResult) {
            this.session = session;
            this.initResult = initResult;
        }

        public McpStreamableServerSession session() {
            return this.session;
        }

        public CompletableFuture<McpSchema.InitializeResult> initResult() {
            return this.initResult;
        }
    }

    public static interface Factory {
        public McpStreamableServerSessionInit startSession(McpSchema.InitializeRequest var1);
    }

    public class MethodNotFoundError {
        private final String method;
        private final String message;
        private final Object data;

        public MethodNotFoundError(String method, String message, Object data) {
            this.method = method;
            this.message = message;
            this.data = data;
        }

        public String getMethod() {
            return this.method;
        }

        public String getMessage() {
            return this.message;
        }

        public Object getData() {
            return this.data;
        }
    }
}

