/*
 * Decompiled with CFR 0.152.
 */
package com.apollographql.subscription.callback;

import com.apollographql.subscription.callback.SubscriptionCallback;
import com.apollographql.subscription.exception.CallbackInitializationFailedException;
import com.apollographql.subscription.exception.InactiveSubscriptionException;
import com.apollographql.subscription.message.CallbackMessageCheck;
import com.apollographql.subscription.message.CallbackMessageComplete;
import com.apollographql.subscription.message.CallbackMessageNext;
import com.apollographql.subscription.message.SubscritionCallbackMessage;
import graphql.ExecutionResult;
import graphql.GraphQLError;
import graphql.GraphqlErrorBuilder;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.graphql.ExecutionGraphQlRequest;
import org.springframework.graphql.ExecutionGraphQlService;
import org.springframework.graphql.server.WebGraphQlRequest;
import org.springframework.http.HttpStatusCode;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SubscriptionCallbackHandler {
    private static final Log logger = LogFactory.getLog(SubscriptionCallbackHandler.class);
    public static final String SUBSCRIPTION_PROTOCOL_HEADER = "subscription-protocol";
    public static final String SUBSCRIPTION_PROTOCOL_HEADER_VALUE = "callback/1.0";
    private final ExecutionGraphQlService graphQlService;
    private final Scheduler scheduler;

    public SubscriptionCallbackHandler(ExecutionGraphQlService graphQlService) {
        this(graphQlService, Schedulers.boundedElastic());
    }

    public SubscriptionCallbackHandler(ExecutionGraphQlService graphQlService, Scheduler scheduler) {
        this.graphQlService = graphQlService;
        this.scheduler = scheduler;
    }

    @NotNull
    public Mono<ExecutionResult> handleSubscriptionUsingCallback(@NotNull WebGraphQlRequest graphQlRequest, @NotNull SubscriptionCallback callback) {
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Starting subscription callback: " + String.valueOf(callback)));
        }
        WebClient client = WebClient.builder().baseUrl(callback.callback_url()).build();
        CallbackMessageCheck checkMessage = new CallbackMessageCheck(callback.subscription_id(), callback.verifier());
        return ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)client.post().header("Content-Type", new String[]{"application/json"})).header(SUBSCRIPTION_PROTOCOL_HEADER, new String[]{SUBSCRIPTION_PROTOCOL_HEADER_VALUE})).headers(httpHeaders -> httpHeaders.putAll(callback.context()))).bodyValue((Object)checkMessage).exchangeToMono(checkResponse -> {
            HttpStatusCode responseStatusCode = checkResponse.statusCode();
            if (responseStatusCode.is2xxSuccessful()) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Subscription callback init successful: " + String.valueOf(callback)));
                }
                Flux<SubscritionCallbackMessage> subscription = this.startSubscription(client, graphQlRequest, callback);
                return Mono.just((Object)this.emptyResult()).publishOn(this.scheduler).doOnSubscribe(subscribed -> subscription.subscribe());
            }
            return Mono.error((Throwable)new CallbackInitializationFailedException(callback, responseStatusCode.value()));
        });
    }

    private ExecutionResult emptyResult() {
        return ExecutionResult.newExecutionResult().data(null).build();
    }

    @NotNull
    protected Flux<SubscritionCallbackMessage> startSubscription(@NotNull WebClient callbackClient, @NotNull WebGraphQlRequest graphQlRequest, @NotNull SubscriptionCallback callback) {
        Flux<SubscritionCallbackMessage> heartbeatFlux;
        if (callback.heartbeatIntervalMs() > 0) {
            CallbackMessageCheck checkMessage = new CallbackMessageCheck(callback.subscription_id(), callback.verifier());
            heartbeatFlux = this.heartbeatFlux(callbackClient, checkMessage, callback);
        } else {
            heartbeatFlux = Flux.empty();
        }
        Flux subscriptionFlux = this.graphQlService.execute((ExecutionGraphQlRequest)graphQlRequest).flatMapMany(subscriptionData -> {
            Flux responseFlux = subscriptionData.getData() instanceof Publisher ? Flux.from((Publisher)((Publisher)subscriptionData.getData())).map(ExecutionResult::toSpecification) : Flux.just((Object)subscriptionData.toMap());
            return responseFlux.map(data -> new CallbackMessageNext(callback.subscription_id(), callback.verifier(), (Map<String, Object>)data)).concatWith((Publisher)Mono.just((Object)new CallbackMessageComplete(callback.subscription_id(), callback.verifier()))).onErrorResume(e -> {
                GraphQLError error = GraphqlErrorBuilder.newError().message(e.getMessage(), new Object[0]).build();
                return Mono.just((Object)new CallbackMessageComplete(callback.subscription_id(), callback.verifier(), List.of(error)));
            });
        }).publishOn(this.scheduler).concatMap(message -> ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)callbackClient.post().header("Content-Type", new String[]{"application/json"})).header(SUBSCRIPTION_PROTOCOL_HEADER, new String[]{SUBSCRIPTION_PROTOCOL_HEADER_VALUE})).headers(httpHeaders -> httpHeaders.putAll(callback.context()))).bodyValue(message).exchangeToMono(routerResponse -> {
            if (routerResponse.statusCode().is2xxSuccessful()) {
                return Mono.just((Object)message);
            }
            return Mono.error((Throwable)new InactiveSubscriptionException(callback));
        })).doOnError(e -> {
            if (logger.isErrorEnabled()) {
                logger.error((Object)"Subscription terminated abnormally due to exception", e);
            }
        }).publish().refCount(2);
        return Flux.merge((Publisher[])new Publisher[]{subscriptionFlux, heartbeatFlux.takeUntilOther((Publisher)subscriptionFlux.ignoreElements())});
    }

    private Flux<SubscritionCallbackMessage> heartbeatFlux(WebClient client, CallbackMessageCheck check, SubscriptionCallback callback) {
        return Flux.just((Object)check).delayElements(Duration.ofMillis(callback.heartbeatIntervalMs())).publishOn(this.scheduler).concatMap(heartbeat -> ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)client.post().header("Content-Type", new String[]{"application/json"})).header(SUBSCRIPTION_PROTOCOL_HEADER, new String[]{SUBSCRIPTION_PROTOCOL_HEADER_VALUE})).headers(httpHeaders -> httpHeaders.putAll(callback.context()))).bodyValue(heartbeat).exchangeToFlux(heartBeatResponse -> {
            if (heartBeatResponse.statusCode().is2xxSuccessful()) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Subscription callback heartbeat successful: " + String.valueOf(callback)));
                }
                return this.heartbeatFlux(client, (CallbackMessageCheck)heartbeat, callback);
            }
            if (logger.isWarnEnabled()) {
                logger.warn((Object)("Subscription callback heartbeat failed: " + String.valueOf(callback)));
            }
            return Flux.error((Throwable)new InactiveSubscriptionException(callback));
        }));
    }
}

