/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.graphql.server.webmvc;

import graphql.ErrorClassification;
import graphql.ExecutionResult;
import graphql.GraphQLError;
import graphql.GraphqlErrorBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.springframework.graphql.execution.ErrorType;
import org.springframework.graphql.execution.SubscriptionPublisherException;
import org.springframework.graphql.server.WebGraphQlHandler;
import org.springframework.graphql.server.WebGraphQlResponse;
import org.springframework.graphql.server.webmvc.AbstractGraphQlHttpHandler;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import org.springframework.web.servlet.function.ServerRequest;
import org.springframework.web.servlet.function.ServerResponse;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * HTTP handler that streams the results of a GraphQL request to the client as
 * Server-Sent Events through {@link ServerResponse#sse}.
 *
 * <p>Every result map is written as a {@code "next"} event and the end of the
 * stream is marked with a {@code "complete"} event. When a keep-alive duration is
 * configured, an SSE comment is written for each interval in which no data event
 * was emitted by the source.
 */
public class GraphQlSseHandler extends AbstractGraphQlHttpHandler {

    /**
     * Sentinel element injected by {@link KeepAliveHandler} to request a heartbeat.
     * It is recognised by identity (not equality) in {@link SseSubscriber}, so an
     * empty result map coming from the application is never mistaken for it.
     */
    private static final Map<String, Object> HEARTBEAT_MAP = new LinkedHashMap<>(0);

    private final @Nullable Duration timeout;

    private final @Nullable Duration keepAliveDuration;

    /**
     * Create a handler with neither an explicit timeout nor keep-alive heartbeats.
     * @param graphQlHandler the handler passed on to the superclass
     */
    public GraphQlSseHandler(WebGraphQlHandler graphQlHandler) {
        this(graphQlHandler, null, null);
    }

    /**
     * Create a handler with an optional timeout and no keep-alive heartbeats.
     * @param graphQlHandler the handler passed on to the superclass
     * @param timeout the timeout handed to {@link ServerResponse#sse}; when
     * {@code null} the variant without a timeout argument is used
     */
    public GraphQlSseHandler(WebGraphQlHandler graphQlHandler, @Nullable Duration timeout) {
        this(graphQlHandler, timeout, null);
    }

    /**
     * Create a handler with an optional timeout and optional keep-alive heartbeats.
     * @param graphQlHandler the handler passed on to the superclass
     * @param timeout the timeout handed to {@link ServerResponse#sse}; when
     * {@code null} the variant without a timeout argument is used
     * @param keepAliveDuration how long the stream may stay silent before a
     * heartbeat comment is written; {@code null} disables heartbeats
     */
    public GraphQlSseHandler(WebGraphQlHandler graphQlHandler, @Nullable Duration timeout, @Nullable Duration keepAliveDuration) {
        super(graphQlHandler, null);
        this.timeout = timeout;
        this.keepAliveDuration = keepAliveDuration;
    }

    /**
     * Turn the GraphQL response into an SSE {@link ServerResponse}.
     *
     * <p>If the response data is a {@link Publisher}, each published
     * {@link ExecutionResult} is converted with {@code toSpecification()} and
     * streamed. Otherwise a single result carrying an
     * {@code OperationNotSupported} error is streamed instead.
     * @param request the current request (not used by this implementation)
     * @param responseMono the pending GraphQL response
     * @return the SSE response
     */
    @Override
    protected ServerResponse prepareResponse(ServerRequest request, Mono<WebGraphQlResponse> responseMono) {
        Flux<Map<String, Object>> resultFlux = responseMono.flatMapMany(response -> {
            // Read once into an Object-typed local: this avoids repeated calls and keeps
            // overload resolution unambiguous when the value is logged below.
            Object data = response.getData();
            if (data instanceof Publisher) {
                @SuppressWarnings("unchecked")
                Publisher<ExecutionResult> publisher = (Publisher<ExecutionResult>) data;
                return Flux.from(publisher).map(ExecutionResult::toSpecification);
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("A subscription DataFetcher must return a Publisher: " + data);
            }
            return Flux.just(ExecutionResult.newExecutionResult()
                    .addError(GraphQLError.newError()
                            .errorType(graphql.ErrorType.OperationNotSupported)
                            .message("SSE handler supports only subscriptions")
                            .build())
                    .build()
                    .toSpecification());
        });

        Consumer<ServerResponse.SseBuilder> connector =
                SseSubscriber.connect(resultFlux, this.logger, this.keepAliveDuration);
        return (this.timeout != null)
                ? ServerResponse.sse(connector, this.timeout)
                : ServerResponse.sse(connector);
    }

    /**
     * Subscriber that writes every received element to a
     * {@link ServerResponse.SseBuilder}.
     */
    private static final class SseSubscriber extends BaseSubscriber<Map<String, Object>> {

        private final ServerResponse.SseBuilder sseBuilder;

        private final Log logger;

        private SseSubscriber(ServerResponse.SseBuilder sseBuilder, Log logger) {
            this.sseBuilder = sseBuilder;
            this.logger = logger;
            // Register the callback last so that 'this' is fully initialised when captured.
            this.sseBuilder.onTimeout(() -> cancelWithError(new AsyncRequestTimeoutException()));
        }

        /**
         * Write either a heartbeat comment (for the sentinel map) or a
         * {@code "next"} event carrying the given map.
         */
        @Override
        protected void hookOnNext(Map<String, Object> value) {
            if (value == HEARTBEAT_MAP) {
                sendHeartbeat();
            }
            else {
                sendNext(value);
            }
        }

        /**
         * Write a {@code "next"} event with the given payload.
         * @return {@code true} if the event was written; {@code false} if writing
         * failed, in which case the subscription has been cancelled and the failure
         * reported to the builder
         */
        private boolean sendNext(Map<String, Object> value) {
            try {
                this.sseBuilder.event("next");
                this.sseBuilder.data(value);
                return true;
            }
            catch (IOException exception) {
                cancelWithError(exception);
                return false;
            }
        }

        /** Write an SSE comment so that the connection does not stay silent. */
        private void sendHeartbeat() {
            try {
                this.sseBuilder.comment(" ");
                this.sseBuilder.send();
            }
            catch (IOException exception) {
                cancelWithError(exception);
            }
        }

        /** Cancel the upstream subscription and end the response with the given error. */
        private void cancelWithError(Throwable ex) {
            cancel();
            this.sseBuilder.error(ex);
        }

        /**
         * Send the error to the client as a final {@code "next"} event followed by
         * {@code "complete"}. A {@link SubscriptionPublisherException} supplies its
         * own map; any other error is logged and replaced by a generic
         * {@code INTERNAL_ERROR} entry.
         */
        @Override
        protected void hookOnError(Throwable ex) {
            Map<String, Object> errorMap;
            if (ex instanceof SubscriptionPublisherException) {
                errorMap = ((SubscriptionPublisherException) ex).toMap();
            }
            else {
                if (this.logger.isErrorEnabled()) {
                    this.logger.error("Unresolved " + ex.getClass().getSimpleName(), ex);
                }
                errorMap = GraphqlErrorBuilder.newError()
                        .message("Subscription error")
                        .errorType(ErrorType.INTERNAL_ERROR)
                        .build()
                        .toSpecification();
            }
            // If the error event could not be written the response has already been
            // ended with that I/O failure; do not attempt another write.
            if (sendNext(errorMap)) {
                sendComplete();
            }
        }

        /**
         * Write the terminal {@code "complete"} event and complete the response.
         * An I/O failure is reported through the builder, like in the other send
         * methods, instead of being thrown out of a subscriber hook.
         */
        private void sendComplete() {
            try {
                this.sseBuilder.event("complete").data("");
            }
            catch (IOException exc) {
                this.sseBuilder.error(exc);
                return;
            }
            this.sseBuilder.complete();
        }

        @Override
        protected void hookOnComplete() {
            sendComplete();
        }

        /**
         * Create the callback for {@link ServerResponse#sse} that subscribes a new
         * {@code SseSubscriber} to the results, adding heartbeats if requested.
         * @param resultFlux the result maps to stream
         * @param logger the logger used for unresolved errors
         * @param keepAliveDuration the heartbeat interval, or {@code null} for none
         */
        static Consumer<ServerResponse.SseBuilder> connect(Flux<Map<String, Object>> resultFlux, Log logger, @Nullable Duration keepAliveDuration) {
            return sseBuilder -> {
                SseSubscriber subscriber = new SseSubscriber(sseBuilder, logger);
                if (keepAliveDuration != null) {
                    KeepAliveHandler handler = new KeepAliveHandler(keepAliveDuration);
                    handler.compose(resultFlux).subscribe(subscriber);
                }
                else {
                    resultFlux.subscribe(subscriber);
                }
            };
        }
    }

    /**
     * Merges {@link #HEARTBEAT_MAP} elements into a result stream for every
     * keep-alive interval during which the source emitted nothing.
     */
    private static final class KeepAliveHandler {

        private final Duration keepAliveDuration;

        /**
         * Set when the source emits, cleared on every timer tick. The source and
         * the interval timer may run on different threads, hence an atomic flag
         * with an atomic read-and-clear.
         */
        private final AtomicBoolean eventSent = new AtomicBoolean();

        private final Sinks.Empty<Void> completionSink = Sinks.empty();

        KeepAliveHandler(Duration keepAliveDuration) {
            this.keepAliveDuration = keepAliveDuration;
        }

        /**
         * Decorate the given stream with heartbeat elements.
         * @param flux the source of result maps
         * @return the source merged with heartbeats, ending when the source completes
         */
        public Flux<Map<String, Object>> compose(Flux<Map<String, Object>> flux) {
            // The flag is recorded before merging so heartbeats themselves do not count
            // as events. Source completion is relayed through the sink to stop the
            // merged stream, which the interval would otherwise keep open.
            return flux.doOnNext(event -> this.eventSent.set(true))
                    .doOnComplete(() -> this.completionSink.tryEmitEmpty())
                    .mergeWith(getKeepAliveFlux())
                    .takeUntilOther(this.completionSink.asMono());
        }

        /** Emit the heartbeat sentinel on each tick that saw no source event. */
        private Flux<Map<String, Object>> getKeepAliveFlux() {
            return Flux.interval(this.keepAliveDuration, this.keepAliveDuration)
                    .filter(tick -> !checkEventSentAndClear())
                    .map(tick -> HEARTBEAT_MAP);
        }

        /** Atomically return whether an event was seen since the last call, and reset. */
        private boolean checkEventSentAndClear() {
            return this.eventSent.getAndSet(false);
        }
    }
}

