/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.resume.RequestListener;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ResumeFramesSubscriber;
import io.rsocket.resume.ResumeStateException;
import io.rsocket.resume.ResumeStateHolder;
import io.rsocket.resume.UpstreamFramesSubscriber;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;

/**
 * A {@link DuplexConnection} that stays usable while the underlying transport connection is
 * replaced.
 *
 * <p>Outgoing frame publishers passed to {@link #send(Publisher)} are merged and funnelled through
 * an internal action queue (see {@code dispatch}). From there each frame is, depending on the
 * current {@link State}, handed to the {@link ResumableFramesStore} for saving and/or forwarded to
 * whichever connection is current. Incoming resumable frames are reported to the store as they are
 * received.
 *
 * <p>A new transport is attached with {@link #reconnect(DuplexConnection)}; replay of stored frames
 * is started with {@link #resume(long, long, Function)}.
 */
public class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
    private static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class);
    /** Single shared instance published on {@link #connectionErrors()} for every disconnect. */
    private static final Throwable closedChannelException = new ClosedChannelException();

    private final String tag;
    private final ResumableFramesStore resumableFramesStore;
    private final Duration resumeStreamTimeout;
    private final boolean cleanupOnKeepAlive;

    /** Emits every connection handed to {@code onNewConnection}; replays the latest one. */
    private final ReplayProcessor<DuplexConnection> connections = ReplayProcessor.create(1);
    private final EmitterProcessor<Throwable> connectionErrors = EmitterProcessor.create();
    private volatile DuplexConnection curConnection;

    /** Frames to be written to the current connection. */
    private final FluxProcessor<ByteBuf, ByteBuf> downStreamFrames = ReplayProcessor.create(0);
    /** Frames to be persisted by the frames store. */
    private final FluxProcessor<ByteBuf, ByteBuf> resumeSaveFrames = EmitterProcessor.create();
    private final MonoProcessor<Void> resumeSaveCompleted = MonoProcessor.create();

    /** Pending actions: either a {@link ByteBuf} frame to send or a {@link Runnable} to run. */
    private final Queue<Object> actions = Queues.<Object>unboundedMultiproducer().get();
    /** Number of actions offered but not yet processed; guards the drain loop in {@code dispatch}. */
    private final AtomicInteger actionsWip = new AtomicInteger();
    private final AtomicBoolean disposed = new AtomicBoolean();

    private final Mono<Void> framesSent;
    private final RequestListener downStreamRequestListener = new RequestListener();
    private final RequestListener resumeSaveStreamRequestListener = new RequestListener();
    private final UnicastProcessor<Flux<ByteBuf>> upstreams = UnicastProcessor.create();
    // Must be declared after both request listeners: its initializer reads them.
    private final UpstreamFramesSubscriber upstreamSubscriber =
            new UpstreamFramesSubscriber(
                    Queues.SMALL_BUFFER_SIZE,
                    downStreamRequestListener.requests(),
                    resumeSaveStreamRequestListener.requests(),
                    this::dispatch);

    private volatile Runnable onResume;
    private volatile Runnable onDisconnect;
    /** One of the {@link State} constants. */
    private volatile int state;
    private volatile Disposable resumedStreamDisposable = Disposables.disposed();

    /**
     * Creates the connection and immediately attaches {@code duplexConnection} as the first
     * transport.
     *
     * @param tag label included in log messages
     * @param duplexConnection initial underlying connection
     * @param resumableFramesStore store used to save, replay and release resumable frames
     * @param resumeStreamTimeout timeout applied to the store's resume stream during resumption
     * @param cleanupOnKeepAlive whether positions reported through {@link #onImpliedPosition(long)}
     *     trigger a release of stored frames
     */
    public ResumableDuplexConnection(
            String tag,
            DuplexConnection duplexConnection,
            ResumableFramesStore resumableFramesStore,
            Duration resumeStreamTimeout,
            boolean cleanupOnKeepAlive) {
        this.tag = tag;
        this.resumableFramesStore = resumableFramesStore;
        this.resumeStreamTimeout = resumeStreamTimeout;
        this.cleanupOnKeepAlive = cleanupOnKeepAlive;

        resumableFramesStore
                .saveFrames(resumeSaveStreamRequestListener.apply(resumeSaveFrames))
                .subscribe(resumeSaveCompleted);

        upstreams.flatMap(Function.identity()).subscribe(upstreamSubscriber);

        framesSent =
                connections
                        .switchMap(
                                c -> {
                                    logger.debug("Switching transport: {}", tag);
                                    return c.send(downStreamRequestListener.apply(downStreamFrames))
                                            .doFinally(
                                                    s ->
                                                            logger.debug(
                                                                    "{} Transport send completed: {}, {}",
                                                                    tag,
                                                                    s,
                                                                    c.toString()))
                                            // A failed transport must not terminate the outer
                                            // sequence; stay pending until the next connection.
                                            .onErrorResume(err -> Mono.never());
                                })
                        .then()
                        .cache();

        reconnect(duplexConnection);
    }

    /** Returns the allocator of the current underlying connection. */
    @Override
    public ByteBufAllocator alloc() {
        return curConnection.alloc();
    }

    /** Disconnects the current underlying connection, if one has been set. */
    public void disconnect() {
        DuplexConnection current = curConnection;
        if (current != null) {
            disconnect(current);
        }
    }

    /**
     * Registers the action run after the current underlying connection is disconnected. A later
     * call replaces the previously registered action.
     *
     * @param onDisconnectAction action to run on disconnect
     */
    public void onDisconnect(Runnable onDisconnectAction) {
        this.onDisconnect = onDisconnectAction;
    }

    /**
     * Registers the action run when the resume-frame exchange started by {@link #resume(long, long,
     * Function)} completes successfully. A later call replaces the previously registered action.
     *
     * @param onResumeAction action to run on resume
     */
    public void onResume(Runnable onResumeAction) {
        this.onResume = onResumeAction;
    }

    /**
     * Attaches a new underlying connection. The first call installs the connection directly and
     * sets the state to {@link State#CONNECTED}; later calls enqueue a resume-start action that is
     * run in order with the other queued actions.
     *
     * @param connection the new underlying connection
     */
    public void reconnect(DuplexConnection connection) {
        if (curConnection == null) {
            logger.debug("{} Resumable duplex connection started with connection: {}", tag, connection);
            state = State.CONNECTED;
            onNewConnection(connection);
        } else {
            logger.debug(
                    "{} Resumable duplex connection reconnected with connection: {}", tag, connection);
            dispatch(new ResumeStart(connection));
        }
    }

    /**
     * Enqueues the resumption step: positions are validated, acknowledged frames are released and
     * the stored frames are replayed.
     *
     * @param remotePos position reported by the remote peer
     * @param remoteImpliedPos implied position reported by the remote peer
     * @param resumeFrameSent callback given either the local implied position or a {@link
     *     ResumeStateException} error; its result is awaited before stored frames are replayed
     */
    public void resume(
            long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
        dispatch(new Resume(remotePos, remoteImpliedPos, resumeFrameSent));
    }

    /** Sends a single frame directly on the current underlying connection, bypassing the queue. */
    @Override
    public Mono<Void> sendOne(ByteBuf frame) {
        return curConnection.sendOne(frame);
    }

    /**
     * Adds {@code frames} to the merged upstream of outgoing frames.
     *
     * @param frames frames to send
     * @return the shared, cached send-completion {@link Mono}; the same instance is returned for
     *     every call
     */
    @Override
    public Mono<Void> send(Publisher<ByteBuf> frames) {
        upstreams.onNext(Flux.from(frames));
        return framesSent;
    }

    /**
     * Returns the frames received from whichever connection is current. Each resumable frame is
     * reported to the frames store; a receive error on one connection is swallowed so the sequence
     * can continue with the next connection.
     */
    @Override
    public Flux<ByteBuf> receive() {
        return connections.switchMap(
                c ->
                        c.receive()
                                .doOnNext(
                                        f -> {
                                            if (isResumableFrame(f)) {
                                                resumableFramesStore.resumableFrameReceived(f);
                                            }
                                        })
                                .onErrorResume(err -> Mono.never()));
    }

    /** Returns the frames store's current frame position. */
    public long position() {
        return resumableFramesStore.framePosition();
    }

    /** Returns the frames store's current implied position. */
    @Override
    public long impliedPosition() {
        return resumableFramesStore.frameImpliedPosition();
    }

    /**
     * Handles an implied position received from the remote peer. When {@code cleanupOnKeepAlive}
     * is enabled, a release of stored frames up to that position is enqueued.
     *
     * @param remoteImpliedPos implied position reported by the remote peer
     */
    @Override
    public void onImpliedPosition(long remoteImpliedPos) {
        logger.debug("Got remote position from keep-alive: {}", remoteImpliedPos);
        if (cleanupOnKeepAlive) {
            dispatch(new ReleaseFrames(remoteImpliedPos));
        }
    }

    /**
     * Completes once the last emitted connection has closed and saving of frames to the store has
     * completed.
     */
    @Override
    public Mono<Void> onClose() {
        return Flux.merge(connections.last().flatMap(Closeable::onClose), resumeSaveCompleted).then();
    }

    /**
     * Disposes this connection and everything it owns. Only the first call has any effect.
     */
    public void dispose() {
        if (disposed.compareAndSet(false, true)) {
            logger.debug("Resumable connection disposed: {}, {}", tag, this);
            upstreams.onComplete();
            connections.onComplete();
            connectionErrors.onComplete();
            resumeSaveFrames.onComplete();
            curConnection.dispose();
            upstreamSubscriber.dispose();
            resumedStreamDisposable.dispose();
            resumableFramesStore.dispose();
        }
    }

    /** Returns the availability of the current underlying connection. */
    @Override
    public double availability() {
        return curConnection.availability();
    }

    /** Returns whether {@link #dispose()} has been called. */
    public boolean isDisposed() {
        return disposed.get();
    }

    /**
     * Routes one outgoing frame according to the current state. Invoked only from the drain loop
     * in {@code dispatch}.
     */
    private void sendFrame(ByteBuf f) {
        if (disposed.get()) {
            f.release();
            return;
        }
        // Not saved while in RESUME — presumably because frames dispatched then are replayed from
        // the store and already saved; confirm against ResumeFramesSubscriber.
        if (state != State.RESUME && isResumableFrame(f)) {
            resumeSaveFrames.onNext(f);
        }
        // Between resume start and the actual resume, nothing is forwarded to the transport.
        // NOTE(review): a non-resumable frame arriving in RESUME_STARTED is neither saved nor
        // forwarded nor released here — confirm ownership of such buffers.
        if (state != State.RESUME_STARTED) {
            downStreamFrames.onNext(f);
        }
    }

    /** Emits an error each time the current underlying connection is disconnected. */
    Flux<Throwable> connectionErrors() {
        return connectionErrors;
    }

    /**
     * Enqueues {@code action} and, if no drain is in progress, processes queued actions one at a
     * time until the pending counter returns to zero. Callers that find a drain already in
     * progress only enqueue.
     *
     * <p>NOTE(review): if an action throws, the counter is never decremented and later calls would
     * only enqueue — confirm that actions cannot throw.
     *
     * @param action a {@link ByteBuf} frame to send, or a {@link Runnable} to execute
     */
    private void dispatch(Object action) {
        actions.offer(action);
        if (actionsWip.getAndIncrement() == 0) {
            do {
                Object next = actions.poll();
                if (next instanceof ByteBuf) {
                    sendFrame((ByteBuf) next);
                } else {
                    ((Runnable) next).run();
                }
            } while (actionsWip.decrementAndGet() != 0);
        }
    }

    /** Enters {@link State#RESUME_STARTED}, cancels any running replay and installs the connection. */
    private void doResumeStart(DuplexConnection connection) {
        state = State.RESUME_STARTED;
        resumedStreamDisposable.dispose();
        upstreamSubscriber.resumeStart();
        onNewConnection(connection);
    }

    /**
     * Validates local against remote positions, then hands the outcome to {@code sendResumeFrame}
     * and, once that completes, replays the stored frames.
     */
    private void doResume(
            long remotePosition,
            long remoteImpliedPosition,
            Function<Mono<Long>, Mono<Void>> sendResumeFrame) {
        long localPosition = position();
        long localImpliedPosition = impliedPosition();
        logger.debug("Resumption start");
        logger.debug(
                "Resumption states. local: [pos: {}, impliedPos: {}], remote: [pos: {}, impliedPos: {}]",
                localPosition,
                localImpliedPosition,
                remotePosition,
                remoteImpliedPosition);

        long releasePos =
                calculateRemoteImpliedPos(
                        localPosition, localImpliedPosition, remotePosition, remoteImpliedPosition);

        Mono<Long> impliedPositionOrError;
        if (releasePos >= 0L) {
            state = State.RESUME;
            releaseFramesToPosition(releasePos);
            impliedPositionOrError = Mono.just(localImpliedPosition);
        } else {
            // Positions are inconsistent: the state is left unchanged and the error is passed on.
            impliedPositionOrError =
                    Mono.error(
                            new ResumeStateException(
                                    localPosition,
                                    localImpliedPosition,
                                    remotePosition,
                                    remoteImpliedPosition));
        }

        sendResumeFrame
                .apply(impliedPositionOrError)
                .doOnSuccess(
                        v -> {
                            Runnable callback = onResume;
                            if (callback != null) {
                                callback.run();
                            }
                        })
                .then(
                        streamResumedFrames(
                                        resumableFramesStore
                                                .resumeStream()
                                                .timeout(resumeStreamTimeout)
                                                // However the replay ends, enqueue completion.
                                                .doFinally(s -> dispatch(new ResumeComplete())))
                                // A failed replay disposes this whole connection.
                                .doOnError(err -> dispose()))
                .onErrorResume(err -> Mono.empty())
                .subscribe();
    }

    /**
     * Checks that local and remote positions are consistent.
     *
     * @param pos local position
     * @param impliedPos local implied position
     * @param remotePos remote position
     * @param remoteImpliedPos remote implied position
     * @return {@code remoteImpliedPos} when {@code remotePos <= impliedPos} and {@code pos <=
     *     remoteImpliedPos}; otherwise {@code -1}
     */
    static long calculateRemoteImpliedPos(
            long pos, long impliedPos, long remotePos, long remoteImpliedPos) {
        boolean consistent = remotePos <= impliedPos && pos <= remoteImpliedPos;
        return consistent ? remoteImpliedPos : -1L;
    }

    /** Enters {@link State#RESUME_COMPLETED} and notifies the upstream subscriber. */
    private void doResumeComplete() {
        logger.debug("Completing resumption");
        state = State.RESUME_COMPLETED;
        upstreamSubscriber.resumeComplete();
    }

    /**
     * Subscribes a {@link ResumeFramesSubscriber} to {@code frames}. The subscriber is given
     * {@code dispatch} as its frame handler and the returned {@link Mono}'s sink for error and
     * completion signalling. It is also stored so that a later resume start can cancel it.
     */
    private Mono<Void> streamResumedFrames(Flux<ByteBuf> frames) {
        return Mono.create(
                sink -> {
                    ResumeFramesSubscriber subscriber =
                            new ResumeFramesSubscriber(
                                    downStreamRequestListener.requests(),
                                    this::dispatch,
                                    err -> sink.error(err),
                                    () -> sink.success());
                    sink.onDispose(subscriber);
                    resumedStreamDisposable = subscriber;
                    frames.subscribe(subscriber);
                });
    }

    /** Makes {@code connection} current, watches for its closing and publishes it to subscribers. */
    private void onNewConnection(DuplexConnection connection) {
        curConnection = connection;
        connection.onClose().doFinally(v -> disconnect(connection)).subscribe();
        connections.onNext(connection);
    }

    /**
     * Disposes {@code connection} and signals the disconnect, but only if it is still the current
     * connection and the state is not already {@link State#DISCONNECTED}.
     */
    private void disconnect(DuplexConnection connection) {
        if (curConnection == connection && state != State.DISCONNECTED) {
            connection.dispose();
            state = State.DISCONNECTED;
            logger.debug(
                    "{} Inner connection disconnected: {}",
                    tag,
                    closedChannelException.getClass().getSimpleName());
            connectionErrors.onNext(closedChannelException);
            Runnable callback = onDisconnect;
            if (callback != null) {
                callback.run();
            }
        }
    }

    /** Asks the frames store to release frames up to {@code remoteImpliedPos}. */
    private void releaseFramesToPosition(long remoteImpliedPos) {
        resumableFramesStore.releaseFrames(remoteImpliedPos);
    }

    /**
     * Tells whether a frame takes part in resumption, based on its native frame type.
     *
     * @param frame frame whose header is inspected
     * @return {@code true} for request, request-n, cancel, error and payload frames; {@code false}
     *     for every other type
     */
    static boolean isResumableFrame(ByteBuf frame) {
        switch (FrameHeaderCodec.nativeFrameType(frame)) {
            case REQUEST_CHANNEL:
            case REQUEST_STREAM:
            case REQUEST_RESPONSE:
            case REQUEST_FNF:
            case REQUEST_N:
            case CANCEL:
            case ERROR:
            case PAYLOAD:
                return true;
            default:
                return false;
        }
    }

    /** Queued action: release stored frames up to a remote implied position. */
    private class ReleaseFrames implements Runnable {
        private final long remoteImpliedPos;

        public ReleaseFrames(long remoteImpliedPos) {
            this.remoteImpliedPos = remoteImpliedPos;
        }

        @Override
        public void run() {
            releaseFramesToPosition(remoteImpliedPos);
        }
    }

    /** Queued action: mark resumption as completed. */
    private class ResumeComplete implements Runnable {
        private ResumeComplete() {
        }

        @Override
        public void run() {
            doResumeComplete();
        }
    }

    /** Queued action: validate positions and replay stored frames. */
    class Resume implements Runnable {
        private final long remotePos;
        private final long remoteImpliedPos;
        private final Function<Mono<Long>, Mono<Void>> resumeFrameSent;

        public Resume(
                long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
            this.remotePos = remotePos;
            this.remoteImpliedPos = remoteImpliedPos;
            this.resumeFrameSent = resumeFrameSent;
        }

        @Override
        public void run() {
            doResume(remotePos, remoteImpliedPos, resumeFrameSent);
        }
    }

    /** Queued action: switch to a new underlying connection ahead of resumption. */
    class ResumeStart implements Runnable {
        private final DuplexConnection connection;

        public ResumeStart(DuplexConnection connection) {
            this.connection = connection;
        }

        @Override
        public void run() {
            doResumeStart(connection);
        }
    }

    /** Values held by the {@code state} field of the enclosing class. */
    static final class State {
        /** Set when the first connection is installed. */
        static final int CONNECTED = 0;
        /** Set when a replacement connection is installed, before positions are exchanged. */
        static final int RESUME_STARTED = 1;
        /** Set once positions were validated and stored frames are being replayed. */
        static final int RESUME = 2;
        /** Set when the replay of stored frames has finished. */
        static final int RESUME_COMPLETED = 3;
        /** Set when the current connection has been disconnected. */
        static final int DISCONNECTED = 4;

        private State() {
        }
    }
}

