/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.ResumeOkFrameFlyweight;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.resume.ClientResume;
import io.rsocket.resume.RSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ResumePositionsConnection;
import io.rsocket.resume.ResumeStrategy;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class ClientRSocketSession
implements RSocketSession<Mono<? extends ResumePositionsConnection>> {
    private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class);
    private final ResumableDuplexConnection resumableConnection;
    private volatile Mono<? extends ResumePositionsConnection> newConnection;
    private volatile ByteBuf resumeToken;
    private final ByteBufAllocator allocator;

    public ClientRSocketSession(ResumePositionsConnection duplexConnection, ByteBufAllocator allocator, Duration resumeSessionDuration, Supplier<ResumeStrategy> resumeStrategy, ResumableFramesStore resumableFramesStore, Duration resumeStreamTimeout, boolean cleanupStoreOnKeepAlive) {
        this.allocator = allocator;
        this.resumableConnection = new ResumableDuplexConnection("client", duplexConnection, resumableFramesStore, resumeStreamTimeout, cleanupStoreOnKeepAlive);
        this.onClose().doFinally(s -> this.resumeToken.release()).subscribe();
        this.resumableConnection.connectionErrors().flatMap(err -> {
            logger.debug("Client session connection error. Starting new connection");
            ResumeStrategy reconnectOnError = (ResumeStrategy)resumeStrategy.get();
            ClientResume clientResume = new ClientResume(resumeSessionDuration, this.resumeToken);
            AtomicBoolean once = new AtomicBoolean();
            return this.newConnection.delaySubscription((Publisher)(once.compareAndSet(false, true) ? (Publisher)reconnectOnError.apply(clientResume, err) : Mono.empty())).retryWhen(errors -> errors.doOnNext(retryErr -> logger.debug("Resumption reconnection error: {}", retryErr)).flatMap(retryErr -> Mono.from((Publisher)((Publisher)reconnectOnError.apply(clientResume, retryErr))).doOnNext(v -> logger.debug("Retrying with: {}", v)))).timeout(resumeSessionDuration);
        }).map(ClientServerInputMultiplexer::new).subscribe(multiplexer -> {
            this.reconnect(multiplexer.asClientServerConnection());
            long impliedPosition = this.resumableConnection.impliedPosition();
            long position = this.resumableConnection.position();
            logger.debug("Client ResumableConnection reconnected. Sending RESUME frame with state: [impliedPos: {}, pos: {}]", (Object)impliedPosition, (Object)position);
            this.sendFrame(ResumeFrameFlyweight.encode(allocator, this.resumeToken.retain(), impliedPosition, position)).then(multiplexer.asSetupConnection().receive().next()).subscribe(this::resumeWith);
        }, err -> {
            logger.debug("Client ResumableConnection reconnect timeout");
            this.resumableConnection.dispose();
        });
    }

    public ClientRSocketSession continueWith(Mono<? extends ResumePositionsConnection> newConnection) {
        this.newConnection = newConnection;
        return this;
    }

    @Override
    public ClientRSocketSession resumeWith(ByteBuf resumeOkFrame) {
        logger.debug("ResumeOK FRAME received");
        long remotePos = ClientRSocketSession.remotePos(resumeOkFrame);
        long remoteImpliedPos = ClientRSocketSession.remoteImpliedPos(resumeOkFrame);
        resumeOkFrame.release();
        this.resumableConnection.resume(remotePos, remoteImpliedPos, pos -> pos.then().onErrorResume(err -> this.sendFrame(ErrorFrameFlyweight.encode(this.allocator, 0, ClientRSocketSession.errorFrameThrowable(remoteImpliedPos))).then(Mono.fromRunnable(this.resumableConnection::dispose)).then(Mono.never())));
        return this;
    }

    public ClientRSocketSession resumeToken(ByteBuf resumeToken) {
        this.resumeToken = resumeToken.retain();
        return this;
    }

    @Override
    public void reconnect(ResumePositionsConnection connection) {
        this.resumableConnection.reconnect(connection);
    }

    @Override
    public DuplexConnection resumableConnection() {
        return this.resumableConnection;
    }

    @Override
    public ByteBuf token() {
        return this.resumeToken;
    }

    private Mono<Void> sendFrame(ByteBuf frame) {
        return this.resumableConnection.sendOne(frame).onErrorResume(err -> Mono.empty());
    }

    private static long remoteImpliedPos(ByteBuf resumeOkFrame) {
        return ResumeOkFrameFlyweight.lastReceivedClientPos(resumeOkFrame);
    }

    private static long remotePos(ByteBuf resumeOkFrame) {
        return -1L;
    }

    private static ConnectionErrorException errorFrameThrowable(long impliedPos) {
        return new ConnectionErrorException("resumption_server_pos=[" + impliedPos + "]");
    }
}

