/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.ai.graph.agent.a2a;

import com.alibaba.cloud.ai.graph.CompileConfig;
import com.alibaba.cloud.ai.graph.GraphResponse;
import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.action.NodeActionWithConfig;
import com.alibaba.cloud.ai.graph.agent.a2a.AgentCardWrapper;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.async.AsyncGeneratorQueue;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

public class A2aNodeActionWithConfig
implements NodeActionWithConfig {
    private final String agentName;
    private final AgentCardWrapper agentCard;
    private final boolean includeContents;
    private final String outputKeyToParent;
    private final boolean streaming;
    private final String instruction;
    private boolean shareState;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private CompileConfig parentCompileConfig;

    /**
     * Creates a node action that talks to a remote agent and does not share
     * state with the parent graph ({@code shareState} is set to {@code false}).
     *
     * @param agentCard card of the remote agent; its {@code url()} is the request target
     * @param agentName name attached to every {@code StreamingOutput} this node emits
     * @param includeContents stored as-is; not read by any method in this class
     * @param outputKeyToParent key under which the node result is returned to the parent
     * @param instruction prompt template rendered against the state to build the request text
     * @param streaming {@code true} to use the streaming request path in {@link #apply}
     */
    public A2aNodeActionWithConfig(AgentCardWrapper agentCard, String agentName, boolean includeContents, String outputKeyToParent, String instruction, boolean streaming) {
        this.agentCard = agentCard;
        this.agentName = agentName;
        this.instruction = instruction;
        this.outputKeyToParent = outputKeyToParent;
        this.includeContents = includeContents;
        this.streaming = streaming;
        this.shareState = false;
    }

    /**
     * Creates a node action with explicit control over state sharing.
     *
     * <p>Delegates to the six-argument constructor and then records the two
     * additional settings.
     *
     * @param agentCard card of the remote agent; its {@code url()} is the request target
     * @param agentName name attached to every {@code StreamingOutput} this node emits
     * @param includeContents stored as-is; not read by any method in this class
     * @param outputKeyToParent key under which the node result is returned to the parent
     * @param instruction prompt template rendered against the state to build the request text
     * @param streaming {@code true} to use the streaming request path in {@link #apply}
     * @param shareState when {@code true} the parent's {@code RunnableConfig} is reused unchanged
     * @param compileConfig compile configuration of the parent graph; only stored here
     */
    public A2aNodeActionWithConfig(AgentCardWrapper agentCard, String agentName, boolean includeContents, String outputKeyToParent, String instruction, boolean streaming, boolean shareState, CompileConfig compileConfig) {
        this(agentCard, agentName, includeContents, outputKeyToParent, instruction, streaming);
        this.shareState = shareState;
        this.parentCompileConfig = compileConfig;
    }

    /**
     * Executes the node by sending the rendered instruction to the remote agent.
     *
     * <p>In streaming mode the returned map holds a {@code Flux} of graph
     * responses; otherwise it holds the response text extracted from the
     * agent's reply. In both modes the map key is {@code outputKeyToParent},
     * falling back to {@code "messages"} when that key is null or empty.
     * (Previously only the streaming path applied the fallback, so the
     * blocking path failed with a {@code NullPointerException} from
     * {@code Map.of} when no key was configured.)
     *
     * @param state current graph state, used to render the instruction
     * @param config runnable configuration of the parent graph
     * @return single-entry map with the node output
     * @throws Exception if the request cannot be built, sent or parsed
     */
    public Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception {
        RunnableConfig subGraphRunnableConfig = this.getSubGraphRunnableConfig(config);
        String outputKey = StringUtils.hasLength(this.outputKeyToParent) ? this.outputKeyToParent : "messages";
        if (this.streaming) {
            AsyncGenerator<NodeOutput> generator = this.createStreamingGenerator(state, subGraphRunnableConfig);
            Flux<GraphResponse<NodeOutput>> flux = this.toFlux(generator);
            return Map.of(outputKey, flux);
        }
        String requestPayload = this.buildSendMessageRequest(state, subGraphRunnableConfig);
        String resultText = this.sendMessageToServer(this.agentCard, requestPayload);
        Map<String, Object> resultMap = this.autoDetectAndParseResponse(resultText);
        @SuppressWarnings("unchecked")
        Map<String, Object> result = (Map<String, Object>) resultMap.get("result");
        // extractResponseText rejects a null result with an IllegalStateException.
        String responseText = this.extractResponseText(result);
        return Map.of(outputKey, responseText);
    }

    /**
     * Derives the configuration used for the remote call.
     *
     * <p>With {@code shareState} enabled the parent configuration is returned
     * untouched. Otherwise a copy is built whose thread id is
     * {@code <parentThreadId>_<subGraphId>} (or just the sub-graph id when the
     * parent has no thread id) and whose next node and checkpoint id are cleared.
     *
     * @param config configuration of the parent graph
     * @return configuration to use for this node's request
     */
    private RunnableConfig getSubGraphRunnableConfig(RunnableConfig config) {
        if (!this.shareState) {
            String isolatedThreadId = config.threadId()
                .map(parentThreadId -> String.format("%s_%s", parentThreadId, this.subGraphId()))
                .orElseGet(this::subGraphId);
            return RunnableConfig.builder(config)
                .threadId(isolatedThreadId)
                .nextNode(null)
                .checkPointId(null)
                .build();
        }
        return config;
    }

    /**
     * Returns the identifier of this node's sub-graph.
     *
     * @return {@code "subgraph_"} followed by the agent card's name
     */
    public String subGraphId() {
        return "subgraph_" + this.agentCard.name();
    }

    /**
     * Bridges an {@code AsyncGenerator} to a {@code Flux}.
     *
     * <p>On subscription the generator is drained on the bounded-elastic
     * scheduler; the scheduled task is disposed when the sink is cancelled
     * or disposed.
     *
     * @param generator source of elements
     * @param <E> element type
     * @return flux emitting one {@code GraphResponse} per generator element
     */
    private <E> Flux<GraphResponse<E>> toFlux(AsyncGenerator<E> generator) {
        return Flux.create((FluxSink<GraphResponse<E>> sink) -> {
            Disposable drainTask = Schedulers.boundedElastic()
                .schedule(() -> this.drainGenerator(generator, sink));
            sink.onCancel(drainTask::dispose);
            sink.onDispose(drainTask::dispose);
        });
    }

    /**
     * Pulls elements from {@code generator} and forwards them to {@code sink}
     * until the generator reports completion, an error occurs, or the sink is
     * cancelled.
     *
     * <p>Elements whose future is already complete are handled in a loop
     * rather than by re-entering this method from a {@code whenComplete}
     * callback. Such a callback runs synchronously on an already-completed
     * future, so the previous recursive form grew the call stack by several
     * frames per element and could overflow on long streams. Only futures
     * that are still pending are continued from a callback.
     *
     * @param generator source of elements
     * @param sink sink receiving {@code GraphResponse} items, the terminal
     *             {@code done} item (when the generator carries a result value),
     *             and the completion or error signal
     * @param <E> element type
     */
    private <E> void drainGenerator(AsyncGenerator<E> generator, FluxSink<GraphResponse<E>> sink) {
        while (!sink.isCancelled()) {
            AsyncGenerator.Data<E> data;
            try {
                data = generator.next();
            } catch (Exception ex) {
                sink.error(ex);
                return;
            }

            if (data.isDone()) {
                data.resultValue().ifPresent(result -> {
                    if (!sink.isCancelled()) {
                        sink.next(GraphResponse.done(result));
                    }
                });
                if (!sink.isCancelled()) {
                    sink.complete();
                }
                return;
            }

            CompletableFuture<E> future = data.getData();
            if (future == null) {
                sink.error(new IllegalStateException("AsyncGenerator data is null without completion signal"));
                return;
            }

            if (!future.isDone()) {
                // Pending element: resume draining once it completes. The callback
                // normally runs on the completing thread, so the stack does not grow.
                future.whenComplete((value, throwable) -> {
                    if (sink.isCancelled()) {
                        return;
                    }
                    if (throwable != null) {
                        sink.error(this.unwrapCompletionException(throwable));
                        return;
                    }
                    sink.next(GraphResponse.of(value));
                    this.drainGenerator(generator, sink);
                });
                return;
            }

            // Already complete: consume inline and keep looping.
            E value;
            try {
                value = future.join();
            } catch (RuntimeException ex) {
                // join() reports failures as CompletionException (or CancellationException);
                // unwrap so the sink sees the same throwable a whenComplete callback would.
                sink.error(this.unwrapCompletionException(ex));
                return;
            }
            sink.next(GraphResponse.of(value));
        }
    }

    /**
     * Strips a {@link CompletionException} wrapper when it carries a cause.
     *
     * @param throwable throwable reported by a future
     * @return the cause if {@code throwable} is a {@code CompletionException}
     *         with a non-null cause; otherwise {@code throwable} itself
     */
    private Throwable unwrapCompletionException(Throwable throwable) {
        if (!(throwable instanceof CompletionException)) {
            return throwable;
        }
        Throwable cause = throwable.getCause();
        return cause != null ? cause : throwable;
    }

    private AsyncGenerator<NodeOutput> createStreamingGenerator(OverAllState state, RunnableConfig config) throws Exception {
        String requestPayload = this.buildSendStreamingMessageRequest(state, config);
        LinkedBlockingQueue queue = new LinkedBlockingQueue(1000);
        String outputKey = StringUtils.hasLength((String)this.outputKeyToParent) ? this.outputKeyToParent : "messages";
        StringBuilder accumulated = new StringBuilder();
        return AsyncGeneratorQueue.of(queue, q -> {
            String baseUrl = this.resolveAgentBaseUrl(this.agentCard);
            if (baseUrl == null || baseUrl.isBlank()) {
                StreamingOutput errorOutput = new StreamingOutput("Error: AgentCard.url is empty", "a2aNode", this.agentName, state);
                queue.add(AsyncGenerator.Data.of((Object)errorOutput));
                return;
            }
            try {
                HttpEntity entity;
                block38: {
                    CloseableHttpClient httpClient = HttpClients.createDefault();
                    HttpPost post = new HttpPost(baseUrl);
                    post.setHeader("Content-Type", "application/json");
                    post.setHeader("Accept", "text/event-stream");
                    post.setEntity((HttpEntity)new StringEntity(requestPayload, ContentType.APPLICATION_JSON));
                    CloseableHttpResponse response = httpClient.execute((HttpUriRequest)post);
                    int statusCode = response.getStatusLine().getStatusCode();
                    if (statusCode != 200) {
                        StreamingOutput errorOutput = new StreamingOutput("HTTP request failed, status: " + statusCode, "a2aNode", this.agentName, state);
                        queue.add(AsyncGenerator.Data.of((Object)errorOutput));
                        return;
                    }
                    entity = response.getEntity();
                    if (entity == null) {
                        StreamingOutput errorOutput = new StreamingOutput("Empty HTTP entity", "a2aNode", this.agentName, state);
                        queue.add(AsyncGenerator.Data.of((Object)errorOutput));
                        return;
                    }
                    String contentType = entity.getContentType() != null ? entity.getContentType().getValue() : "";
                    boolean isEventStream = contentType.contains("text/event-stream");
                    if (!isEventStream) {
                        String body = EntityUtils.toString((HttpEntity)entity, (String)"UTF-8");
                        try {
                            Map resultMap = (Map)JSON.parseObject((String)body, (TypeReference)new TypeReference<Map<String, Object>>(){}, (Feature[])new Feature[0]);
                            Map result = (Map)resultMap.get("result");
                            String text = this.extractResponseText(result);
                            if (text == null) return;
                            if (text.isEmpty()) return;
                            accumulated.append(text);
                            queue.add(AsyncGenerator.Data.of((Object)new StreamingOutput(text, "a2aNode", this.agentName, state)));
                            return;
                        }
                        catch (Exception ex) {
                            queue.add(AsyncGenerator.Data.of((Object)new StreamingOutput("Error: " + ex.getMessage(), "a2aNode", this.agentName, state)));
                            return;
                        }
                    }
                    break block38;
                    finally {
                        if (response != null) {
                            response.close();
                        }
                    }
                    finally {
                        if (httpClient != null) {
                            httpClient.close();
                        }
                    }
                }
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));){
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String trimmed = line.trim();
                        if (!trimmed.startsWith("data:")) continue;
                        String jsonContent = trimmed.substring(5).trim();
                        if ("[DONE]".equals(jsonContent)) {
                            return;
                        }
                        try {
                            String text;
                            Map parsed = (Map)JSON.parseObject((String)jsonContent, (TypeReference)new TypeReference<Map<String, Object>>(){}, (Feature[])new Feature[0]);
                            Map result = (Map)parsed.get("result");
                            if (result == null || (text = this.extractResponseText(result)) == null || text.isEmpty()) continue;
                            accumulated.append(text);
                            queue.add(AsyncGenerator.Data.of((Object)new StreamingOutput(text, "a2aNode", this.agentName, state)));
                        }
                        catch (Exception exception) {
                        }
                    }
                    return;
                }
            }
            catch (Exception e) {
                StreamingOutput errorOutput = new StreamingOutput("Error: " + e.getMessage(), "a2aNode", this.agentName, state);
                queue.add(AsyncGenerator.Data.of((Object)errorOutput));
                return;
            }
            finally {
                queue.add(AsyncGenerator.Data.done(Map.of(outputKey, accumulated.toString())));
            }
        });
    }

    /**
     * Tells whether a response body looks like a server-sent-event stream.
     *
     * @param responseText raw response body
     * @return {@code true} if the text contains the marker {@code "data: "}
     */
    private boolean isSSEResponse(String responseText) {
        return responseText.indexOf("data: ") >= 0;
    }

    /**
     * Builds a generator from an already-received SSE body by delegating to
     * {@link #createRealTimeSseStreamingGenerator(String, OverAllState)}.
     *
     * @param sseResponseText complete SSE response text
     * @param state graph state attached to each emitted output
     * @return generator over the events found in the text
     */
    private AsyncGenerator<NodeOutput> createSseStreamingGenerator(String sseResponseText, OverAllState state) {
        AsyncGenerator<NodeOutput> delegate = this.createRealTimeSseStreamingGenerator(sseResponseText, state);
        return delegate;
    }

    /**
     * Replays an already-received SSE body as an {@code AsyncGenerator}.
     *
     * <p>Each {@code data:} line (with or without a space after the colon, as
     * in {@code createStreamingGenerator}) is parsed as JSON; when its
     * {@code result} yields non-empty text a {@code StreamingOutput} is
     * queued. Processing stops at {@code [DONE]}. The generator always ends
     * with a done signal carrying the concatenated text under
     * {@code outputKeyToParent} (or {@code "messages"} when that is empty).
     *
     * <p>Previously the accumulated buffer was never appended to, so the done
     * signal always carried an empty string.
     *
     * @param sseResponseText complete SSE response text
     * @param state graph state attached to each emitted output
     * @return generator backed by a bounded queue of 1000 entries
     */
    private AsyncGenerator<NodeOutput> createRealTimeSseStreamingGenerator(String sseResponseText, OverAllState state) {
        LinkedBlockingQueue<AsyncGenerator.Data<NodeOutput>> queue = new LinkedBlockingQueue<>(1000);
        String outputKey = StringUtils.hasLength(this.outputKeyToParent) ? this.outputKeyToParent : "messages";
        StringBuilder accumulated = new StringBuilder();
        return AsyncGeneratorQueue.of(queue, q -> {
            try {
                for (String rawLine : sseResponseText.split("\n")) {
                    String line = rawLine.trim();
                    if (!line.startsWith("data:")) {
                        continue;
                    }
                    String jsonContent = line.substring(5).trim();
                    if ("[DONE]".equals(jsonContent)) {
                        break;
                    }
                    try {
                        Map<String, Object> parsed = JSON.parseObject(jsonContent, new TypeReference<Map<String, Object>>() {});
                        @SuppressWarnings("unchecked")
                        Map<String, Object> result = (Map<String, Object>) parsed.get("result");
                        if (result == null) {
                            continue;
                        }
                        String text = this.extractResponseText(result);
                        if (text == null || text.isEmpty()) {
                            continue;
                        }
                        accumulated.append(text);
                        queue.add(AsyncGenerator.Data.of(new StreamingOutput(text, "a2aNode", this.agentName, state)));
                    } catch (Exception ignored) {
                        // Best effort: skip events that cannot be parsed or carry no text.
                    }
                }
            } catch (Exception e) {
                queue.add(AsyncGenerator.Data.of(new StreamingOutput("Error: " + e.getMessage(), "a2aNode", this.agentName, state)));
            } finally {
                queue.add(AsyncGenerator.Data.done(Map.of(outputKey, accumulated.toString())));
            }
        });
    }

    /**
     * Wraps a single (non-streamed) JSON response in a generator.
     *
     * <p>The queue is filled eagerly: at most one {@code StreamingOutput}
     * with the extracted text (or an {@code "Error: ..."} output if parsing
     * or extraction fails), followed by the done signal carrying the text
     * under {@code outputKeyToParent} (or {@code "messages"} when that is empty).
     *
     * @param responseText JSON response body
     * @param state graph state attached to the emitted output
     * @return generator over the pre-filled queue
     */
    private AsyncGenerator<NodeOutput> createSingleStreamingGenerator(String responseText, OverAllState state) {
        String doneKey = StringUtils.hasLength(this.outputKeyToParent) ? this.outputKeyToParent : "messages";
        StringBuilder collected = new StringBuilder();
        LinkedBlockingQueue<AsyncGenerator.Data<NodeOutput>> pending = new LinkedBlockingQueue<>(10);
        try {
            Map<String, Object> envelope = JSON.parseObject(responseText, new TypeReference<Map<String, Object>>() {});
            @SuppressWarnings("unchecked")
            Map<String, Object> payload = (Map<String, Object>) envelope.get("result");
            String extracted = this.extractResponseText(payload);
            boolean hasText = extracted != null && !extracted.isEmpty();
            if (hasText) {
                collected.append(extracted);
                pending.add(AsyncGenerator.Data.of(new StreamingOutput(extracted, "a2aNode", this.agentName, state)));
            }
        } catch (Exception failure) {
            pending.add(AsyncGenerator.Data.of(new StreamingOutput("Error: " + failure.getMessage(), "a2aNode", this.agentName, state)));
        }
        pending.add(AsyncGenerator.Data.done(Map.of(doneKey, collected.toString())));
        return new AsyncGeneratorQueue.Generator<>(pending);
    }

    /**
     * Converts a JSON-RPC {@code result} object into a streaming output.
     *
     * @param result the {@code result} map of one response or event
     * @param state graph state attached to the output
     * @return a {@code StreamingOutput} holding the extracted text, or
     *         {@code null} when the extracted text is null or empty
     */
    private StreamingOutput createStreamingOutputFromResult(Map<String, Object> result, OverAllState state) {
        String chunk = this.extractResponseText(result);
        if (chunk == null || chunk.isEmpty()) {
            return null;
        }
        return new StreamingOutput(chunk, "a2aNode", this.agentName, state);
    }

    /**
     * Parses a response body that may be either a plain JSON document or an
     * SSE stream.
     *
     * @param responseText raw response body
     * @return for SSE text (contains {@code "data: "}) the map built by
     *         {@link #parseStreamingResponse(String)}; otherwise the body
     *         parsed as a JSON object
     */
    private Map<String, Object> autoDetectAndParseResponse(String responseText) {
        if (!this.isSSEResponse(responseText)) {
            return JSON.parseObject(responseText, new TypeReference<Map<String, Object>>() {});
        }
        return this.parseStreamingResponse(responseText);
    }

    /**
     * Reduces an SSE response body to a single {@code result} object.
     *
     * <p>Every line starting with {@code "data: "} is parsed as JSON. The
     * first non-null {@code result} is kept; afterwards it is replaced only
     * by results that contain an {@code "artifact"} key. Lines that fail to
     * parse are skipped.
     *
     * @param responseText complete SSE response text
     * @return map with the single key {@code "result"} holding the selected result
     * @throws IllegalStateException if no line produced a usable result
     */
    private Map<String, Object> parseStreamingResponse(String responseText) {
        Map<String, Object> selected = null;
        for (String rawLine : responseText.split("\n")) {
            String line = rawLine.trim();
            if (line.startsWith("data: ")) {
                String payload = line.substring(6);
                try {
                    Map<String, Object> event = JSON.parseObject(payload, new TypeReference<Map<String, Object>>() {});
                    @SuppressWarnings("unchecked")
                    Map<String, Object> candidate = (Map<String, Object>) event.get("result");
                    boolean accept = candidate != null && (selected == null || candidate.containsKey("artifact"));
                    if (accept) {
                        selected = candidate;
                    }
                } catch (Exception ignored) {
                    // Unparseable events are skipped on purpose.
                }
            }
        }
        if (selected == null) {
            throw new IllegalStateException("Failed to parse any valid result from streaming response");
        }
        Map<String, Object> wrapper = new HashMap<>();
        wrapper.put("result", selected);
        return wrapper;
    }

    /**
     * Extracts the text carried by a JSON-RPC {@code result} object.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>{@code kind == "status-update"}: see {@link #statusUpdateText(Map)};</li>
     *   <li>{@code kind == "artifact-update"}: concatenated {@code text} of the
     *       artifact's parts, or {@code ""} when there is none;</li>
     *   <li>{@code artifacts}: concatenated {@code text} of all parts of all
     *       artifacts, if non-empty;</li>
     *   <li>{@code parts}: {@code text} of the last part, if present;</li>
     *   <li>{@code message.parts}: {@code text} of the last part, if present.</li>
     * </ol>
     *
     * @param result the {@code result} map of a response or event
     * @return the extracted text (possibly empty, never {@code null})
     * @throws IllegalStateException if {@code result} is null or none of the
     *         rules above yields text
     */
    private String extractResponseText(Map<String, Object> result) {
        if (result == null) {
            throw new IllegalStateException("Result is null, cannot extract response text");
        }

        Object kind = result.get("kind");
        if ("status-update".equals(kind)) {
            return statusUpdateText((Map<?, ?>) result.get("status"));
        }
        if ("artifact-update".equals(kind)) {
            StringBuilder joined = new StringBuilder();
            Map<?, ?> artifact = (Map<?, ?>) result.get("artifact");
            if (artifact != null) {
                appendPartTexts((List<?>) artifact.get("parts"), joined);
            }
            return joined.toString();
        }

        List<?> artifacts = (List<?>) result.get("artifacts");
        if (artifacts != null) {
            StringBuilder joined = new StringBuilder();
            for (Object artifact : artifacts) {
                if (artifact instanceof Map) {
                    appendPartTexts((List<?>) ((Map<?, ?>) artifact).get("parts"), joined);
                }
            }
            if (joined.length() > 0) {
                return joined.toString();
            }
        }

        String ownText = lastPartText(result);
        if (ownText != null) {
            return ownText;
        }
        String messageText = lastPartText((Map<?, ?>) result.get("message"));
        if (messageText != null) {
            return messageText;
        }
        throw new IllegalStateException("No valid text content found in result: " + result);
    }

    /**
     * Text for a {@code status-update} result: the last message part's text
     * while the state is {@code "working"}, an empty string for the other
     * known states or a missing status, and {@code "Agent State: <state>"}
     * for any unrecognised state.
     */
    private static String statusUpdateText(Map<?, ?> status) {
        if (status == null) {
            return "";
        }
        String state = (String) status.get("state");
        // String.valueOf keeps a null state out of the switch; it falls to the default branch.
        switch (String.valueOf(state)) {
            case "completed":
            case "processing":
            case "failed":
            case "submitted":
            case "canceled":
                return "";
            case "working":
                String progress = lastPartText((Map<?, ?>) status.get("message"));
                return progress != null ? progress : "";
            default:
                return "Agent State: " + state;
        }
    }

    /** Returns the {@code text} of the last entry of {@code holder.parts}, or {@code null} if unavailable. */
    private static String lastPartText(Map<?, ?> holder) {
        if (holder == null) {
            return null;
        }
        List<?> parts = (List<?>) holder.get("parts");
        if (parts == null || parts.isEmpty()) {
            return null;
        }
        Map<?, ?> last = (Map<?, ?>) parts.get(parts.size() - 1);
        return last == null ? null : (String) last.get("text");
    }

    /** Appends the non-null {@code text} of every map-typed part to {@code target}; a null list is a no-op. */
    private static void appendPartTexts(List<?> parts, StringBuilder target) {
        if (parts == null) {
            return;
        }
        for (Object part : parts) {
            if (part instanceof Map) {
                String text = (String) ((Map<?, ?>) part).get("text");
                if (text != null) {
                    target.append(text);
                }
            }
        }
    }

    /**
     * Builds the JSON-RPC 2.0 payload for a blocking {@code message/send} call.
     *
     * <p>The payload carries one user message with a single text part (the
     * effective instruction) plus a metadata object containing
     * {@code threadId} and {@code userId} when the configuration provides them.
     *
     * @param state graph state used to render the instruction
     * @param config configuration supplying the optional thread id and user id
     * @return serialized JSON request
     * @throws IllegalStateException if serialization fails
     */
    private String buildSendMessageRequest(OverAllState state, RunnableConfig config) {
        String instructionText = String.valueOf(this.getEffectiveInstruction(state));
        String requestId = UUID.randomUUID().toString();
        String messageId = UUID.randomUUID().toString().replace("-", "");

        Map<String, Object> message = new HashMap<>();
        message.put("kind", "message");
        message.put("messageId", messageId);
        message.put("parts", List.of(Map.of("kind", "text", "text", instructionText)));
        message.put("role", "user");

        Map<String, Object> metadata = new HashMap<>();
        config.threadId().ifPresent(value -> metadata.put("threadId", value));
        config.metadata("userId").ifPresent(value -> metadata.put("userId", value));

        Map<String, Object> params = new HashMap<>();
        params.put("message", message);
        params.put("metadata", metadata);

        Map<String, Object> envelope = new HashMap<>();
        envelope.put("id", requestId);
        envelope.put("jsonrpc", "2.0");
        envelope.put("method", "message/send");
        envelope.put("params", params);

        try {
            return this.objectMapper.writeValueAsString(envelope);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to build JSON-RPC payload", e);
        }
    }

    /**
     * Builds the JSON-RPC 2.0 payload for a streaming {@code message/stream} call.
     *
     * <p>Identical in shape to the blocking request: one user message with a
     * single text part (the effective instruction) and a metadata object with
     * {@code threadId} and {@code userId} when the configuration provides them.
     *
     * @param state graph state used to render the instruction
     * @param config configuration supplying the optional thread id and user id
     * @return serialized JSON request
     * @throws IllegalStateException if serialization fails
     */
    private String buildSendStreamingMessageRequest(OverAllState state, RunnableConfig config) {
        String instructionText = String.valueOf(this.getEffectiveInstruction(state));
        String requestId = UUID.randomUUID().toString();
        String messageId = UUID.randomUUID().toString().replace("-", "");

        Map<String, Object> message = new HashMap<>();
        message.put("kind", "message");
        message.put("messageId", messageId);
        message.put("parts", List.of(Map.of("kind", "text", "text", instructionText)));
        message.put("role", "user");

        Map<String, Object> metadata = new HashMap<>();
        config.threadId().ifPresent(value -> metadata.put("threadId", value));
        config.metadata("userId").ifPresent(value -> metadata.put("userId", value));

        Map<String, Object> params = new HashMap<>();
        params.put("message", message);
        params.put("metadata", metadata);

        Map<String, Object> envelope = new HashMap<>();
        envelope.put("id", requestId);
        envelope.put("jsonrpc", "2.0");
        envelope.put("method", "message/stream");
        envelope.put("params", params);

        try {
            return this.objectMapper.writeValueAsString(envelope);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to build JSON-RPC streaming payload", e);
        }
    }

    /**
     * Resolves the text sent to the remote agent.
     *
     * <p>When an instruction is configured it is rendered as a prompt
     * template against the state data. Without an instruction an empty
     * string is returned, but only if state is shared and the state holds a
     * {@code "messages"} value.
     *
     * @param state current graph state
     * @return rendered instruction, or {@code ""} when relying on shared messages
     * @throws IllegalStateException if no instruction is configured and either
     *         state is not shared or the shared state has no {@code "messages"} value
     */
    private String getEffectiveInstruction(OverAllState state) {
        if (StringUtils.hasLength(this.instruction)) {
            PromptTemplate template = PromptTemplate.builder().template(this.instruction).build();
            return template.render(state.data());
        }
        if (!this.shareState) {
            throw new IllegalStateException("Instruction is empty and shareState is false");
        }
        if (state.value("messages").isEmpty()) {
            // Previously reported with the "shareState is false" message, which was misleading here.
            throw new IllegalStateException("Instruction is empty and the shared state contains no messages");
        }
        return "";
    }

    /**
     * Posts a JSON request to the agent's URL and returns the response body.
     *
     * <p>The HTTP client and response are both closed via try-with-resources.
     * The target URL and request payload are no longer printed to standard
     * output (the payload carries the rendered instruction text).
     *
     * @param agentCard card whose URL is the request target
     * @param requestPayload serialized JSON-RPC request
     * @return response body decoded as UTF-8 unless the entity declares another charset
     * @throws IllegalStateException if the URL is blank, the status is not 200,
     *         or the response has no entity
     * @throws Exception on I/O failure while sending or reading
     */
    private String sendMessageToServer(AgentCardWrapper agentCard, String requestPayload) throws Exception {
        String baseUrl = this.resolveAgentBaseUrl(agentCard);
        if (baseUrl == null || baseUrl.isBlank()) {
            throw new IllegalStateException("AgentCard.url is empty");
        }

        HttpPost post = new HttpPost(baseUrl);
        post.setHeader("Content-Type", "application/json");
        post.setEntity(new StringEntity(requestPayload, ContentType.APPLICATION_JSON));

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
                CloseableHttpResponse response = httpClient.execute(post)) {
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                throw new IllegalStateException("HTTP request failed, status: " + statusCode);
            }
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new IllegalStateException("Empty HTTP entity");
            }
            return EntityUtils.toString(entity, StandardCharsets.UTF_8);
        }
    }

    /**
     * Returns the endpoint to which requests for the given agent are posted.
     *
     * @param agentCard card of the remote agent
     * @return the value of {@code agentCard.url()}
     */
    private String resolveAgentBaseUrl(AgentCardWrapper agentCard) {
        String url = agentCard.url();
        return url;
    }
}

