/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.arthas.mcp.server.protocol.server.handler;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.taobao.arthas.mcp.server.protocol.server.DefaultMcpTransportContext;
import com.taobao.arthas.mcp.server.protocol.server.McpTransportContext;
import com.taobao.arthas.mcp.server.protocol.server.McpTransportContextExtractor;
import com.taobao.arthas.mcp.server.protocol.spec.McpError;
import com.taobao.arthas.mcp.server.protocol.spec.McpSchema;
import com.taobao.arthas.mcp.server.protocol.spec.McpStreamableServerSession;
import com.taobao.arthas.mcp.server.protocol.spec.McpStreamableServerTransport;
import com.taobao.arthas.mcp.server.util.Assert;
import com.taobao.arthas.mcp.server.util.KeepAliveScheduler;
import com.taobao.arthas.mcp.server.util.McpAuthExtractor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles HTTP requests addressed to the MCP endpoint on a Netty channel.
 *
 * <p>GET requests open a server-sent-event stream for an existing session, POST requests carry
 * JSON-RPC messages (an {@code initialize} request creates a new session), and DELETE requests
 * remove a session unless deletion has been disabled. Active sessions are tracked by the value of
 * the {@code mcp-session-id} header.
 */
public class McpStreamableHttpRequestHandler {
    private static final Logger logger = LoggerFactory.getLogger(McpStreamableHttpRequestHandler.class);
    /** SSE event type used for every JSON-RPC message written to a stream. */
    public static final String MESSAGE_EVENT_TYPE = "message";
    /** Name of the request header inspected for acceptable response media types. */
    private static final String ACCEPT = "Accept";
    public static final String UTF_8 = "UTF-8";
    /** Media type of plain JSON responses (initialize results and error bodies). */
    public static final String APPLICATION_JSON = "application/json";
    /** Media type of streamed responses. */
    public static final String TEXT_EVENT_STREAM = "text/event-stream";
    private static final String FAILED_TO_SEND_ERROR_RESPONSE = "Failed to send error response: {}";
    /** URI suffix a request must end with to be handled here. */
    private final String mcpEndpoint;
    /** When {@code true}, DELETE requests are answered with 405 instead of deleting the session. */
    private final boolean disallowDelete;
    /** Serializes outgoing messages and errors and converts incoming JSON-RPC params. */
    private final ObjectMapper objectMapper;
    /** Creates sessions for {@code initialize} requests; supplied after construction via the setter. */
    private McpStreamableServerSession.Factory sessionFactory;
    /** Active sessions keyed by session id. */
    private final ConcurrentHashMap<String, McpStreamableServerSession> sessions = new ConcurrentHashMap();
    /** Builds the per-request transport context from the incoming HTTP request. */
    private McpTransportContextExtractor<FullHttpRequest> contextExtractor;
    /** Set once graceful shutdown has started; new requests are then rejected with 503. */
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
    /** Optional keep-alive scheduler; {@code null} when no keep-alive interval was configured. */
    private KeepAliveScheduler keepAliveScheduler;

    /**
     * Creates the handler and, if a keep-alive interval is given, builds and starts the keep-alive
     * scheduler over the currently registered sessions.
     *
     * @param objectMapper mapper used for JSON serialization and parameter conversion
     * @param mcpEndpoint URI suffix that identifies requests for this handler
     * @param disallowDelete whether DELETE requests are rejected
     * @param contextExtractor builds the transport context from each request
     * @param keepAliveInterval initial delay and period of the keep-alive scheduler, or {@code null}
     *        to run without one
     */
    public McpStreamableHttpRequestHandler(ObjectMapper objectMapper, String mcpEndpoint, boolean disallowDelete, McpTransportContextExtractor<FullHttpRequest> contextExtractor, Duration keepAliveInterval) {
        this.objectMapper = objectMapper;
        this.mcpEndpoint = mcpEndpoint;
        this.disallowDelete = disallowDelete;
        this.contextExtractor = contextExtractor;
        if (keepAliveInterval == null) {
            // No interval configured: the handler runs without a keep-alive scheduler.
            return;
        }
        // The supplier hands the scheduler an empty collection once shutdown has begun.
        KeepAliveScheduler scheduler = KeepAliveScheduler.builder(() -> this.isClosing.get() ? Collections.emptyList() : this.sessions.values())
                .initialDelay(keepAliveInterval)
                .interval(keepAliveInterval)
                .build();
        this.keepAliveScheduler = scheduler;
        scheduler.start();
        logger.debug("Keep-alive scheduler started with interval: {}ms", keepAliveInterval.toMillis());
    }

    /**
     * Sets the factory used to start a session when an {@code initialize} request arrives via POST.
     *
     * @param sessionFactory factory that starts new streamable server sessions
     */
    public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Sends a notification to every registered session.
     *
     * <p>The broadcast runs asynchronously; an exception thrown while notifying one session is
     * logged and does not prevent the remaining sessions from being notified.
     *
     * @param method notification method name passed to each session
     * @param params notification parameters passed to each session
     * @return a future that completes when the broadcast task has run, or an already completed
     *         future when there are no sessions
     */
    public CompletableFuture<Void> notifyClients(String method, Object params) {
        if (!this.sessions.isEmpty()) {
            logger.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
            Runnable broadcast = () -> this.sessions.values().parallelStream().forEach(target -> {
                try {
                    target.sendNotification(method, params);
                }
                catch (Exception failure) {
                    logger.error("Failed to send message to session {}: {}", target.getId(), failure.getMessage());
                }
            });
            return CompletableFuture.runAsync(broadcast);
        }
        logger.debug("No active sessions to broadcast message to");
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Shuts the handler down asynchronously.
     *
     * <p>The task marks the handler as closing, asks every registered session to close gracefully
     * (logging individual failures), clears the session registry and finally shuts down the
     * keep-alive scheduler when one exists.
     *
     * @return a future that completes when the shutdown task has run
     */
    public CompletableFuture<Void> closeGracefully() {
        Runnable shutdown = () -> {
            this.isClosing.set(true);
            logger.debug("Initiating graceful shutdown with {} active sessions", this.sessions.size());
            this.sessions.values().parallelStream().forEach(active -> {
                try {
                    active.closeGracefully();
                }
                catch (Exception failure) {
                    logger.error("Failed to close session {}: {}", active.getId(), failure.getMessage());
                }
            });
            this.sessions.clear();
            logger.debug("Graceful shutdown completed");
            KeepAliveScheduler scheduler = this.keepAliveScheduler;
            if (scheduler != null) {
                scheduler.shutdown();
            }
        };
        return CompletableFuture.runAsync(shutdown);
    }

    /**
     * Entry point for an incoming HTTP request: validates the target path and shutdown state, then
     * dispatches to the GET, POST or DELETE handler.
     *
     * <p>Only the path component of the request URI is matched against the configured endpoint, so
     * a query string neither hides a valid endpoint ({@code /mcp?x=1}) nor fakes one
     * ({@code /other?next=/mcp}).
     *
     * @param ctx channel context the response is written to
     * @param request the fully aggregated HTTP request
     * @throws Exception if a delegated handler fails outside its own error handling
     */
    protected void handle(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        String uri = request.uri();
        // request.uri() carries the query string; strip it before matching the endpoint suffix.
        int queryStart = uri.indexOf('?');
        String path = queryStart >= 0 ? uri.substring(0, queryStart) : uri;
        if (!path.endsWith(this.mcpEndpoint)) {
            this.sendError(ctx, HttpResponseStatus.NOT_FOUND, new McpError("Endpoint not found"));
            return;
        }
        if (this.isClosing.get()) {
            this.sendError(ctx, HttpResponseStatus.SERVICE_UNAVAILABLE, new McpError("Server is shutting down"));
            return;
        }
        HttpMethod method = request.method();
        // equals() rather than ==: does not depend on the request carrying the shared constant instance.
        if (HttpMethod.GET.equals(method)) {
            this.handleGetRequest(ctx, request);
        } else if (HttpMethod.POST.equals(method)) {
            this.handlePostRequest(ctx, request);
        } else if (HttpMethod.DELETE.equals(method)) {
            this.handleDeleteRequest(ctx, request);
        } else {
            this.sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, new McpError("Method not allowed"));
        }
    }

    /**
     * Handles a GET request by opening a server-sent-event stream for an existing session.
     *
     * <p>The request must accept {@code text/event-stream} and carry a non-blank
     * {@code mcp-session-id} header (otherwise 400), and the session must be registered (otherwise
     * 404). When a {@code last-event-id} header is present the messages returned by
     * {@code session.replay} are written to the stream and no listening stream is registered;
     * otherwise a listening stream is attached to the session and closed when the channel closes.
     *
     * @param ctx channel context the stream is written to
     * @param request the GET request
     */
    private void handleGetRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        ArrayList<String> badRequestErrors = new ArrayList<String>();
        String accept = request.headers().get(ACCEPT);
        if (accept == null || !accept.contains(TEXT_EVENT_STREAM)) {
            badRequestErrors.add("text/event-stream required in Accept header");
        }
        String sessionId = request.headers().get("mcp-session-id");
        if (sessionId == null || sessionId.trim().isEmpty()) {
            badRequestErrors.add("Session ID required in mcp-session-id header");
        }
        if (!badRequestErrors.isEmpty()) {
            String combinedMessage = String.join("; ", badRequestErrors);
            this.sendError(ctx, HttpResponseStatus.BAD_REQUEST, new McpError(combinedMessage));
            return;
        }
        McpStreamableServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            this.sendError(ctx, HttpResponseStatus.NOT_FOUND, new McpError("Session not found"));
            return;
        }
        logger.debug("Handling GET request for session: {}", sessionId);
        // NOTE(review): the context is populated but not handed to the session on this path.
        McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
        Object authSubject = McpAuthExtractor.extractAuthSubjectFromContext(ctx);
        transportContext.put("mcp.auth.subject", authSubject);
        String userId = McpAuthExtractor.extractUserIdFromRequest(request);
        transportContext.put("mcp.user.id", userId);

        // Tracks whether the SSE response head is already on the wire; after that point a second
        // full HTTP response must not be written, the only valid recovery is closing the channel.
        boolean headersSent = false;
        try {
            DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, TEXT_EVENT_STREAM);
            response.headers().set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
            response.headers().set(HttpHeaderNames.CONNECTION, "keep-alive");
            response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
            ctx.writeAndFlush(response);
            headersSent = true;

            NettyStreamableMcpSessionTransport sessionTransport = new NettyStreamableMcpSessionTransport(sessionId, ctx);
            String lastEventId = request.headers().get("last-event-id");
            if (lastEventId != null) {
                // Resumption: replay what the session returns for this event id, nothing else.
                try {
                    session.replay(lastEventId).forEach(message -> {
                        try {
                            sessionTransport.sendMessage((McpSchema.JSONRPCMessage)message).join();
                        }
                        catch (Exception e) {
                            logger.error("Failed to replay message: {}", e.getMessage());
                            ctx.close();
                        }
                    });
                }
                catch (Exception e) {
                    logger.error("Failed to replay messages: {}", e.getMessage());
                    ctx.close();
                }
                return;
            }

            McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session.listeningStream(sessionTransport);
            ctx.channel().closeFuture().addListener((ChannelFutureListener) future -> {
                logger.debug("SSE connection closed for session: {}", sessionId);
                listeningStream.close();
            });
        }
        catch (Exception e) {
            logger.error("Failed to handle GET request for session {}: {}", sessionId, e.getMessage());
            if (headersSent) {
                ctx.close();
            } else {
                this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Internal server error"));
            }
        }
    }

    /**
     * Handles a POST request carrying a single JSON-RPC message.
     *
     * <p>An {@code initialize} request starts a new session and is answered with a JSON body plus
     * the {@code mcp-session-id} header. Every other message requires an existing session:
     * responses and notifications are acknowledged with 202, requests are answered on a
     * server-sent-event stream. Messages that cannot be deserialized yield 400; any other failure
     * yields 500.
     *
     * @param ctx channel context the response is written to
     * @param request the POST request whose body holds the JSON-RPC message
     */
    private void handlePostRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        ArrayList<String> badRequestErrors = new ArrayList<String>();
        String accept = request.headers().get(ACCEPT);
        if (accept == null || !accept.contains(TEXT_EVENT_STREAM)) {
            badRequestErrors.add("text/event-stream required in Accept header");
        }
        if (accept == null || !accept.contains(APPLICATION_JSON)) {
            badRequestErrors.add("application/json required in Accept header");
        }
        McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
        Object authSubject = McpAuthExtractor.extractAuthSubjectFromContext(ctx);
        transportContext.put("mcp.auth.subject", authSubject);
        String userId = McpAuthExtractor.extractUserIdFromRequest(request);
        transportContext.put("mcp.user.id", userId);
        try {
            String body = request.content().toString(CharsetUtil.UTF_8);
            McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, body);

            if (message instanceof McpSchema.JSONRPCRequest) {
                McpSchema.JSONRPCRequest candidate = (McpSchema.JSONRPCRequest)message;
                // Constant-first comparison: a request without a method must not raise an NPE here.
                if ("initialize".equals(candidate.getMethod())) {
                    if (!badRequestErrors.isEmpty()) {
                        String combinedMessage = String.join("; ", badRequestErrors);
                        this.sendError(ctx, HttpResponseStatus.BAD_REQUEST, new McpError(combinedMessage));
                        return;
                    }
                    this.initializeSession(ctx, candidate);
                    return;
                }
            }

            String sessionId = request.headers().get("mcp-session-id");
            if (sessionId == null || sessionId.trim().isEmpty()) {
                badRequestErrors.add("Session ID required in mcp-session-id header");
            }
            if (!badRequestErrors.isEmpty()) {
                String combinedMessage = String.join("; ", badRequestErrors);
                this.sendError(ctx, HttpResponseStatus.BAD_REQUEST, new McpError(combinedMessage));
                return;
            }
            McpStreamableServerSession session = this.sessions.get(sessionId);
            if (session == null) {
                this.sendError(ctx, HttpResponseStatus.NOT_FOUND, new McpError("Session not found: " + sessionId));
                return;
            }

            if (message instanceof McpSchema.JSONRPCResponse) {
                McpSchema.JSONRPCResponse jsonrpcResponse = (McpSchema.JSONRPCResponse)message;
                session.accept(jsonrpcResponse).thenRun(() -> this.sendAccepted(ctx)).exceptionally(e -> {
                    logger.error("Failed to accept response: {}", e.getMessage());
                    this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Failed to accept response"));
                    return null;
                });
            } else if (message instanceof McpSchema.JSONRPCNotification) {
                McpSchema.JSONRPCNotification jsonrpcNotification = (McpSchema.JSONRPCNotification)message;
                session.accept(jsonrpcNotification, transportContext).thenRun(() -> this.sendAccepted(ctx)).exceptionally(e -> {
                    logger.error("Failed to accept notification: {}", e.getMessage());
                    this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Failed to accept notification"));
                    return null;
                });
            } else if (message instanceof McpSchema.JSONRPCRequest) {
                McpSchema.JSONRPCRequest jsonrpcRequest = (McpSchema.JSONRPCRequest)message;
                DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, TEXT_EVENT_STREAM);
                response.headers().set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
                response.headers().set(HttpHeaderNames.CONNECTION, "keep-alive");
                response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                ctx.writeAndFlush(response);
                NettyStreamableMcpSessionTransport sessionTransport = new NettyStreamableMcpSessionTransport(sessionId, ctx);
                // The response head is already sent, so failures from here on close the channel
                // instead of writing an error response.
                try {
                    session.responseStream(jsonrpcRequest, sessionTransport, transportContext).exceptionally(e -> {
                        logger.error("Failed to handle request stream: {}", e.getMessage());
                        ctx.close();
                        return null;
                    });
                }
                catch (Exception e) {
                    logger.error("Failed to handle request stream: {}", e.getMessage());
                    ctx.close();
                }
            } else {
                this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Unknown message type"));
            }
        }
        catch (IOException | IllegalArgumentException e) {
            logger.error("Failed to deserialize message: {}", e.getMessage());
            this.sendError(ctx, HttpResponseStatus.BAD_REQUEST, new McpError("Invalid message format: " + e.getMessage()));
        }
        catch (Exception e) {
            logger.error("Error handling message: {}", e.getMessage());
            this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Error processing message: " + e.getMessage()));
        }
    }

    /**
     * Starts a session for an {@code initialize} request and writes the initialize result as a
     * JSON response carrying the new session id. If initialization or response serialization
     * fails, the session is removed from the registry again, because the client never learned its
     * id and could not address it.
     */
    private void initializeSession(ChannelHandlerContext ctx, McpSchema.JSONRPCRequest jsonrpcRequest) {
        McpSchema.InitializeRequest initializeRequest = this.objectMapper.convertValue(jsonrpcRequest.getParams(), new TypeReference<McpSchema.InitializeRequest>(){});
        McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory.startSession(initializeRequest);
        McpStreamableServerSession newSession = init.session();
        String newSessionId = newSession.getId();
        this.sessions.put(newSessionId, newSession);
        try {
            init.initResult().thenAccept(initResult -> {
                try {
                    String json = this.objectMapper.writeValueAsString(new McpSchema.JSONRPCResponse("2.0", jsonrpcRequest.getId(), initResult, null));
                    DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(json, CharsetUtil.UTF_8));
                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, APPLICATION_JSON);
                    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
                    response.headers().set("mcp-session-id", newSessionId);
                    response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                    ctx.writeAndFlush(response);
                }
                catch (Exception e) {
                    logger.error("Failed to serialize init response: {}", e.getMessage());
                    this.sessions.remove(newSessionId, newSession);
                    this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Failed to serialize response"));
                }
            }).exceptionally(e -> {
                logger.error("Failed to initialize session: {}", e.getMessage());
                this.sessions.remove(newSessionId, newSession);
                this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Failed to initialize session: " + e.getMessage()));
                return null;
            });
        }
        catch (Exception e) {
            logger.error("Failed to initialize session: {}", e.getMessage());
            this.sessions.remove(newSessionId, newSession);
            this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Failed to initialize session: " + e.getMessage()));
        }
    }

    /** Writes an empty 202 Accepted response, used to acknowledge responses and notifications. */
    private void sendAccepted(ChannelHandlerContext ctx) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.ACCEPTED);
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
        ctx.writeAndFlush(response);
    }

    /**
     * Handles a DELETE request by deleting the addressed session and removing it from the registry.
     *
     * <p>Answers 405 when deletion is disabled, 400 when the {@code mcp-session-id} header is
     * missing or blank (the same rule GET and POST apply), 404 when the session is unknown, 200
     * with an empty body once the session has been deleted, and 500 when deletion fails.
     *
     * @param ctx channel context the response is written to
     * @param request the DELETE request
     */
    private void handleDeleteRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        if (this.disallowDelete) {
            this.sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, new McpError("DELETE method not allowed"));
            return;
        }
        // The extracted context has no consumer on this path (session.delete() takes none); the
        // call itself is kept so a custom extractor still sees DELETE requests.
        this.contextExtractor.extract(request, new DefaultMcpTransportContext());
        String sessionId = request.headers().get("mcp-session-id");
        if (sessionId == null || sessionId.trim().isEmpty()) {
            this.sendError(ctx, HttpResponseStatus.BAD_REQUEST, new McpError("Session ID required in mcp-session-id header"));
            return;
        }
        McpStreamableServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            this.sendError(ctx, HttpResponseStatus.NOT_FOUND, new McpError("Session not found"));
            return;
        }
        try {
            session.delete().thenRun(() -> {
                this.sessions.remove(sessionId);
                DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
                ctx.writeAndFlush(response);
            }).exceptionally(e -> {
                logger.error("Failed to delete session {}: {}", sessionId, e.getMessage());
                this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError(e.getMessage()));
                return null;
            });
        }
        catch (Exception e) {
            logger.error("Failed to delete session {}: {}", sessionId, e.getMessage());
            this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, new McpError("Error deleting session"));
        }
    }

    /**
     * Writes {@code mcpError} to the channel as a JSON response with the given status.
     *
     * <p>If building or writing that response throws, the failure is logged and an empty
     * 500 response is written instead.
     *
     * @param ctx channel context the response is written to
     * @param status HTTP status of the error response
     * @param mcpError error object serialized as the response body
     */
    private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status, McpError mcpError) {
        try {
            ByteBuf payload = Unpooled.copiedBuffer(this.objectMapper.writeValueAsString(mcpError), CharsetUtil.UTF_8);
            DefaultFullHttpResponse reply = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, payload);
            reply.headers().set(HttpHeaderNames.CONTENT_TYPE, APPLICATION_JSON);
            reply.headers().set(HttpHeaderNames.CONTENT_LENGTH, reply.content().readableBytes());
            reply.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            ctx.writeAndFlush(reply);
        }
        catch (Exception failure) {
            logger.error(FAILED_TO_SEND_ERROR_RESPONSE, failure.getMessage());
            // Fall back to a body-less 500 so the client still receives a response.
            DefaultFullHttpResponse fallback = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
            fallback.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
            ctx.writeAndFlush(fallback);
        }
    }

    /**
     * Creates a new {@link Builder} for configuring and constructing a handler.
     *
     * @return a fresh builder with default settings
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link McpStreamableHttpRequestHandler}.
     *
     * <p>Defaults: endpoint {@code /mcp}, DELETE allowed, a context extractor that returns the
     * context it is given unchanged, and no keep-alive interval. An {@code ObjectMapper} must be
     * supplied before {@link #build()} is called.
     */
    public static class Builder {
        private ObjectMapper objectMapper;
        private String mcpEndpoint = "/mcp";
        private boolean disallowDelete;
        private McpTransportContextExtractor<FullHttpRequest> contextExtractor = (httpRequest, transportContext) -> transportContext;
        private Duration keepAliveInterval;

        /**
         * Sets the mapper used for JSON serialization.
         *
         * @param objectMapper the mapper; must not be null
         * @return this builder
         */
        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "ObjectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * Sets the URI suffix handled by the built handler.
         *
         * @param mcpEndpoint the endpoint; must not be null
         * @return this builder
         */
        public Builder mcpEndpoint(String mcpEndpoint) {
            Assert.notNull(mcpEndpoint, "MCP endpoint must not be null");
            this.mcpEndpoint = mcpEndpoint;
            return this;
        }

        /**
         * Chooses whether DELETE requests are rejected.
         *
         * @param disallowDelete {@code true} to reject DELETE requests
         * @return this builder
         */
        public Builder disallowDelete(boolean disallowDelete) {
            this.disallowDelete = disallowDelete;
            return this;
        }

        /**
         * Sets the extractor that builds the transport context from each request.
         *
         * @param contextExtractor the extractor; must not be null
         * @return this builder
         */
        public Builder contextExtractor(McpTransportContextExtractor<FullHttpRequest> contextExtractor) {
            Assert.notNull(contextExtractor, "Context extractor must not be null");
            this.contextExtractor = contextExtractor;
            return this;
        }

        /**
         * Sets the keep-alive interval; {@code null} leaves the keep-alive scheduler disabled.
         *
         * @param keepAliveInterval interval between keep-alive runs, may be null
         * @return this builder
         */
        public Builder keepAliveInterval(Duration keepAliveInterval) {
            this.keepAliveInterval = keepAliveInterval;
            return this;
        }

        /**
         * Validates the required settings and creates the handler.
         *
         * @return the configured handler
         */
        public McpStreamableHttpRequestHandler build() {
            Assert.notNull(this.objectMapper, "ObjectMapper must be set");
            Assert.notNull(this.mcpEndpoint, "MCP endpoint must be set");
            return new McpStreamableHttpRequestHandler(
                    this.objectMapper,
                    this.mcpEndpoint,
                    this.disallowDelete,
                    this.contextExtractor,
                    this.keepAliveInterval);
        }
    }

    private class NettyStreamableMcpSessionTransport
    implements McpStreamableServerTransport {
        private final String sessionId;
        private final ChannelHandlerContext ctx;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final ReentrantLock lock = new ReentrantLock();

        NettyStreamableMcpSessionTransport(String sessionId, ChannelHandlerContext ctx) {
            this.sessionId = sessionId;
            this.ctx = ctx;
            logger.debug("Streamable session transport {} initialized", (Object)sessionId);
        }

        @Override
        public CompletableFuture<Void> sendMessage(McpSchema.JSONRPCMessage message) {
            return this.sendMessage(message, null);
        }

        @Override
        public CompletableFuture<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId) {
            return CompletableFuture.runAsync(() -> {
                if (this.closed.get()) {
                    logger.warn("Attempted to send message to closed session: {}", (Object)this.sessionId);
                    return;
                }
                if (!this.ctx.channel().isActive()) {
                    logger.warn("Channel for session {} is not active, message will not be sent", (Object)this.sessionId);
                    return;
                }
                this.lock.lock();
                try {
                    if (this.closed.get()) {
                        logger.debug("Session {} was closed during message send attempt", (Object)this.sessionId);
                        return;
                    }
                    String jsonText = McpStreamableHttpRequestHandler.this.objectMapper.writeValueAsString(message);
                    logger.debug("Sending SSE message to session {}: {}", (Object)this.sessionId, (Object)(jsonText.length() > 200 ? jsonText.substring(0, 200) + "..." : jsonText));
                    this.sendSseEvent(McpStreamableHttpRequestHandler.MESSAGE_EVENT_TYPE, jsonText, messageId != null ? messageId : this.sessionId);
                    logger.debug("Message sent to session {} with ID {}", (Object)this.sessionId, (Object)messageId);
                }
                catch (Exception e) {
                    logger.error("Failed to send message to session {}: {}", (Object)this.sessionId, (Object)e.getMessage());
                    this.ctx.close();
                }
                finally {
                    this.lock.unlock();
                }
            });
        }

        @Override
        public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
            return McpStreamableHttpRequestHandler.this.objectMapper.convertValue(data, typeRef);
        }

        @Override
        public CompletableFuture<Void> closeGracefully() {
            return CompletableFuture.runAsync(this::close);
        }

        @Override
        public void close() {
            this.lock.lock();
            try {
                if (this.closed.get()) {
                    logger.debug("Session transport {} already closed", (Object)this.sessionId);
                    return;
                }
                this.closed.set(true);
                if (this.ctx.channel().isActive()) {
                    this.ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
                }
                logger.debug("Successfully closed session transport {}", (Object)this.sessionId);
            }
            catch (Exception e) {
                logger.warn("Failed to close session transport {}: {}", (Object)this.sessionId, (Object)e.getMessage());
            }
            finally {
                this.lock.unlock();
            }
        }

        @Override
        public Channel getChannel() {
            return this.ctx.channel();
        }

        private void sendSseEvent(String eventType, String data, String id) {
            StringBuilder sseData = new StringBuilder();
            if (id != null) {
                sseData.append("id: ").append(id).append("\n");
            }
            sseData.append("event: ").append(eventType).append("\n");
            sseData.append("data: ").append(data).append("\n\n");
            ByteBuf buffer = Unpooled.copiedBuffer(sseData.toString(), CharsetUtil.UTF_8);
            this.ctx.writeAndFlush(new DefaultHttpContent(buffer));
            logger.debug("SSE event sent - Type: {}, ID: {}, Data length: {}", eventType, id, data != null ? data.length() : 0);
        }
    }
}

