package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.lease.TrackingLeaseSender;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.plugins.RequestInterceptor;
import io.rsocket.resume.ClientRSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.1.4.jar:io/rsocket/core/RSocketConnector.class */
public class RSocketConnector {
    private static final String CLIENT_TAG = "client";
    private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION = (rSocket, invalidatable) -> {
        Mono<Void> onClose = rSocket.onClose();
        Consumer<? super Throwable> consumer = th -> {
            invalidatable.invalidate();
        };
        invalidatable.getClass();
        onClose.subscribe(null, consumer, invalidatable::invalidate);
    };

    @Nullable
    private SocketAcceptor acceptor;
    private Retry retrySpec;
    private Resume resume;

    @Nullable
    private Consumer<LeaseSpec> leaseConfigurer;
    private Mono<Payload> setupPayloadMono = Mono.empty();
    private String metadataMimeType = "application/binary";
    private String dataMimeType = "application/binary";
    private Duration keepAliveInterval = Duration.ofSeconds(20);
    private Duration keepAliveMaxLifeTime = Duration.ofSeconds(90);
    private InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    private int mtu = 0;
    private int maxInboundPayloadSize = Integer.MAX_VALUE;
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

    private RSocketConnector() {
    }

    public static RSocketConnector create() {
        return new RSocketConnector();
    }

    public static Mono<RSocket> connectWith(ClientTransport clientTransport) {
        return create().connect(() -> {
            return clientTransport;
        });
    }

    public RSocketConnector setupPayload(Mono<Payload> mono) {
        this.setupPayloadMono = mono;
        return this;
    }

    public RSocketConnector setupPayload(Payload payload) {
        if (payload instanceof DefaultPayload) {
            this.setupPayloadMono = Mono.just(payload);
        } else {
            this.setupPayloadMono = Mono.just(DefaultPayload.create((Payload) Objects.requireNonNull(payload)));
            payload.release();
        }
        return this;
    }

    public RSocketConnector dataMimeType(String str) {
        this.dataMimeType = (String) Objects.requireNonNull(str);
        return this;
    }

    public RSocketConnector metadataMimeType(String str) {
        this.metadataMimeType = (String) Objects.requireNonNull(str);
        return this;
    }

    public RSocketConnector keepAlive(Duration duration, Duration duration2) {
        if (!duration.negated().isNegative()) {
            throw new IllegalArgumentException("`interval` for keepAlive must be > 0");
        }
        if (!duration2.negated().isNegative()) {
            throw new IllegalArgumentException("`maxLifeTime` for keepAlive must be > 0");
        }
        this.keepAliveInterval = duration;
        this.keepAliveMaxLifeTime = duration2;
        return this;
    }

    public RSocketConnector interceptors(Consumer<InterceptorRegistry> consumer) {
        consumer.accept(this.interceptors);
        return this;
    }

    public RSocketConnector acceptor(SocketAcceptor socketAcceptor) {
        this.acceptor = socketAcceptor;
        return this;
    }

    public RSocketConnector reconnect(Retry retry) {
        this.retrySpec = (Retry) Objects.requireNonNull(retry);
        return this;
    }

    public RSocketConnector resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    public RSocketConnector lease() {
        return lease(leaseSpec -> {
        });
    }

    public RSocketConnector lease(Consumer<LeaseSpec> consumer) {
        this.leaseConfigurer = consumer;
        return this;
    }

    public RSocketConnector maxInboundPayloadSize(int i) {
        this.maxInboundPayloadSize = ReassemblyUtils.assertInboundPayloadSize(i);
        return this;
    }

    public RSocketConnector fragment(int i) {
        this.mtu = FragmentationUtils.assertMtu(i);
        return this;
    }

    public RSocketConnector payloadDecoder(PayloadDecoder payloadDecoder) {
        Objects.requireNonNull(payloadDecoder);
        this.payloadDecoder = payloadDecoder;
        return this;
    }

    public Mono<RSocket> connect(ClientTransport clientTransport) {
        return connect(() -> {
            return clientTransport;
        });
    }

    public Mono<RSocket> connect(Supplier<ClientTransport> supplier) {
        return (Mono) Mono.fromSupplier(supplier).flatMap(clientTransport -> {
            int maxFrameLength = clientTransport.maxFrameLength();
            Mono map = Mono.fromCallable(() -> {
                PayloadValidationUtils.assertValidateSetup(maxFrameLength, this.maxInboundPayloadSize, this.mtu);
                return clientTransport;
            }).flatMap(clientTransport -> {
                return clientTransport.connect();
            }).map(duplexConnection -> {
                return this.interceptors.initConnection(DuplexConnectionInterceptor.Type.SOURCE, duplexConnection);
            }).map(duplexConnection2 -> {
                return LoggingDuplexConnection.wrapIfEnabled(duplexConnection2);
            });
            return map.flatMap(duplexConnection3 -> {
                Mono doOnError = this.setupPayloadMono.defaultIfEmpty(EmptyPayload.INSTANCE).map(payload -> {
                    return Tuples.of(duplexConnection3, payload);
                }).doOnError(th -> {
                    duplexConnection3.dispose();
                });
                duplexConnection3.getClass();
                return doOnError.doOnCancel(duplexConnection3::dispose);
            }).flatMap(tuple2 -> {
                DuplexConnection duplexConnection4 = (DuplexConnection) tuple2.getT1();
                Payload payload = (Payload) tuple2.getT2();
                boolean z = this.leaseConfigurer != null;
                boolean z2 = this.resume != null;
                DefaultClientSetup defaultClientSetup = new DefaultClientSetup();
                ByteBuf byteBuf = z2 ? this.resume.getTokenSupplier().get() : Unpooled.EMPTY_BUFFER;
                ByteBuf encode = SetupFrameCodec.encode(duplexConnection4.alloc(), z, (int) this.keepAliveInterval.toMillis(), (int) this.keepAliveMaxLifeTime.toMillis(), byteBuf, this.metadataMimeType, this.dataMimeType, payload);
                duplexConnection4.sendFrame(0, encode.retainedSlice());
                ByteBuf byteBuf2 = byteBuf;
                return defaultClientSetup.init(duplexConnection4).flatMap(tuple2 -> {
                    KeepAliveHandler defaultKeepAliveHandler;
                    DuplexConnection duplexConnection5;
                    LeaseSpec leaseSpec;
                    RequesterLeaseTracker requesterLeaseTracker;
                    DuplexConnection duplexConnection6 = (DuplexConnection) tuple2.getT2();
                    InitializingInterceptorRegistry initializingInterceptorRegistry = this.interceptors;
                    if (z2) {
                        ResumableFramesStore apply = this.resume.getStoreFactory(CLIENT_TAG).apply(byteBuf2);
                        ResumableDuplexConnection resumableDuplexConnection = new ResumableDuplexConnection(CLIENT_TAG, byteBuf2, duplexConnection6, apply);
                        ResumableClientSetup resumableClientSetup = new ResumableClientSetup();
                        resumableClientSetup.getClass();
                        ClientRSocketSession clientRSocketSession = new ClientRSocketSession(byteBuf2, resumableDuplexConnection, map, resumableClientSetup::init, apply, this.resume.getSessionDuration(), this.resume.getRetry(), this.resume.isCleanupStoreOnKeepAlive());
                        defaultKeepAliveHandler = new KeepAliveHandler.ResumableKeepAliveHandler(resumableDuplexConnection, clientRSocketSession, clientRSocketSession);
                        duplexConnection5 = resumableDuplexConnection;
                    } else {
                        defaultKeepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler();
                        duplexConnection5 = duplexConnection6;
                    }
                    ClientServerInputMultiplexer clientServerInputMultiplexer = new ClientServerInputMultiplexer(duplexConnection5, initializingInterceptorRegistry, true);
                    if (z) {
                        leaseSpec = new LeaseSpec();
                        this.leaseConfigurer.accept(leaseSpec);
                        requesterLeaseTracker = new RequesterLeaseTracker(CLIENT_TAG, leaseSpec.maxPendingRequests);
                    } else {
                        leaseSpec = null;
                        requesterLeaseTracker = null;
                    }
                    Sinks.Empty empty = Sinks.unsafe().empty();
                    Sinks.Empty empty2 = Sinks.unsafe().empty();
                    initializingInterceptorRegistry.getClass();
                    RSocket initRequester = initializingInterceptorRegistry.initRequester(new RSocketRequester(clientServerInputMultiplexer.asClientConnection(), this.payloadDecoder, StreamIdSupplier.clientSupplier(), this.mtu, maxFrameLength, this.maxInboundPayloadSize, (int) this.keepAliveInterval.toMillis(), (int) this.keepAliveMaxLifeTime.toMillis(), defaultKeepAliveHandler, initializingInterceptorRegistry::initRequesterRequestInterceptor, requesterLeaseTracker, empty, Mono.whenDelayError((Publisher<?>[]) new Publisher[]{empty2.asMono(), empty.asMono()})));
                    SocketAcceptor with = this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() { // from class: io.rsocket.core.RSocketConnector.1
                    });
                    DefaultConnectionSetupPayload defaultConnectionSetupPayload = new DefaultConnectionSetupPayload(encode);
                    DuplexConnection duplexConnection7 = duplexConnection5;
                    LeaseSpec leaseSpec2 = leaseSpec;
                    return initializingInterceptorRegistry.initSocketAcceptor(with).accept(defaultConnectionSetupPayload, initRequester).map(rSocket -> {
                        Function function;
                        RSocket initResponder = initializingInterceptorRegistry.initResponder(rSocket);
                        ResponderLeaseTracker responderLeaseTracker = z ? new ResponderLeaseTracker(CLIENT_TAG, duplexConnection7, leaseSpec2.sender) : null;
                        DuplexConnection asServerConnection = clientServerInputMultiplexer.asServerConnection();
                        PayloadDecoder payloadDecoder = this.payloadDecoder;
                        int i = this.mtu;
                        int i2 = this.maxInboundPayloadSize;
                        if (z && (leaseSpec2.sender instanceof TrackingLeaseSender)) {
                            function = rSocket -> {
                                return initializingInterceptorRegistry.initResponderRequestInterceptor(rSocket, (RequestInterceptor) leaseSpec2.sender);
                            };
                        } else {
                            initializingInterceptorRegistry.getClass();
                            function = rSocket2 -> {
                                return initializingInterceptorRegistry.initResponderRequestInterceptor(rSocket2, new RequestInterceptor[0]);
                            };
                        }
                        new RSocketResponder(asServerConnection, initResponder, payloadDecoder, responderLeaseTracker, i, maxFrameLength, i2, function, empty2);
                        return initRequester;
                    }).doFinally(signalType -> {
                        defaultConnectionSetupPayload.release();
                    });
                });
            });
        }).as(mono -> {
            return this.retrySpec != null ? new ReconnectMono(mono.retryWhen(this.retrySpec), (v0) -> {
                v0.dispose();
            }, INVALIDATE_FUNCTION) : mono;
        });
    }
}
