/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.exceptions.UnsupportedSetupException;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ServerRSocketSession;
import io.rsocket.resume.SessionManager;
import io.rsocket.util.ConnectionUtils;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Mono;

public interface ServerSetup {
    public Mono<Void> acceptRSocketSetup(ByteBuf var1, ClientServerInputMultiplexer var2, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> var3);

    public Mono<Void> acceptRSocketResume(ByteBuf var1, ClientServerInputMultiplexer var2);

    default public void dispose() {
    }

    public static class ResumableServerSetup
    implements ServerSetup {
        private final ByteBufAllocator allocator;
        private final SessionManager sessionManager;
        private final Duration resumeSessionDuration;
        private final Duration resumeStreamTimeout;
        private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
        private final boolean cleanupStoreOnKeepAlive;

        public ResumableServerSetup(ByteBufAllocator allocator, SessionManager sessionManager, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory, boolean cleanupStoreOnKeepAlive) {
            this.allocator = allocator;
            this.sessionManager = sessionManager;
            this.resumeSessionDuration = resumeSessionDuration;
            this.resumeStreamTimeout = resumeStreamTimeout;
            this.resumeStoreFactory = resumeStoreFactory;
            this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
        }

        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf frame, ClientServerInputMultiplexer multiplexer, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> then) {
            if (SetupFrameFlyweight.resumeEnabled(frame)) {
                ByteBuf resumeToken = SetupFrameFlyweight.resumeToken(frame);
                ResumableDuplexConnection connection = this.sessionManager.save(new ServerRSocketSession(multiplexer.asClientServerConnection(), this.allocator, this.resumeSessionDuration, this.resumeStreamTimeout, this.resumeStoreFactory, resumeToken, this.cleanupStoreOnKeepAlive)).resumableConnection();
                return then.apply(new KeepAliveHandler.ResumableKeepAliveHandler(connection), new ClientServerInputMultiplexer(connection));
            }
            return then.apply(new KeepAliveHandler.DefaultKeepAliveHandler(multiplexer), multiplexer);
        }

        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer) {
            ServerRSocketSession session = this.sessionManager.get(ResumeFrameFlyweight.token(frame));
            if (session != null) {
                return session.continueWith(multiplexer.asClientServerConnection()).resumeWith(frame).onClose().then();
            }
            return this.sendError(multiplexer, new RejectedResumeException("unknown resume token")).doFinally(s -> {
                frame.release();
                multiplexer.dispose();
            });
        }

        private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
            return ConnectionUtils.sendError(this.allocator, multiplexer, exception);
        }

        @Override
        public void dispose() {
            this.sessionManager.dispose();
        }
    }

    public static class DefaultServerSetup
    implements ServerSetup {
        private final ByteBufAllocator allocator;

        public DefaultServerSetup(ByteBufAllocator allocator) {
            this.allocator = allocator;
        }

        @Override
        public Mono<Void> acceptRSocketSetup(ByteBuf frame, ClientServerInputMultiplexer multiplexer, BiFunction<KeepAliveHandler, ClientServerInputMultiplexer, Mono<Void>> then) {
            if (SetupFrameFlyweight.resumeEnabled(frame)) {
                return this.sendError(multiplexer, new UnsupportedSetupException("resume not supported")).doFinally(signalType -> {
                    frame.release();
                    multiplexer.dispose();
                });
            }
            return then.apply(new KeepAliveHandler.DefaultKeepAliveHandler(multiplexer), multiplexer);
        }

        @Override
        public Mono<Void> acceptRSocketResume(ByteBuf frame, ClientServerInputMultiplexer multiplexer) {
            return this.sendError(multiplexer, new RejectedResumeException("resume not supported")).doFinally(signalType -> {
                frame.release();
                multiplexer.dispose();
            });
        }

        private Mono<Void> sendError(ClientServerInputMultiplexer multiplexer, Exception exception) {
            return ConnectionUtils.sendError(this.allocator, multiplexer, exception);
        }
    }
}

