/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.core.SetupHandlingDuplexConnection;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.exceptions.UnsupportedSetupException;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ServerRSocketSession;
import io.rsocket.resume.SessionManager;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.function.Tuple2;

abstract class ServerSetup {
    ServerSetup() {
    }

    Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
        return Mono.create(sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, (MonoSink<Tuple2<ByteBuf, DuplexConnection>>)sink))).or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
    }

    abstract Mono<Void> acceptRSocketSetup(ByteBuf var1, DuplexConnection var2, BiFunction<KeepAliveHandler, DuplexConnection, Mono<Void>> var3);

    abstract Mono<Void> acceptRSocketResume(ByteBuf var1, DuplexConnection var2);

    void dispose() {
    }

    void sendError(DuplexConnection duplexConnection, RSocketErrorException exception) {
        duplexConnection.sendErrorAndClose(exception);
    }

    static class ResumableServerSetup
    extends ServerSetup {
        private final SessionManager sessionManager;
        private final Duration resumeSessionDuration;
        private final Duration resumeStreamTimeout;
        private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
        private final boolean cleanupStoreOnKeepAlive;

        ResumableServerSetup(SessionManager sessionManager, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory, boolean cleanupStoreOnKeepAlive) {
            this.sessionManager = sessionManager;
            this.resumeSessionDuration = resumeSessionDuration;
            this.resumeStreamTimeout = resumeStreamTimeout;
            this.resumeStoreFactory = resumeStoreFactory;
            this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
        }

        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf frame, DuplexConnection duplexConnection, BiFunction<KeepAliveHandler, DuplexConnection, Mono<Void>> then) {
            if (SetupFrameCodec.resumeEnabled(frame)) {
                ByteBuf resumeToken = SetupFrameCodec.resumeToken(frame);
                ResumableFramesStore resumableFramesStore = this.resumeStoreFactory.apply((ByteBuf)resumeToken);
                ResumableDuplexConnection resumableDuplexConnection = new ResumableDuplexConnection("server", duplexConnection, resumableFramesStore);
                ServerRSocketSession serverRSocketSession = new ServerRSocketSession(resumeToken, duplexConnection, resumableDuplexConnection, this.resumeSessionDuration, resumableFramesStore, this.cleanupStoreOnKeepAlive);
                this.sessionManager.save(serverRSocketSession, resumeToken);
                return then.apply(new KeepAliveHandler.ResumableKeepAliveHandler(resumableDuplexConnection, serverRSocketSession, serverRSocketSession), resumableDuplexConnection);
            }
            return then.apply(new KeepAliveHandler.DefaultKeepAliveHandler(duplexConnection), duplexConnection);
        }

        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) {
            ServerRSocketSession session = this.sessionManager.get(ResumeFrameCodec.token(frame));
            if (session != null) {
                session.resumeWith(frame, duplexConnection);
                return duplexConnection.onClose();
            }
            this.sendError(duplexConnection, new RejectedResumeException("unknown resume token"));
            return duplexConnection.onClose();
        }

        @Override
        public void dispose() {
            this.sessionManager.dispose();
        }
    }

    static class DefaultServerSetup
    extends ServerSetup {
        DefaultServerSetup() {
        }

        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf frame, DuplexConnection duplexConnection, BiFunction<KeepAliveHandler, DuplexConnection, Mono<Void>> then) {
            if (SetupFrameCodec.resumeEnabled(frame)) {
                this.sendError(duplexConnection, new UnsupportedSetupException("resume not supported"));
                return Mono.empty();
            }
            return then.apply(new KeepAliveHandler.DefaultKeepAliveHandler(duplexConnection), duplexConnection);
        }

        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) {
            this.sendError(duplexConnection, new RejectedResumeException("resume not supported"));
            return duplexConnection.onClose();
        }
    }
}

