/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.graphql.client;

import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.graphql.GraphQlRequest;
import org.springframework.graphql.GraphQlResponse;
import org.springframework.graphql.ResponseError;
import org.springframework.graphql.client.CodecDelegate;
import org.springframework.graphql.client.GraphQlTransport;
import org.springframework.graphql.client.ResponseMapGraphQlResponse;
import org.springframework.graphql.client.SubscriptionErrorException;
import org.springframework.graphql.client.WebSocketDisconnectedException;
import org.springframework.graphql.client.WebSocketGraphQlClientInterceptor;
import org.springframework.graphql.server.support.GraphQlWebSocketMessage;
import org.springframework.graphql.server.support.GraphQlWebSocketMessageType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.CodecConfigurer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

final class WebSocketGraphQlTransport
implements GraphQlTransport {
    private static final Log logger = LogFactory.getLog(WebSocketGraphQlTransport.class);
    private final URI url;
    private final HttpHeaders headers = new HttpHeaders();
    private final WebSocketClient webSocketClient;
    private final GraphQlSessionHandler graphQlSessionHandler;
    private final Mono<GraphQlSession> graphQlSessionMono;

    WebSocketGraphQlTransport(URI url, @Nullable HttpHeaders headers, WebSocketClient client, CodecConfigurer codecConfigurer, WebSocketGraphQlClientInterceptor interceptor) {
        Assert.notNull((Object)url, (String)"URI is required");
        Assert.notNull((Object)client, (String)"WebSocketClient is required");
        Assert.notNull((Object)codecConfigurer, (String)"CodecConfigurer is required");
        Assert.notNull((Object)interceptor, (String)"WebSocketGraphQlClientInterceptor is required");
        this.url = url;
        this.headers.putAll((Map)(headers != null ? headers : HttpHeaders.EMPTY));
        this.webSocketClient = client;
        this.graphQlSessionHandler = new GraphQlSessionHandler(codecConfigurer, interceptor);
        this.graphQlSessionMono = WebSocketGraphQlTransport.initGraphQlSession(this.url, this.headers, client, this.graphQlSessionHandler).cacheInvalidateWhen(GraphQlSession::notifyWhenClosed);
    }

    private static Mono<GraphQlSession> initGraphQlSession(URI uri, HttpHeaders headers, WebSocketClient client, GraphQlSessionHandler handler) {
        return Mono.defer(() -> {
            if (handler.isStopped()) {
                return Mono.error((Throwable)new IllegalStateException("WebSocketGraphQlTransport has been stopped"));
            }
            client.execute(uri, headers, (WebSocketHandler)handler).subscribe(aVoid -> {}, handler::handleWebSocketSessionError, () -> {});
            return handler.getGraphQlSession();
        });
    }

    public URI getUrl() {
        return this.url;
    }

    public HttpHeaders getHeaders() {
        return this.headers;
    }

    public WebSocketClient getWebSocketClient() {
        return this.webSocketClient;
    }

    public CodecConfigurer getCodecConfigurer() {
        return this.graphQlSessionHandler.getCodecConfigurer();
    }

    public Mono<Void> start() {
        this.graphQlSessionHandler.setStopped(false);
        return this.graphQlSessionMono.then();
    }

    public Mono<Void> stop() {
        this.graphQlSessionHandler.setStopped(true);
        return this.graphQlSessionMono.flatMap(GraphQlSession::close).onErrorResume(ex -> Mono.empty());
    }

    @Override
    public Mono<GraphQlResponse> execute(GraphQlRequest request) {
        return this.graphQlSessionMono.flatMap(session -> session.execute(request));
    }

    @Override
    public Flux<GraphQlResponse> executeSubscription(GraphQlRequest request) {
        return this.graphQlSessionMono.flatMapMany(session -> session.executeSubscription(request));
    }

    private static class SubscriptionState
    extends AbstractRequestState {
        private final Sinks.Many<GraphQlResponse> sink = Sinks.many().unicast().onBackpressureBuffer();

        SubscriptionState(GraphQlRequest request) {
            super(request);
        }

        public Sinks.Many<GraphQlResponse> sink() {
            return this.sink;
        }

        @Override
        protected void emitDisconnectError(WebSocketDisconnectedException ex) {
            this.sink.tryEmitError((Throwable)((Object)ex));
        }
    }

    private static class ResponseState
    extends AbstractRequestState {
        private final Sinks.One<GraphQlResponse> sink = Sinks.one();

        ResponseState(GraphQlRequest request) {
            super(request);
        }

        public Sinks.One<GraphQlResponse> sink() {
            return this.sink;
        }

        @Override
        protected void emitDisconnectError(WebSocketDisconnectedException ex) {
            this.sink.tryEmitError((Throwable)((Object)ex));
        }
    }

    private static abstract class AbstractRequestState {
        private final GraphQlRequest request;

        public AbstractRequestState(GraphQlRequest request) {
            this.request = request;
        }

        public GraphQlRequest request() {
            return this.request;
        }

        public void emitDisconnectError(String message, CloseStatus closeStatus) {
            this.emitDisconnectError(new WebSocketDisconnectedException(message, this.request, closeStatus));
        }

        protected abstract void emitDisconnectError(WebSocketDisconnectedException var1);
    }

    private static interface DisposableConnection {
        public Mono<Void> close(CloseStatus var1);

        public Mono<Void> notifyWhenClosed();

        public String getDescription();

        public static DisposableConnection from(final WebSocketSession session) {
            return new DisposableConnection(){

                @Override
                public Mono<Void> close(CloseStatus status) {
                    return session.close(status);
                }

                @Override
                public Mono<Void> notifyWhenClosed() {
                    return session.closeStatus().then();
                }

                @Override
                public String getDescription() {
                    return session.toString();
                }
            };
        }
    }

    private static class GraphQlSession {
        private final DisposableConnection connection;
        private final AtomicLong requestIndex = new AtomicLong();
        private final Sinks.Many<GraphQlWebSocketMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
        private final Map<String, ResponseState> responseMap = new ConcurrentHashMap<String, ResponseState>();
        private final Map<String, SubscriptionState> subscriptionMap = new ConcurrentHashMap<String, SubscriptionState>();

        GraphQlSession(WebSocketSession webSocketSession) {
            this.connection = DisposableConnection.from(webSocketSession);
        }

        public Flux<GraphQlWebSocketMessage> getRequestFlux() {
            return this.requestSink.asFlux();
        }

        public Mono<GraphQlResponse> execute(GraphQlRequest request) {
            String id = String.valueOf(this.requestIndex.incrementAndGet());
            try {
                GraphQlWebSocketMessage message = GraphQlWebSocketMessage.subscribe(id, request);
                ResponseState state = new ResponseState(request);
                this.responseMap.put(id, state);
                this.trySend(message);
                return state.sink().asMono().doOnCancel(() -> this.responseMap.remove(id));
            }
            catch (Exception ex) {
                this.responseMap.remove(id);
                return Mono.error((Throwable)ex);
            }
        }

        public Flux<GraphQlResponse> executeSubscription(GraphQlRequest request) {
            String id = String.valueOf(this.requestIndex.incrementAndGet());
            try {
                GraphQlWebSocketMessage message = GraphQlWebSocketMessage.subscribe(id, request);
                SubscriptionState state = new SubscriptionState(request);
                this.subscriptionMap.put(id, state);
                this.trySend(message);
                return state.sink().asFlux().doOnCancel(() -> this.stopSubscription(id));
            }
            catch (Exception ex) {
                this.subscriptionMap.remove(id);
                return Flux.error((Throwable)ex);
            }
        }

        public void sendPong(@Nullable Map<String, Object> payload) {
            GraphQlWebSocketMessage message = GraphQlWebSocketMessage.pong(payload);
            this.trySend(message);
        }

        private void trySend(GraphQlWebSocketMessage message) {
            Sinks.EmitResult emitResult = null;
            for (int i = 0; i < 100 && (emitResult = this.requestSink.tryEmitNext((Object)message)) == Sinks.EmitResult.FAIL_NON_SERIALIZED; ++i) {
            }
            Assert.state((boolean)emitResult.isSuccess(), (String)("Failed to send request: " + emitResult));
        }

        private void stopSubscription(String id) {
            SubscriptionState state = this.subscriptionMap.remove(id);
            if (state != null) {
                try {
                    this.trySend(GraphQlWebSocketMessage.complete(id));
                }
                catch (Exception ex) {
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)("Closing " + this.connection.getDescription() + " after failure to send 'complete' for subscription id='" + id + "'."));
                    }
                    this.connection.close(CloseStatus.PROTOCOL_ERROR).subscribe();
                }
            }
        }

        public void handleNext(GraphQlWebSocketMessage message) {
            Sinks.EmitResult emitResult;
            String id = message.getId();
            ResponseState responseState = this.responseMap.remove(id);
            SubscriptionState subscriptionState = this.subscriptionMap.get(id);
            if (responseState == null && subscriptionState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("No receiver for message: " + message));
                }
                return;
            }
            Map responseMap = (Map)message.getPayload();
            ResponseMapGraphQlResponse graphQlResponse = new ResponseMapGraphQlResponse(responseMap);
            Sinks.EmitResult emitResult2 = emitResult = responseState != null ? responseState.sink().tryEmitValue((Object)graphQlResponse) : subscriptionState.sink().tryEmitNext((Object)graphQlResponse);
            if (emitResult.isFailure() && logger.isDebugEnabled()) {
                logger.debug((Object)("Message: " + message + " could not be emitted: " + emitResult));
            }
        }

        public void handleError(GraphQlWebSocketMessage message) {
            Sinks.EmitResult emitResult;
            String id = message.getId();
            ResponseState responseState = this.responseMap.remove(id);
            SubscriptionState subscriptionState = this.subscriptionMap.remove(id);
            if (responseState == null && subscriptionState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("No receiver for message: " + message));
                }
                return;
            }
            List errorList = (List)message.getPayload();
            ResponseMapGraphQlResponse response = new ResponseMapGraphQlResponse(Collections.singletonMap("errors", errorList));
            if (responseState != null) {
                emitResult = responseState.sink().tryEmitValue((Object)response);
            } else {
                List<ResponseError> errors = response.getErrors();
                SubscriptionErrorException ex = new SubscriptionErrorException(subscriptionState.request(), errors);
                emitResult = subscriptionState.sink().tryEmitError((Throwable)((Object)ex));
            }
            if (emitResult.isFailure() && logger.isDebugEnabled()) {
                logger.debug((Object)("Error: " + message + " could not be emitted: " + emitResult));
            }
        }

        public void handleComplete(GraphQlWebSocketMessage message) {
            ResponseState responseState = this.responseMap.remove(message.getId());
            SubscriptionState subscriptionState = this.subscriptionMap.remove(message.getId());
            if (responseState != null) {
                responseState.sink().tryEmitEmpty();
            } else if (subscriptionState != null) {
                subscriptionState.sink().tryEmitComplete();
            }
        }

        public Mono<Void> notifyWhenClosed() {
            return this.connection.notifyWhenClosed();
        }

        public Mono<Void> close() {
            return this.connection.close(CloseStatus.GOING_AWAY);
        }

        public void terminateRequests(String message, CloseStatus status) {
            this.responseMap.values().forEach(info -> info.emitDisconnectError(message, status));
            this.subscriptionMap.values().forEach(info -> info.emitDisconnectError(message, status));
            this.responseMap.clear();
            this.subscriptionMap.clear();
        }

        public String toString() {
            return "GraphQlSession over " + this.connection.getDescription();
        }
    }

    private static class GraphQlSessionHandler
    implements WebSocketHandler {
        private final CodecDelegate codecDelegate;
        private final WebSocketGraphQlClientInterceptor interceptor;
        private Sinks.One<GraphQlSession> graphQlSessionSink;
        private final AtomicBoolean stopped = new AtomicBoolean();

        GraphQlSessionHandler(CodecConfigurer codecConfigurer, WebSocketGraphQlClientInterceptor interceptor) {
            this.codecDelegate = new CodecDelegate(codecConfigurer);
            this.interceptor = interceptor;
            this.graphQlSessionSink = Sinks.unsafe().one();
        }

        public CodecConfigurer getCodecConfigurer() {
            return this.codecDelegate.getCodecConfigurer();
        }

        public List<String> getSubProtocols() {
            return Collections.singletonList("graphql-transport-ws");
        }

        public Mono<GraphQlSession> getGraphQlSession() {
            return this.graphQlSessionSink.asMono();
        }

        public void setStopped(boolean stopped) {
            this.stopped.set(stopped);
        }

        public boolean isStopped() {
            return this.stopped.get();
        }

        public Mono<Void> handle(WebSocketSession session) {
            Assert.state((boolean)this.sessionNotInitialized(), (String)"This handler supports only one session at a time, for shared use.");
            GraphQlSession graphQlSession = new GraphQlSession(session);
            this.registerCloseStatusHandling(graphQlSession, session);
            Mono connectionInitMono = this.interceptor.connectionInitPayload().defaultIfEmpty(Collections.emptyMap()).map(GraphQlWebSocketMessage::connectionInit);
            Mono sendCompletion = session.send((Publisher)connectionInitMono.concatWith(graphQlSession.getRequestFlux()).map(message -> this.codecDelegate.encode(session, (GraphQlWebSocketMessage)message)));
            Mono receiveCompletion = session.receive().flatMap(webSocketMessage -> {
                if (this.sessionNotInitialized()) {
                    try {
                        GraphQlWebSocketMessage message = this.codecDelegate.decode((WebSocketMessage)webSocketMessage);
                        Assert.state((message.resolvedType() == GraphQlWebSocketMessageType.CONNECTION_ACK ? 1 : 0) != 0, () -> "Unexpected message before connection_ack: " + message);
                        return this.interceptor.handleConnectionAck((Map)message.getPayload()).then(Mono.defer(() -> {
                            Sinks.EmitResult result;
                            if (logger.isDebugEnabled()) {
                                logger.debug((Object)(graphQlSession + " initialized"));
                            }
                            if ((result = this.graphQlSessionSink.tryEmitValue((Object)graphQlSession)).isFailure()) {
                                return Mono.error((Throwable)new IllegalStateException("GraphQlSession initialized but could not be emitted: " + result));
                            }
                            return Mono.empty();
                        }));
                    }
                    catch (Throwable ex) {
                        this.graphQlSessionSink.tryEmitError(ex);
                        return Mono.error((Throwable)ex);
                    }
                }
                try {
                    GraphQlWebSocketMessage message = this.codecDelegate.decode((WebSocketMessage)webSocketMessage);
                    switch (message.resolvedType()) {
                        case NEXT: {
                            graphQlSession.handleNext(message);
                            break;
                        }
                        case PING: {
                            graphQlSession.sendPong(null);
                            break;
                        }
                        case ERROR: {
                            graphQlSession.handleError(message);
                            break;
                        }
                        case COMPLETE: {
                            graphQlSession.handleComplete(message);
                            break;
                        }
                        default: {
                            throw new IllegalStateException("Unexpected message type: '" + message.getType() + "'");
                        }
                    }
                }
                catch (Exception ex) {
                    if (logger.isErrorEnabled()) {
                        logger.error((Object)("Closing " + session + ": " + ex));
                    }
                    return session.close(new CloseStatus(4400, "Invalid message"));
                }
                return Mono.empty();
            }).then();
            return Mono.zip((Mono)sendCompletion, (Mono)receiveCompletion).then();
        }

        private boolean sessionNotInitialized() {
            return !Boolean.TRUE.equals(this.graphQlSessionSink.scan(Scannable.Attr.TERMINATED));
        }

        private void registerCloseStatusHandling(GraphQlSession graphQlSession, WebSocketSession session) {
            session.closeStatus().defaultIfEmpty((Object)CloseStatus.NO_STATUS_CODE).doOnNext(closeStatus -> {
                String closeStatusMessage = this.initCloseStatusMessage((CloseStatus)closeStatus, null, graphQlSession);
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)closeStatusMessage);
                }
                graphQlSession.terminateRequests(closeStatusMessage, (CloseStatus)closeStatus);
            }).doOnError(cause -> {
                CloseStatus closeStatus = CloseStatus.NO_STATUS_CODE;
                String closeStatusMessage = this.initCloseStatusMessage(closeStatus, (Throwable)cause, graphQlSession);
                if (logger.isErrorEnabled()) {
                    logger.error((Object)closeStatusMessage);
                }
                graphQlSession.terminateRequests(closeStatusMessage, closeStatus);
            }).doOnTerminate(() -> {
                this.graphQlSessionSink = Sinks.unsafe().one();
            }).subscribe();
        }

        private String initCloseStatusMessage(CloseStatus status, @Nullable Throwable ex, GraphQlSession session) {
            String reason = session + " disconnected";
            reason = this.isStopped() ? session + " was stopped" : (ex != null ? reason + ", closeStatus() completed with error " + ex : (!status.equals((Object)CloseStatus.NO_STATUS_CODE) ? reason + " with " + status : reason + " without a status"));
            return reason;
        }

        public void handleWebSocketSessionError(Throwable ex) {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Session handling error: " + ex.getMessage()), ex);
            } else if (logger.isErrorEnabled()) {
                logger.error((Object)("Session handling error: " + ex.getMessage()));
            }
            this.graphQlSessionSink.tryEmitError(ex);
        }
    }
}

