/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.DefaultConnectionSetupPayload;
import io.rsocket.core.RSocketRequester;
import io.rsocket.core.RSocketResponder;
import io.rsocket.core.Resume;
import io.rsocket.core.ServerSetup;
import io.rsocket.core.StreamIdSupplier;
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.lease.LeaseStats;
import io.rsocket.lease.Leases;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.lease.ResponderLeaseHandler;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.resume.SessionManager;
import io.rsocket.transport.ServerTransport;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public final class RSocketServer {
    private static final String SERVER_TAG = "server";
    private SocketAcceptor acceptor = SocketAcceptor.with(new RSocket(){});
    private InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    private Resume resume;
    private Supplier<Leases<?>> leasesSupplier = null;
    private int mtu = 0;
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

    private RSocketServer() {
    }

    public static RSocketServer create() {
        return new RSocketServer();
    }

    public static RSocketServer create(SocketAcceptor acceptor) {
        return RSocketServer.create().acceptor(acceptor);
    }

    public RSocketServer acceptor(SocketAcceptor acceptor) {
        Objects.requireNonNull(acceptor);
        this.acceptor = acceptor;
        return this;
    }

    public RSocketServer interceptors(Consumer<InterceptorRegistry> configurer) {
        configurer.accept(this.interceptors);
        return this;
    }

    public RSocketServer resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    public RSocketServer lease(Supplier<Leases<?>> supplier) {
        this.leasesSupplier = supplier;
        return this;
    }

    public RSocketServer fragment(int mtu) {
        if (mtu > 0 && mtu < 64 || mtu < 0) {
            String msg = String.format("The smallest allowed mtu size is %d bytes, provided: %d", 64, mtu);
            throw new IllegalArgumentException(msg);
        }
        this.mtu = mtu;
        return this;
    }

    public RSocketServer payloadDecoder(PayloadDecoder decoder) {
        Objects.requireNonNull(decoder);
        this.payloadDecoder = decoder;
        return this;
    }

    public <T extends Closeable> Mono<T> bind(final ServerTransport<T> transport) {
        return Mono.defer((Supplier)new Supplier<Mono<T>>(){
            ServerSetup serverSetup;
            {
                this.serverSetup = RSocketServer.this.serverSetup();
            }

            @Override
            public Mono<T> get() {
                return transport.start(duplexConnection -> RSocketServer.this.acceptor(this.serverSetup, duplexConnection), RSocketServer.this.mtu).doOnNext(c -> c.onClose().doFinally(v -> this.serverSetup.dispose()).subscribe());
            }
        });
    }

    public <T extends Closeable> T bindNow(ServerTransport<T> transport) {
        return (T)((Closeable)this.bind(transport).block());
    }

    public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
        return new ServerTransport.ConnectionAcceptor(){
            private final ServerSetup serverSetup;
            {
                this.serverSetup = RSocketServer.this.serverSetup();
            }

            @Override
            public Mono<Void> apply(DuplexConnection connection) {
                return RSocketServer.this.acceptor(this.serverSetup, connection);
            }
        };
    }

    private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection connection) {
        ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection, this.interceptors, false);
        return multiplexer.asSetupConnection().receive().next().flatMap(startFrame -> this.accept(serverSetup, (ByteBuf)startFrame, multiplexer));
    }

    private Mono<Void> acceptResume(ServerSetup serverSetup, ByteBuf resumeFrame, ClientServerInputMultiplexer multiplexer) {
        return serverSetup.acceptRSocketResume(resumeFrame, multiplexer);
    }

    private Mono<Void> accept(ServerSetup serverSetup, ByteBuf startFrame, ClientServerInputMultiplexer multiplexer) {
        switch (FrameHeaderCodec.frameType(startFrame)) {
            case SETUP: {
                return this.acceptSetup(serverSetup, startFrame, multiplexer);
            }
            case RESUME: {
                return this.acceptResume(serverSetup, startFrame, multiplexer);
            }
        }
        return serverSetup.sendError(multiplexer, new InvalidSetupException("invalid setup frame: " + (Object)((Object)FrameHeaderCodec.frameType(startFrame)))).doFinally(signalType -> {
            startFrame.release();
            multiplexer.dispose();
        });
    }

    private Mono<Void> acceptSetup(ServerSetup serverSetup, ByteBuf setupFrame, ClientServerInputMultiplexer multiplexer) {
        boolean leaseEnabled;
        if (!SetupFrameCodec.isSupportedVersion(setupFrame)) {
            return serverSetup.sendError(multiplexer, new InvalidSetupException("Unsupported version: " + SetupFrameCodec.humanReadableVersion(setupFrame))).doFinally(signalType -> {
                setupFrame.release();
                multiplexer.dispose();
            });
        }
        boolean bl = leaseEnabled = this.leasesSupplier != null;
        if (SetupFrameCodec.honorLease(setupFrame) && !leaseEnabled) {
            return serverSetup.sendError(multiplexer, new InvalidSetupException("lease is not supported")).doFinally(signalType -> {
                setupFrame.release();
                multiplexer.dispose();
            });
        }
        return serverSetup.acceptRSocketSetup(setupFrame, multiplexer, (keepAliveHandler, wrappedMultiplexer) -> {
            DefaultConnectionSetupPayload setupPayload = new DefaultConnectionSetupPayload(setupFrame);
            Leases<?> leases = leaseEnabled ? this.leasesSupplier.get() : null;
            RequesterLeaseHandler requesterLeaseHandler = leaseEnabled ? new RequesterLeaseHandler.Impl(SERVER_TAG, leases.receiver()) : RequesterLeaseHandler.None;
            RSocketRequester rSocketRequester = new RSocketRequester(wrappedMultiplexer.asServerConnection(), this.payloadDecoder, StreamIdSupplier.serverSupplier(), this.mtu, ((ConnectionSetupPayload)setupPayload).keepAliveInterval(), ((ConnectionSetupPayload)setupPayload).keepAliveMaxLifetime(), (KeepAliveHandler)keepAliveHandler, requesterLeaseHandler, Schedulers.single((Scheduler)Schedulers.parallel()));
            RSocket wrappedRSocketRequester = this.interceptors.initRequester(rSocketRequester);
            return this.interceptors.initSocketAcceptor(this.acceptor).accept(setupPayload, wrappedRSocketRequester).onErrorResume(err -> serverSetup.sendError(multiplexer, this.rejectedSetupError((Throwable)err)).then(Mono.error((Throwable)err))).doOnNext(rSocketHandler -> {
                RSocket wrappedRSocketHandler = this.interceptors.initResponder((RSocket)rSocketHandler);
                DuplexConnection connection = wrappedMultiplexer.asClientConnection();
                ResponderLeaseHandler responderLeaseHandler = leaseEnabled ? new ResponderLeaseHandler.Impl<LeaseStats>(SERVER_TAG, connection.alloc(), leases.sender(), leases.stats()) : ResponderLeaseHandler.None;
                RSocketResponder rSocketResponder = new RSocketResponder(connection, wrappedRSocketHandler, this.payloadDecoder, responderLeaseHandler, this.mtu);
            }).doFinally(signalType -> setupPayload.release()).then();
        });
    }

    private ServerSetup serverSetup() {
        return this.resume != null ? this.createSetup() : new ServerSetup.DefaultServerSetup();
    }

    ServerSetup createSetup() {
        return new ServerSetup.ResumableServerSetup(new SessionManager(), this.resume.getSessionDuration(), this.resume.getStreamTimeout(), this.resume.getStoreFactory(SERVER_TAG), this.resume.isCleanupStoreOnKeepAlive());
    }

    private Exception rejectedSetupError(Throwable err) {
        String msg = err.getMessage();
        return new RejectedSetupException(msg == null ? "rejected by server acceptor" : msg);
    }
}

