/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.resume.RSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.retry.Retry;

public class ClientRSocketSession
implements RSocketSession,
ResumeStateHolder,
CoreSubscriber<Tuple2<ByteBuf, DuplexConnection>> {
    private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class);
    final ResumableDuplexConnection resumableConnection;
    final Mono<Tuple2<ByteBuf, DuplexConnection>> connectionFactory;
    final ResumableFramesStore resumableFramesStore;
    final ByteBufAllocator allocator;
    final Duration resumeSessionDuration;
    final Retry retry;
    final boolean cleanupStoreOnKeepAlive;
    final ByteBuf resumeToken;
    volatile Subscription s;
    static final AtomicReferenceFieldUpdater<ClientRSocketSession, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(ClientRSocketSession.class, Subscription.class, "s");
    KeepAliveSupport keepAliveSupport;

    public ClientRSocketSession(ByteBuf resumeToken, ResumableDuplexConnection resumableDuplexConnection, Mono<DuplexConnection> connectionFactory, Function<DuplexConnection, Mono<Tuple2<ByteBuf, DuplexConnection>>> connectionTransformer, ResumableFramesStore resumableFramesStore, Duration resumeSessionDuration, Retry retry, boolean cleanupStoreOnKeepAlive) {
        this.resumeToken = resumeToken;
        this.connectionFactory = connectionFactory.flatMap(dc -> {
            dc.sendFrame(0, ResumeFrameCodec.encode(dc.alloc(), resumeToken.retain(), resumableFramesStore.frameImpliedPosition(), resumableFramesStore.framePosition()));
            logger.debug("Resume Frame has been sent");
            return (Mono)connectionTransformer.apply((DuplexConnection)dc);
        });
        this.resumableFramesStore = resumableFramesStore;
        this.allocator = resumableDuplexConnection.alloc();
        this.resumeSessionDuration = resumeSessionDuration;
        this.retry = retry;
        this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
        this.resumableConnection = resumableDuplexConnection;
        resumableDuplexConnection.onClose().doFinally(__ -> this.dispose()).subscribe();
        resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
        S.lazySet(this, Operators.cancelledSubscription());
    }

    void reconnect(int index) {
        if (this.s == Operators.cancelledSubscription() && S.compareAndSet(this, Operators.cancelledSubscription(), null)) {
            this.keepAliveSupport.stop();
            logger.debug("Connection[" + index + "] is lost. Reconnecting to resume...");
            this.connectionFactory.retryWhen(this.retry).timeout(this.resumeSessionDuration).subscribe((CoreSubscriber)this);
        }
    }

    @Override
    public long impliedPosition() {
        return this.resumableFramesStore.frameImpliedPosition();
    }

    @Override
    public void onImpliedPosition(long remoteImpliedPos) {
        if (this.cleanupStoreOnKeepAlive) {
            try {
                this.resumableFramesStore.releaseFrames(remoteImpliedPos);
            }
            catch (Throwable e) {
                this.resumableConnection.sendErrorAndClose(new ConnectionErrorException(e.getMessage(), e));
            }
        }
    }

    public void dispose() {
        Operators.terminate(S, (Object)this);
        this.resumableConnection.dispose();
        this.resumableFramesStore.dispose();
        if (this.resumeToken.refCnt() > 0) {
            this.resumeToken.release();
        }
    }

    public boolean isDisposed() {
        return this.resumableConnection.isDisposed();
    }

    public void onSubscribe(Subscription s) {
        if (Operators.setOnce(S, (Object)this, (Subscription)s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
        ByteBuf shouldBeResumeOKFrame = (ByteBuf)tuple2.getT1();
        DuplexConnection nextDuplexConnection = (DuplexConnection)tuple2.getT2();
        if (!Operators.terminate(S, (Object)this)) {
            logger.debug("Session has already been expired. Terminating received connection");
            ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server=[Session Expired]");
            nextDuplexConnection.sendErrorAndClose(connectionErrorException);
            return;
        }
        int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame);
        if (streamId != 0) {
            logger.debug("Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
            this.resumableConnection.dispose();
            ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others");
            nextDuplexConnection.sendErrorAndClose(connectionErrorException);
            return;
        }
        FrameType frameType = FrameHeaderCodec.nativeFrameType(shouldBeResumeOKFrame);
        if (frameType == FrameType.RESUME_OK) {
            long remoteImpliedPos = ResumeOkFrameCodec.lastReceivedClientPos(shouldBeResumeOKFrame);
            long position = this.resumableFramesStore.framePosition();
            long impliedPosition = this.resumableFramesStore.frameImpliedPosition();
            logger.debug("ResumeOK FRAME received. ServerResumeState{observedFramesPosition[{}]}. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", new Object[]{remoteImpliedPos, impliedPosition, position});
            if (position <= remoteImpliedPos) {
                try {
                    if (position != remoteImpliedPos) {
                        this.resumableFramesStore.releaseFrames(remoteImpliedPos);
                    }
                }
                catch (IllegalStateException e) {
                    logger.debug("Exception occurred while releasing frames in the frameStore", (Throwable)e);
                    this.resumableConnection.dispose();
                    ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e);
                    nextDuplexConnection.sendErrorAndClose(t);
                    return;
                }
                if (this.resumableConnection.connect(nextDuplexConnection)) {
                    this.keepAliveSupport.start();
                    logger.debug("Session has been resumed successfully");
                } else {
                    logger.debug("Session has already been expired. Terminating received connection");
                    ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[Session Expired]");
                    nextDuplexConnection.sendErrorAndClose(connectionErrorException);
                }
            } else {
                logger.debug("Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", (Object)remoteImpliedPos, (Object)position);
                this.resumableConnection.dispose();
                ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]");
                nextDuplexConnection.sendErrorAndClose(connectionErrorException);
            }
        } else if (frameType == FrameType.ERROR) {
            RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame);
            logger.debug("Received error frame. Terminating received connection", (Throwable)exception);
            this.resumableConnection.dispose();
        } else {
            logger.debug("Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
            this.resumableConnection.dispose();
            ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others");
            nextDuplexConnection.sendErrorAndClose(connectionErrorException);
        }
    }

    public void onError(Throwable t) {
        if (!Operators.terminate(S, (Object)this)) {
            Operators.onErrorDropped((Throwable)t, (Context)this.currentContext());
        }
        this.resumableConnection.dispose();
    }

    public void onComplete() {
    }

    @Override
    public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
        this.keepAliveSupport = keepAliveSupport;
    }
}

