/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.webpubsub.client;

import com.azure.core.http.policy.RetryStrategy;
import com.azure.core.util.BinaryData;
import com.azure.core.util.UrlBuilder;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.webpubsub.client.implementation.LoggingUtils;
import com.azure.messaging.webpubsub.client.implementation.WebPubSubClientState;
import com.azure.messaging.webpubsub.client.implementation.WebPubSubConnection;
import com.azure.messaging.webpubsub.client.implementation.WebPubSubGroup;
import com.azure.messaging.webpubsub.client.implementation.models.AckMessage;
import com.azure.messaging.webpubsub.client.implementation.models.ConnectedMessage;
import com.azure.messaging.webpubsub.client.implementation.models.DisconnectedMessage;
import com.azure.messaging.webpubsub.client.implementation.models.GroupDataMessage;
import com.azure.messaging.webpubsub.client.implementation.models.JoinGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.LeaveGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SendEventMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SendToGroupMessage;
import com.azure.messaging.webpubsub.client.implementation.models.SequenceAckMessage;
import com.azure.messaging.webpubsub.client.implementation.models.ServerDataMessage;
import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessage;
import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessageAck;
import com.azure.messaging.webpubsub.client.implementation.websocket.ClientEndpointConfiguration;
import com.azure.messaging.webpubsub.client.implementation.websocket.CloseReason;
import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketClient;
import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketClientNettyImpl;
import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketSession;
import com.azure.messaging.webpubsub.client.models.AckResponseError;
import com.azure.messaging.webpubsub.client.models.ConnectedEvent;
import com.azure.messaging.webpubsub.client.models.DisconnectedEvent;
import com.azure.messaging.webpubsub.client.models.GroupMessageEvent;
import com.azure.messaging.webpubsub.client.models.RejoinGroupFailedEvent;
import com.azure.messaging.webpubsub.client.models.SendEventOptions;
import com.azure.messaging.webpubsub.client.models.SendMessageFailedException;
import com.azure.messaging.webpubsub.client.models.SendToGroupOptions;
import com.azure.messaging.webpubsub.client.models.ServerMessageEvent;
import com.azure.messaging.webpubsub.client.models.StoppedEvent;
import com.azure.messaging.webpubsub.client.models.WebPubSubDataFormat;
import com.azure.messaging.webpubsub.client.models.WebPubSubProtocolType;
import com.azure.messaging.webpubsub.client.models.WebPubSubResult;
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.retry.Retry;

final class WebPubSubAsyncClient
implements Closeable {
    private ClientLogger logger;
    private final AtomicReference<ClientLogger> loggerReference = new AtomicReference();
    private final Mono<String> clientAccessUrlProvider;
    private final WebPubSubProtocolType webPubSubProtocol;
    private final boolean autoReconnect;
    private final boolean autoRestoreGroup;
    private final String applicationId;
    private final ClientEndpointConfiguration clientEndpointConfiguration;
    private final WebSocketClient webSocketClient;
    private WebSocketSession webSocketSession;
    private Sinks.Many<GroupMessageEvent> groupMessageEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<ServerMessageEvent> serverMessageEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<AckMessage> ackMessageSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<ConnectedEvent> connectedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<DisconnectedEvent> disconnectedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<StoppedEvent> stoppedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Sinks.Many<RejoinGroupFailedEvent> rejoinGroupFailedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private final AtomicLong ackId = new AtomicLong(0L);
    private WebPubSubConnection webPubSubConnection;
    private final AtomicReference<Disposable> sequenceAckTask = new AtomicReference();
    private final ClientState clientState = new ClientState();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    private final AtomicBoolean isStoppedByUser = new AtomicBoolean();
    private final AtomicReference<Sinks.Empty<Void>> isStoppedByUserSink = new AtomicReference();
    private final ConcurrentMap<String, WebPubSubGroup> groups = new ConcurrentHashMap<String, WebPubSubGroup>();
    private final Retry sendMessageRetrySpec;
    private static final Duration ACK_TIMEOUT = Duration.ofSeconds(30L);
    private static final Duration RECOVER_TIMEOUT = Duration.ofSeconds(30L);
    private static final Retry RECONNECT_RETRY_SPEC = Retry.backoff((long)Long.MAX_VALUE, (Duration)Duration.ofSeconds(1L)).filter(thr -> !(thr instanceof StopReconnectException));
    private static final Duration CLOSE_AFTER_SESSION_OPEN_DELAY = Duration.ofMillis(100L);
    private static final Duration SEQUENCE_ACK_DELAY = Duration.ofSeconds(5L);

    WebPubSubAsyncClient(WebSocketClient webSocketClient, Supplier<String> clientAccessUrlSupplier, WebPubSubProtocolType webPubSubProtocol, String applicationId, String userAgent, RetryStrategy retryStrategy, boolean autoReconnect, boolean autoRestoreGroup) {
        this.updateLogger(applicationId, null);
        this.applicationId = applicationId;
        Objects.requireNonNull(clientAccessUrlSupplier);
        this.clientAccessUrlProvider = Mono.fromSupplier(clientAccessUrlSupplier).subscribeOn(Schedulers.boundedElastic());
        this.webPubSubProtocol = Objects.requireNonNull(webPubSubProtocol);
        this.autoReconnect = autoReconnect;
        this.autoRestoreGroup = autoRestoreGroup;
        this.clientEndpointConfiguration = new ClientEndpointConfiguration(webPubSubProtocol.toString(), userAgent);
        this.webSocketClient = webSocketClient == null ? new WebSocketClientNettyImpl() : webSocketClient;
        this.sendMessageRetrySpec = Retry.from(signals -> {
            AtomicInteger retryCount = new AtomicInteger(0);
            return signals.concatMap(s -> {
                int retryAttempt;
                Mono ret = Mono.error((Throwable)s.failure());
                if (s.failure() instanceof SendMessageFailedException && ((SendMessageFailedException)((Object)((Object)((Object)s.failure())))).isTransient() && (retryAttempt = retryCount.incrementAndGet()) <= retryStrategy.getMaxRetries()) {
                    ret = Mono.delay((Duration)retryStrategy.calculateRetryDelay(retryAttempt)).then(Mono.just((Object)s));
                }
                return ret;
            });
        });
    }

    public String getConnectionId() {
        return this.webPubSubConnection == null ? null : this.webPubSubConnection.getConnectionId();
    }

    public Mono<Void> start() {
        return this.start(null);
    }

    Mono<Void> start(Runnable postStartTask) {
        if (this.clientState.get() == WebPubSubClientState.CLOSED) {
            return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Failed to start. Client is CLOSED.")));
        }
        return Mono.defer(() -> {
            this.logger.atInfo().addKeyValue("currentClientState", (Object)this.clientState.get()).log("Start client called.");
            this.isStoppedByUser.set(false);
            this.isStoppedByUserSink.set(null);
            boolean success = this.clientState.changeStateOn(WebPubSubClientState.STOPPED, WebPubSubClientState.CONNECTING);
            if (!success) {
                return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Failed to start. Client is not STOPPED.")));
            }
            if (postStartTask != null) {
                postStartTask.run();
            }
            return Mono.empty();
        }).then(this.clientAccessUrlProvider.flatMap(url -> Mono.fromRunnable(() -> {
            this.webSocketSession = this.webSocketClient.connectToServer(this.clientEndpointConfiguration, (String)url, this.loggerReference, this::handleMessage, this::handleSessionOpen, this::handleSessionClose);
        }).subscribeOn(Schedulers.boundedElastic()))).doOnError(error -> this.handleClientStop(false));
    }

    public Mono<Void> stop() {
        if (this.clientState.get() == WebPubSubClientState.CLOSED) {
            return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Failed to stop. Client is CLOSED.")));
        }
        return Mono.defer(() -> {
            this.logger.atInfo().addKeyValue("currentClientState", (Object)this.clientState.get()).log("Stop client called.");
            if (this.clientState.get() == WebPubSubClientState.STOPPED) {
                return Mono.empty();
            }
            if (this.clientState.get() == WebPubSubClientState.STOPPING) {
                return this.getStoppedByUserMono();
            }
            this.isStoppedByUser.compareAndSet(false, true);
            this.groups.clear();
            WebSocketSession localSession = this.webSocketSession;
            if (localSession != null && localSession.isOpen()) {
                this.clientState.changeState(WebPubSubClientState.STOPPING);
                return Mono.fromCallable(() -> {
                    localSession.close();
                    return null;
                }).subscribeOn(Schedulers.boundedElastic());
            }
            if (this.clientState.changeStateOn(WebPubSubClientState.DISCONNECTED, WebPubSubClientState.STOPPED)) {
                this.handleClientStop();
                return Mono.empty();
            }
            return this.getStoppedByUserMono();
        });
    }

    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            this.isClosedMono.asMono().block();
        } else {
            this.stop().then(Mono.fromRunnable(() -> {
                this.clientState.changeState(WebPubSubClientState.CLOSED);
                this.isClosedMono.emitEmpty(this.emitFailureHandler("Unable to emit Close"));
            })).block();
        }
    }

    public Mono<WebPubSubResult> joinGroup(String group) {
        return this.joinGroup(group, this.nextAckId());
    }

    public Mono<WebPubSubResult> joinGroup(String group, Long ackId) {
        Objects.requireNonNull(group);
        if (ackId == null) {
            ackId = this.nextAckId();
        }
        return this.sendMessage(new JoinGroupMessage().setGroup(group).setAckId(ackId)).then(this.waitForAckMessage(ackId)).retryWhen(this.sendMessageRetrySpec).map(result -> {
            this.groups.compute(group, (k, v) -> v == null ? new WebPubSubGroup(group).setJoined(true) : v.setJoined(true));
            return result;
        });
    }

    public Mono<WebPubSubResult> leaveGroup(String group) {
        return this.leaveGroup(group, this.nextAckId());
    }

    public Mono<WebPubSubResult> leaveGroup(String group, Long ackId) {
        Objects.requireNonNull(group);
        if (ackId == null) {
            ackId = this.nextAckId();
        }
        return this.sendMessage(new LeaveGroupMessage().setGroup(group).setAckId(ackId)).then(this.waitForAckMessage(ackId)).retryWhen(this.sendMessageRetrySpec).map(result -> {
            this.groups.compute(group, (k, v) -> v == null ? new WebPubSubGroup(group).setJoined(false) : v.setJoined(false));
            return result;
        });
    }

    public Mono<WebPubSubResult> sendToGroup(String group, String content) {
        return this.sendToGroup(group, BinaryData.fromString((String)content), WebPubSubDataFormat.TEXT);
    }

    public Mono<WebPubSubResult> sendToGroup(String group, String content, SendToGroupOptions options) {
        return this.sendToGroup(group, BinaryData.fromString((String)content), WebPubSubDataFormat.TEXT, options);
    }

    public Mono<WebPubSubResult> sendToGroup(String group, BinaryData content, WebPubSubDataFormat dataFormat) {
        return this.sendToGroup(group, content, dataFormat, new SendToGroupOptions().setAckId(this.nextAckId()));
    }

    public Mono<WebPubSubResult> sendToGroup(String group, BinaryData content, WebPubSubDataFormat dataFormat, SendToGroupOptions options) {
        Objects.requireNonNull(group);
        Objects.requireNonNull(content);
        Objects.requireNonNull(dataFormat);
        Objects.requireNonNull(options);
        Long ackId = options.isFireAndForget() ? null : Long.valueOf(options.getAckId() != null ? options.getAckId().longValue() : this.nextAckId());
        SendToGroupMessage message = new SendToGroupMessage().setGroup(group).setData(content).setDataType(dataFormat.toString()).setAckId(ackId).setNoEcho(options.isEchoDisabled());
        Mono<Void> sendMessageMono = this.sendMessage(message);
        Mono responseMono = sendMessageMono.then(this.waitForAckMessage(ackId));
        return responseMono.retryWhen(this.sendMessageRetrySpec);
    }

    public Mono<WebPubSubResult> sendEvent(String eventName, BinaryData content, WebPubSubDataFormat dataFormat) {
        return this.sendEvent(eventName, content, dataFormat, new SendEventOptions().setAckId(this.nextAckId()));
    }

    public Mono<WebPubSubResult> sendEvent(String eventName, BinaryData content, WebPubSubDataFormat dataFormat, SendEventOptions options) {
        Objects.requireNonNull(eventName);
        Objects.requireNonNull(content);
        Objects.requireNonNull(dataFormat);
        Objects.requireNonNull(options);
        Long ackId = options.isFireAndForget() ? null : Long.valueOf(options.getAckId() != null ? options.getAckId().longValue() : this.nextAckId());
        SendEventMessage message = new SendEventMessage().setEvent(eventName).setData(content).setDataType(dataFormat.toString()).setAckId(ackId);
        Mono<Void> sendMessageMono = this.sendMessage(message);
        Mono responseMono = sendMessageMono.then(this.waitForAckMessage(ackId));
        return responseMono.retryWhen(this.sendMessageRetrySpec);
    }

    public Flux<GroupMessageEvent> receiveGroupMessageEvents() {
        return this.groupMessageEventSink.asFlux();
    }

    public Flux<ServerMessageEvent> receiveServerMessageEvents() {
        return this.serverMessageEventSink.asFlux();
    }

    public Flux<ConnectedEvent> receiveConnectedEvents() {
        return this.connectedEventSink.asFlux();
    }

    public Flux<DisconnectedEvent> receiveDisconnectedEvents() {
        return this.disconnectedEventSink.asFlux();
    }

    public Flux<StoppedEvent> receiveStoppedEvents() {
        return this.stoppedEventSink.asFlux();
    }

    public Flux<RejoinGroupFailedEvent> receiveRejoinGroupFailedEvents() {
        return this.rejoinGroupFailedEventSink.asFlux();
    }

    private long nextAckId() {
        return this.ackId.getAndUpdate(value -> {
            if (++value < 0L) {
                value = 0L;
            }
            return value;
        });
    }

    private Flux<AckMessage> receiveAckMessages() {
        return this.ackMessageSink.asFlux();
    }

    private Mono<Void> sendMessage(WebPubSubMessage message) {
        return this.checkStateBeforeSend().then(Mono.create(sink -> this.webSocketSession.sendObjectAsync(message, sendResult -> {
            if (sendResult.isOK()) {
                sink.success();
            } else {
                sink.error((Throwable)this.logSendMessageFailedException("Failed to send message.", sendResult.getException(), true, message));
            }
        })));
    }

    private Mono<Void> checkStateBeforeSend() {
        return Mono.defer(() -> {
            WebPubSubClientState state = this.clientState.get();
            if (state == WebPubSubClientState.CLOSED) {
                return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Failed to send message. WebPubSubClient is CLOSED.")));
            }
            if (state != WebPubSubClientState.CONNECTED) {
                return Mono.error((Throwable)this.logSendMessageFailedException("Failed to send message. Client is " + state.name() + ".", null, state == WebPubSubClientState.RECOVERING || state == WebPubSubClientState.CONNECTING || state == WebPubSubClientState.RECONNECTING || state == WebPubSubClientState.DISCONNECTED, (Long)null));
            }
            if (this.webSocketSession == null || !this.webSocketSession.isOpen()) {
                return Mono.error((Throwable)this.logSendMessageFailedException("Failed to send message. Websocket session is not opened.", null, false, (Long)null));
            }
            return Mono.empty();
        });
    }

    private Mono<Void> getStoppedByUserMono() {
        Sinks.Empty<Void> sink = Sinks.empty();
        boolean isStoppedByUserMonoSet = this.isStoppedByUserSink.compareAndSet(null, sink);
        if (!isStoppedByUserMonoSet) {
            sink = this.isStoppedByUserSink.get();
        }
        return sink == null ? Mono.empty() : sink.asMono();
    }

    private void tryCompleteOnStoppedByUserSink() {
        Sinks.Empty mono = this.isStoppedByUserSink.getAndSet(null);
        if (mono != null) {
            mono.emitEmpty(this.emitFailureHandler("Unable to emit Stopped"));
        }
    }

    private <EventT> void tryEmitNext(Sinks.Many<EventT> sink, EventT event) {
        this.logger.atVerbose().addKeyValue("type", event.getClass().getSimpleName()).log("Send event");
        sink.emitNext(event, this.emitFailureHandler("Unable to emit " + event.getClass().getSimpleName()));
    }

    private Mono<WebPubSubResult> waitForAckMessage(Long ackId) {
        if (ackId == null) {
            return Mono.just((Object)new WebPubSubResult(null, false));
        }
        return this.receiveAckMessages().filter(m -> ackId.longValue() == m.getAckId()).next().onErrorMap(throwable -> this.logSendMessageFailedException("Acknowledge from the service not received.", (Throwable)throwable, true, ackId)).flatMap(m -> {
            if (m.isSuccess()) {
                return Mono.just((Object)new WebPubSubResult(m.getAckId(), false));
            }
            if (m.getError() != null && "Duplicate".equals(m.getError().getName())) {
                return Mono.just((Object)new WebPubSubResult(m.getAckId(), true));
            }
            return Mono.error((Throwable)this.logSendMessageFailedException("Received non-success acknowledge from the service.", null, false, ackId, m.getError()));
        }).timeout(ACK_TIMEOUT, Mono.empty()).switchIfEmpty(Mono.defer(() -> Mono.error((Throwable)this.logSendMessageFailedException("Acknowledge from the service not received.", null, true, ackId))));
    }

    private void handleSessionOpen(WebSocketSession session) {
        this.logger.atVerbose().log("Session opened");
        this.clientState.changeState(WebPubSubClientState.CONNECTED);
        if (this.isStoppedByUser.compareAndSet(true, false)) {
            Mono.delay((Duration)CLOSE_AFTER_SESSION_OPEN_DELAY).then(Mono.fromCallable(() -> {
                this.clientState.changeState(WebPubSubClientState.STOPPING);
                if (session != null && session.isOpen()) {
                    session.close();
                } else {
                    this.logger.atError().log("Failed to close session after session open");
                    this.handleClientStop();
                }
                return null;
            }).subscribeOn(Schedulers.boundedElastic())).subscribe(null, thr -> {
                this.logger.atError().log("Failed to close session after session open: " + thr.getMessage());
                this.handleClientStop();
            });
        } else {
            Flux sequenceAckFlux;
            Disposable previousTask;
            if (WebPubSubAsyncClient.isReliableProtocol(this.webPubSubProtocol) && (previousTask = this.sequenceAckTask.getAndSet((sequenceAckFlux = Flux.interval((Duration)SEQUENCE_ACK_DELAY).concatMap(ignored -> {
                if (this.clientState.get() == WebPubSubClientState.CONNECTED && session != null && session.isOpen()) {
                    WebPubSubConnection connection = this.webPubSubConnection;
                    if (connection != null) {
                        Long id = connection.getSequenceAckId().getUpdated();
                        if (id != null) {
                            return this.sendMessage(new SequenceAckMessage().setSequenceId(id)).onErrorResume(error -> {
                                connection.getSequenceAckId().setUpdated();
                                return Mono.empty();
                            });
                        }
                        return Mono.empty();
                    }
                    return Mono.empty();
                }
                return Mono.empty();
            })).subscribe())) != null) {
                previousTask.dispose();
            }
            if (this.autoRestoreGroup) {
                List restoreGroupMonoList = this.groups.values().stream().filter(WebPubSubGroup::isJoined).map(group -> this.joinGroup(group.getName()).onErrorResume(error -> {
                    if (error instanceof SendMessageFailedException) {
                        this.tryEmitNext(this.rejoinGroupFailedEventSink, new RejoinGroupFailedEvent(group.getName(), (SendMessageFailedException)((Object)((Object)((Object)error)))));
                    }
                    return Mono.empty();
                })).collect(Collectors.toList());
                Mono.delay((Duration)CLOSE_AFTER_SESSION_OPEN_DELAY).thenMany((Publisher)Flux.mergeSequentialDelayError(restoreGroupMonoList, (int)Schedulers.DEFAULT_POOL_SIZE, (int)Schedulers.DEFAULT_POOL_SIZE)).subscribe(null, thr -> this.logger.atWarning().log("Failed to auto restore group: " + thr.getMessage()));
            }
        }
    }

    private void handleSessionClose(CloseReason closeReason) {
        this.logger.atVerbose().addKeyValue("code", (long)closeReason.getCloseCode()).log("Session closed");
        int violatedPolicyStatusCode = 1008;
        if (this.clientState.get() == WebPubSubClientState.STOPPED) {
            return;
        }
        String connectionId = this.getConnectionId();
        if (this.isStoppedByUser.compareAndSet(true, false) || this.clientState.get() == WebPubSubClientState.STOPPING) {
            this.handleConnectionClose();
            this.handleClientStop();
        } else if (closeReason.getCloseCode() == 1008) {
            this.clientState.changeState(WebPubSubClientState.DISCONNECTED);
            this.handleConnectionClose();
            this.handleNoRecovery().subscribe(null, thr -> this.logger.atWarning().log("Failed to auto reconnect session: " + thr.getMessage()));
        } else {
            String reconnectionToken;
            WebPubSubConnection connection = this.webPubSubConnection;
            String string = reconnectionToken = connection == null ? null : connection.getReconnectionToken();
            if (!WebPubSubAsyncClient.isReliableProtocol(this.webPubSubProtocol) || reconnectionToken == null || connectionId == null) {
                this.clientState.changeState(WebPubSubClientState.DISCONNECTED);
                this.handleConnectionClose();
                this.handleNoRecovery().subscribe(null, thr -> this.logger.atWarning().log("Failed to auto reconnect session.", new Object[]{thr}));
            } else {
                this.handleRecovery(connectionId, reconnectionToken).timeout(RECOVER_TIMEOUT, Mono.defer(() -> {
                    this.clientState.changeState(WebPubSubClientState.DISCONNECTED);
                    this.handleConnectionClose();
                    return this.handleNoRecovery();
                })).subscribe(null, thr -> this.logger.atWarning().log("Failed to recover or reconnect session.", new Object[]{thr}));
            }
        }
    }

    private void handleMessage(WebPubSubMessage webPubSubMessage) {
        if (webPubSubMessage instanceof GroupDataMessage) {
            GroupDataMessage groupDataMessage = (GroupDataMessage)webPubSubMessage;
            boolean emitMessage = true;
            if (groupDataMessage.getSequenceId() != null) {
                emitMessage = this.updateSequenceAckId(groupDataMessage.getSequenceId());
            }
            if (emitMessage) {
                this.tryEmitNext(this.groupMessageEventSink, new GroupMessageEvent(groupDataMessage.getGroup(), groupDataMessage.getData(), groupDataMessage.getDataType(), groupDataMessage.getFromUserId(), groupDataMessage.getSequenceId()));
            }
        } else if (webPubSubMessage instanceof ServerDataMessage) {
            ServerDataMessage serverDataMessage = (ServerDataMessage)webPubSubMessage;
            boolean emitMessage = true;
            if (serverDataMessage.getSequenceId() != null) {
                emitMessage = this.updateSequenceAckId(serverDataMessage.getSequenceId());
            }
            if (emitMessage) {
                this.tryEmitNext(this.serverMessageEventSink, new ServerMessageEvent(serverDataMessage.getData(), serverDataMessage.getDataType(), serverDataMessage.getSequenceId()));
            }
        } else if (webPubSubMessage instanceof AckMessage) {
            this.tryEmitNext(this.ackMessageSink, (AckMessage)webPubSubMessage);
        } else if (webPubSubMessage instanceof ConnectedMessage) {
            ConnectedMessage connectedMessage = (ConnectedMessage)webPubSubMessage;
            String connectionId = connectedMessage.getConnectionId();
            this.updateLogger(this.applicationId, connectionId);
            if (this.webPubSubConnection == null) {
                this.webPubSubConnection = new WebPubSubConnection();
            }
            this.webPubSubConnection.updateForConnected(connectedMessage.getConnectionId(), connectedMessage.getReconnectionToken(), () -> this.tryEmitNext(this.connectedEventSink, new ConnectedEvent(connectionId, connectedMessage.getUserId())));
        } else if (webPubSubMessage instanceof DisconnectedMessage) {
            DisconnectedMessage disconnectedMessage = (DisconnectedMessage)webPubSubMessage;
            this.handleConnectionClose(new DisconnectedEvent(this.getConnectionId(), disconnectedMessage.getReason()));
        } else {
            ClientLogger logger = this.loggerReference.get();
            if (logger != null) {
                logger.atWarning().addKeyValue("type", webPubSubMessage.getClass()).addKeyValue("message", (Object)webPubSubMessage).log("Unknown message type. Skipping decode.");
            }
        }
    }

    private boolean updateSequenceAckId(long id) {
        WebPubSubConnection connection = this.webPubSubConnection;
        if (connection != null) {
            return connection.getSequenceAckId().update(id);
        }
        return false;
    }

    private Mono<Void> handleNoRecovery() {
        return Mono.defer(() -> {
            if (this.isStoppedByUser.compareAndSet(true, false)) {
                this.handleClientStop();
                return Mono.empty();
            }
            if (this.autoReconnect) {
                boolean success = this.clientState.changeStateOn(WebPubSubClientState.DISCONNECTED, WebPubSubClientState.RECONNECTING);
                if (!success) {
                    return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)new StopReconnectException("Failed to start. Client is not DISCONNECTED.")));
                }
                return Mono.defer(() -> {
                    if (this.isStoppedByUser.compareAndSet(true, false)) {
                        return Mono.error((Throwable)this.logger.logExceptionAsWarning((RuntimeException)new StopReconnectException("Client is stopped by user.")));
                    }
                    return Mono.empty();
                }).then(this.clientAccessUrlProvider.flatMap(url -> Mono.fromRunnable(() -> {
                    this.webSocketSession = this.webSocketClient.connectToServer(this.clientEndpointConfiguration, (String)url, this.loggerReference, this::handleMessage, this::handleSessionOpen, this::handleSessionClose);
                }).subscribeOn(Schedulers.boundedElastic()))).retryWhen(RECONNECT_RETRY_SPEC).doOnError(error -> this.handleClientStop());
            }
            this.handleClientStop();
            return Mono.empty();
        });
    }

    private Mono<Void> handleRecovery(String connectionId, String reconnectionToken) {
        return Mono.defer(() -> {
            if (this.isStoppedByUser.compareAndSet(true, false)) {
                this.handleClientStop();
                return Mono.empty();
            }
            boolean success = this.clientState.changeStateOn(WebPubSubClientState.CONNECTED, WebPubSubClientState.RECOVERING);
            if (!success) {
                return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)new StopReconnectException("Failed to recover. Client is not CONNECTED.")));
            }
            return Mono.defer(() -> {
                if (this.isStoppedByUser.compareAndSet(true, false)) {
                    return Mono.error((Throwable)this.logger.logExceptionAsWarning((RuntimeException)new StopReconnectException("Client is stopped by user.")));
                }
                return Mono.empty();
            }).then(this.clientAccessUrlProvider.flatMap(url -> Mono.fromRunnable(() -> {
                String recoveryUrl = UrlBuilder.parse((String)url).addQueryParameter("awps_connection_id", connectionId).addQueryParameter("awps_reconnection_token", reconnectionToken).toString();
                this.webSocketSession = this.webSocketClient.connectToServer(this.clientEndpointConfiguration, recoveryUrl, this.loggerReference, this::handleMessage, this::handleSessionOpen, this::handleSessionClose);
            }).subscribeOn(Schedulers.boundedElastic()))).retryWhen(RECONNECT_RETRY_SPEC).doOnError(error -> this.handleClientStop());
        });
    }

    private void handleClientStop() {
        this.handleClientStop(true);
    }

    private void handleClientStop(boolean sendStoppedEvent) {
        this.clientState.changeState(WebPubSubClientState.STOPPED);
        this.webSocketSession = null;
        this.webPubSubConnection = null;
        this.tryCompleteOnStoppedByUserSink();
        Disposable task = this.sequenceAckTask.getAndSet(null);
        if (task != null) {
            task.dispose();
        }
        if (sendStoppedEvent) {
            this.tryEmitNext(this.stoppedEventSink, new StoppedEvent());
        }
        this.groupMessageEventSink.emitComplete(this.emitFailureHandler("Unable to emit Complete to groupMessageEventSink"));
        this.groupMessageEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.serverMessageEventSink.emitComplete(this.emitFailureHandler("Unable to emit Complete to groupMessageEventSink"));
        this.serverMessageEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.connectedEventSink.emitComplete(this.emitFailureHandler("Unable to emit Complete to connectedEventSink"));
        this.connectedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.disconnectedEventSink.emitComplete(this.emitFailureHandler("Unable to emit Complete to disconnectedEventSink"));
        this.disconnectedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.stoppedEventSink.emitComplete(this.emitFailureHandler("Unable to emit Complete to disconnectedEventSink"));
        this.stoppedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.rejoinGroupFailedEventSink.emitComplete(this.emitFailureHandler("Unable to emit Complete to rejoinGroupFailedEventSink"));
        this.rejoinGroupFailedEventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.ackMessageSink.emitComplete(this.emitFailureHandler("Unable to emit Complete to ackMessageSink"));
        this.ackMessageSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.updateLogger(this.applicationId, null);
    }

    private void handleConnectionClose() {
        this.handleConnectionClose(null);
    }

    private void handleConnectionClose(DisconnectedEvent disconnectedEvent) {
        DisconnectedEvent event = disconnectedEvent == null ? new DisconnectedEvent(this.getConnectionId(), null) : disconnectedEvent;
        WebPubSubConnection connection = this.webPubSubConnection;
        if (connection != null) {
            connection.updateForDisconnected(() -> this.tryEmitNext(this.disconnectedEventSink, event));
        }
        if (disconnectedEvent == null) {
            this.webPubSubConnection = null;
        }
    }

    private void updateLogger(String applicationId, String connectionId) {
        this.logger = new ClientLogger(WebPubSubAsyncClient.class, LoggingUtils.createContextWithConnectionId(applicationId, connectionId));
        this.loggerReference.set(this.logger);
    }

    WebPubSubClientState getClientState() {
        return this.clientState.get();
    }

    WebSocketSession getWebsocketSession() {
        return this.webSocketSession;
    }

    private Sinks.EmitFailureHandler emitFailureHandler(String message) {
        return (signalType, emitResult) -> {
            LoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signalType, emitResult).log(message);
            return emitResult.equals((Object)Sinks.EmitResult.FAIL_NON_SERIALIZED);
        };
    }

    private RuntimeException logSendMessageFailedException(String errorMessage, Throwable cause, boolean isTransient, WebPubSubMessage message) {
        return this.logSendMessageFailedException(errorMessage, cause, isTransient, message instanceof WebPubSubMessageAck ? ((WebPubSubMessageAck)message).getAckId() : null);
    }

    private RuntimeException logSendMessageFailedException(String errorMessage, Throwable cause, boolean isTransient, Long ackId) {
        return this.logSendMessageFailedException(errorMessage, cause, isTransient, ackId, null);
    }

    private RuntimeException logSendMessageFailedException(String errorMessage, Throwable cause, boolean isTransient, Long ackId, AckResponseError error) {
        return this.logger.logExceptionAsWarning((RuntimeException)((Object)new SendMessageFailedException(errorMessage, cause, isTransient, ackId, error)));
    }

    private static boolean isReliableProtocol(WebPubSubProtocolType webPubSubProtocol) {
        return webPubSubProtocol == WebPubSubProtocolType.JSON_RELIABLE_PROTOCOL;
    }

    private final class ClientState {
        private final AtomicReference<WebPubSubClientState> clientState = new AtomicReference<WebPubSubClientState>(WebPubSubClientState.STOPPED);

        private ClientState() {
        }

        WebPubSubClientState get() {
            return this.clientState.get();
        }

        WebPubSubClientState changeState(WebPubSubClientState state) {
            WebPubSubClientState previousState = this.clientState.getAndSet(state);
            WebPubSubAsyncClient.this.logger.atInfo().addKeyValue("currentClientState", (Object)state).addKeyValue("previousClientState", (Object)previousState).log("Client state changed.");
            return previousState;
        }

        boolean changeStateOn(WebPubSubClientState previousState, WebPubSubClientState state) {
            boolean success = this.clientState.compareAndSet(previousState, state);
            if (success) {
                WebPubSubAsyncClient.this.logger.atInfo().addKeyValue("currentClientState", (Object)state).addKeyValue("previousClientState", (Object)previousState).log("Client state changed.");
            }
            return success;
        }
    }

    private static final class StopReconnectException
    extends RuntimeException {
        private StopReconnectException(String message) {
            super(message);
        }
    }
}

