/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import io.rsocket.core.PayloadValidationUtils;
import io.rsocket.frame.CancelFrameCodec;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.frame.RequestChannelFrameCodec;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.RequestStreamFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.annotation.Nullable;

/**
 * Responder side of an RSocket connection.
 *
 * <p>Frames received from the {@link DuplexConnection} are decoded in {@link #handleFrame(ByteBuf)}
 * and dispatched to the supplied request handler; the frames produced in response (payload,
 * complete, error, request-n, cancel) are written through a single outbound
 * {@link UnboundedProcessor}.
 *
 * <p>Two maps track live streams by stream id: {@code sendingSubscriptions} holds the subscription
 * to each outbound response publisher, {@code channelProcessors} holds the inbound processor of
 * each REQUEST_CHANNEL stream.
 */
class RSocketResponder implements RSocket {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class);

    /** Message of the error sent to the peer when an outbound payload fails validation. */
    private static final String INVALID_PAYLOAD_ERROR_MESSAGE = "The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.";

    /** Discard hook: releases a reference-counted element if it still holds a reference. */
    private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER = referenceCounted -> {
        if (referenceCounted.refCnt() > 0) {
            try {
                referenceCounted.release();
            } catch (IllegalReferenceCountException ignored) {
                // Best effort: the count can reach zero between the check and the release.
            }
        }
    };

    /** Shared termination cause used when the connection closes without an error. */
    private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

    private final DuplexConnection connection;
    private final RSocket requestHandler;
    /** {@code requestHandler} viewed as a {@link ResponderRSocket}, or {@code null} if it is not one. */
    private final ResponderRSocket responderRSocket;
    private final PayloadDecoder payloadDecoder;
    private final ResponderLeaseHandler leaseHandler;
    private final Disposable leaseHandlerDisposable;

    /** Set exactly once (via {@link #TERMINATION_ERROR}) by the first termination attempt. */
    private volatile Throwable terminationError;
    private static final AtomicReferenceFieldUpdater<RSocketResponder, Throwable> TERMINATION_ERROR = AtomicReferenceFieldUpdater.newUpdater(RSocketResponder.class, Throwable.class, "terminationError");

    private final int mtu;
    private final int maxFrameLength;
    /** Subscriptions to outbound response publishers, keyed by stream id. */
    private final IntObjectMap<Subscription> sendingSubscriptions;
    /** Inbound processors of REQUEST_CHANNEL streams, keyed by stream id. */
    private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;
    /** Single outbound queue; every frame this responder sends goes through it. */
    private final UnboundedProcessor<ByteBuf> sendProcessor;
    private final ByteBufAllocator allocator;

    /**
     * Wires the responder to the connection: starts sending the outbound queue, starts receiving
     * frames, starts the lease handler and registers termination on connection close.
     *
     * @param connection connection to read frames from and write frames to
     * @param requestHandler handler that serves the incoming requests
     * @param payloadDecoder converts an incoming frame into a {@link Payload}
     * @param leaseHandler decides whether a request may be served and emits lease frames
     * @param mtu passed to {@link PayloadValidationUtils#isValid} for outbound payloads
     * @param maxFrameLength passed to {@link PayloadValidationUtils#isValid} for outbound payloads
     */
    RSocketResponder(DuplexConnection connection, RSocket requestHandler, PayloadDecoder payloadDecoder, ResponderLeaseHandler leaseHandler, int mtu, int maxFrameLength) {
        this.connection = connection;
        this.allocator = connection.alloc();
        this.mtu = mtu;
        this.maxFrameLength = maxFrameLength;
        this.requestHandler = requestHandler;
        this.responderRSocket = requestHandler instanceof ResponderRSocket ? (ResponderRSocket) requestHandler : null;
        this.payloadDecoder = payloadDecoder;
        this.leaseHandler = leaseHandler;
        this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
        this.channelProcessors = new SynchronizedIntObjectHashMap<>();
        this.sendProcessor = new UnboundedProcessor<>();

        connection.send(this.sendProcessor).subscribe(null, this::handleSendProcessorError);
        // Receive errors are deliberately ignored here.
        // NOTE(review): presumably the onClose() subscription below covers that case - confirm.
        connection.receive().subscribe(this::handleFrame, e -> {});
        this.leaseHandlerDisposable = leaseHandler.send(this.sendProcessor::onNextPrioritized);
        this.connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
    }

    /**
     * Invoked when sending on the connection fails: cancels every outbound subscription and
     * signals {@code t} to every channel processor. A failure of an individual callback is logged
     * at debug level and does not stop the remaining callbacks.
     *
     * @param t the send failure
     */
    private void handleSendProcessorError(Throwable t) {
        this.sendingSubscriptions.values().forEach(subscription -> {
            try {
                subscription.cancel();
            } catch (Throwable e) {
                if (LOGGER.isDebugEnabled()) {
                    // Log the exception that is actually being dropped, not the send failure.
                    LOGGER.debug("Dropped exception", e);
                }
            }
        });
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onError(t);
            } catch (Throwable e) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Dropped exception", e);
                }
            }
        });
    }

    /** Terminates with the error the connection's {@code onClose()} signalled. */
    private void tryTerminateOnConnectionError(Throwable e) {
        this.tryTerminate(() -> e);
    }

    /** Terminates with {@link #CLOSED_CHANNEL_EXCEPTION} when the connection completes normally. */
    private void tryTerminateOnConnectionClose() {
        this.tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
    }

    /**
     * Records the termination cause and runs {@link #cleanup(Throwable)}. Only the caller whose
     * compare-and-set succeeds runs the cleanup; later or concurrent callers do nothing.
     *
     * @param errorSupplier supplies the termination cause; not invoked if already terminated
     */
    private void tryTerminate(Supplier<Throwable> errorSupplier) {
        if (this.terminationError != null) {
            return;
        }
        Throwable e = errorSupplier.get();
        if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
            this.cleanup(e);
        }
    }

    /**
     * Delegates to the request handler if the lease handler permits the request; otherwise
     * releases {@code payload} and returns the lease error. Anything thrown synchronously is
     * returned as an error {@link Mono}.
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.fireAndForget(payload);
            }
            payload.release();
            return Mono.error(this.leaseHandler.leaseError());
        } catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /**
     * Delegates to the request handler if the lease handler permits the request; otherwise
     * releases {@code payload} and returns the lease error. Anything thrown synchronously is
     * returned as an error {@link Mono}.
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.requestResponse(payload);
            }
            payload.release();
            return Mono.error(this.leaseHandler.leaseError());
        } catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /**
     * Delegates to the request handler if the lease handler permits the request; otherwise
     * releases {@code payload} and returns the lease error. Anything thrown synchronously is
     * returned as an error {@link Flux}.
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.requestStream(payload);
            }
            payload.release();
            return Flux.error(this.leaseHandler.leaseError());
        } catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /**
     * Delegates to the request handler if the lease handler permits the request; otherwise
     * returns the lease error without subscribing to {@code payloads}. Anything thrown
     * synchronously is returned as an error {@link Flux}.
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.requestChannel(payloads);
            }
            return Flux.error(this.leaseHandler.leaseError());
        } catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /**
     * Channel variant used when the handler is a {@link ResponderRSocket}: passes the first
     * payload separately. Must only be called when {@code responderRSocket} is non-null. When the
     * lease is rejected the first payload is released and the lease error is returned.
     */
    private Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.responderRSocket.requestChannel(payload, payloads);
            }
            payload.release();
            return Flux.error(this.leaseHandler.leaseError());
        } catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /**
     * Delegates to the request handler (no lease check). Anything thrown synchronously is
     * returned as an error {@link Mono}.
     */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.requestHandler.metadataPush(payload);
        } catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Terminates this responder with a {@link CancellationException} if not already terminated. */
    @Override
    public void dispose() {
        this.tryTerminate(() -> new CancellationException("Disposed"));
    }

    /** @return whether the underlying connection is disposed */
    @Override
    public boolean isDisposed() {
        return this.connection.isDisposed();
    }

    /** @return the underlying connection's close signal */
    @Override
    public Mono<Void> onClose() {
        return this.connection.onClose();
    }

    /**
     * Releases everything owned by this responder: live streams first, then the connection, the
     * lease handler subscription, the request handler and finally the outbound queue.
     *
     * @param e termination cause signalled to the channel processors
     */
    private void cleanup(Throwable e) {
        this.cleanUpSendingSubscriptions();
        this.cleanUpChannelProcessors(e);
        this.connection.dispose();
        this.leaseHandlerDisposable.dispose();
        this.requestHandler.dispose();
        this.sendProcessor.dispose();
    }

    /** Cancels and forgets every outbound subscription. */
    private synchronized void cleanUpSendingSubscriptions() {
        this.sendingSubscriptions.values().forEach(Subscription::cancel);
        this.sendingSubscriptions.clear();
    }

    /**
     * Signals {@code e} to every channel processor and forgets them.
     *
     * @param e termination cause
     */
    private synchronized void cleanUpChannelProcessors(Throwable e) {
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onError(e);
            } catch (Throwable ignored) {
                // Best effort: one failing processor must not prevent the others from terminating.
            }
        });
        this.channelProcessors.clear();
    }

    /**
     * Dispatches one incoming frame by type. The frame is always released once handling is done,
     * whether it succeeded or threw; a thrown exception is re-thrown via
     * {@link Exceptions#propagate(Throwable)}.
     *
     * @param frame the received frame
     */
    private void handleFrame(ByteBuf frame) {
        try {
            int streamId = FrameHeaderCodec.streamId(frame);
            FrameType frameType = FrameHeaderCodec.frameType(frame);
            switch (frameType) {
                case REQUEST_FNF: {
                    this.handleFireAndForget(streamId, this.fireAndForget(this.payloadDecoder.apply(frame)));
                    break;
                }
                case REQUEST_RESPONSE: {
                    this.handleRequestResponse(streamId, this.requestResponse(this.payloadDecoder.apply(frame)));
                    break;
                }
                case CANCEL: {
                    this.handleCancelFrame(streamId);
                    break;
                }
                case REQUEST_N: {
                    this.handleRequestN(streamId, frame);
                    break;
                }
                case REQUEST_STREAM: {
                    // Read the initial demand before decoding the payload, as the original does.
                    long streamInitialRequestN = RequestStreamFrameCodec.initialRequestN(frame);
                    Payload streamPayload = this.payloadDecoder.apply(frame);
                    this.handleStream(streamId, this.requestStream(streamPayload), streamInitialRequestN, null);
                    break;
                }
                case REQUEST_CHANNEL: {
                    long channelInitialRequestN = RequestChannelFrameCodec.initialRequestN(frame);
                    Payload channelPayload = this.payloadDecoder.apply(frame);
                    this.handleChannel(streamId, channelPayload, channelInitialRequestN);
                    break;
                }
                case METADATA_PUSH: {
                    this.handleMetadataPush(this.metadataPush(this.payloadDecoder.apply(frame)));
                    break;
                }
                case PAYLOAD: {
                    // Intentionally ignored.
                    break;
                }
                case NEXT: {
                    Processor<Payload, Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        receiver.onNext(this.payloadDecoder.apply(frame));
                    }
                    break;
                }
                case COMPLETE: {
                    Processor<Payload, Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        receiver.onComplete();
                    }
                    break;
                }
                case ERROR: {
                    Processor<Payload, Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        try {
                            receiver.onError(io.rsocket.exceptions.Exceptions.from(streamId, frame));
                        } catch (RuntimeException e) {
                            // Only Reactor's "bubbling" / "error callback not implemented" wrappers
                            // are logged; any other runtime exception is dropped silently.
                            boolean reactorWrapper = Exceptions.isBubbling(e) || Exceptions.isErrorCallbackNotImplemented(e);
                            if (reactorWrapper && LOGGER.isDebugEnabled()) {
                                Throwable unwrapped = Exceptions.unwrap(e);
                                LOGGER.debug("Unhandled dropped exception", unwrapped);
                            }
                        }
                    }
                    break;
                }
                case NEXT_COMPLETE: {
                    Processor<Payload, Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver != null) {
                        receiver.onNext(this.payloadDecoder.apply(frame));
                        receiver.onComplete();
                    }
                    break;
                }
                case SETUP: {
                    this.handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                    break;
                }
                default: {
                    this.handleError(streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType));
                }
            }
        } catch (Throwable t) {
            throw Exceptions.propagate(t);
        } finally {
            // Single release point for both the normal and the exceptional path.
            ReferenceCountUtil.safeRelease(frame);
        }
    }

    /**
     * Subscribes to a fire-and-forget result. The subscription is tracked while active so a
     * CANCEL frame can reach it; errors are ignored because nothing is sent back for this
     * interaction.
     */
    private void handleFireAndForget(final int streamId, Mono<Void> result) {
        result.subscribe(new BaseSubscriber<Void>() {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                RSocketResponder.this.sendingSubscriptions.put(streamId, subscription);
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                // Intentionally ignored.
            }

            @Override
            protected void hookFinally(SignalType type) {
                RSocketResponder.this.sendingSubscriptions.remove(streamId);
            }
        });
    }

    /**
     * Subscribes to a request-response result and translates its signals into frames: a payload
     * becomes a NEXT_COMPLETE frame, an empty completion becomes a COMPLETE frame and an error
     * becomes an ERROR frame. The subscriber is removed from {@code sendingSubscriptions} on every
     * terminal path.
     */
    private void handleRequestResponse(final int streamId, Mono<Payload> response) {
        final BaseSubscriber<Payload> subscriber = new BaseSubscriber<Payload>() {
            private boolean isEmpty = true;

            @Override
            protected void hookOnNext(Payload payload) {
                this.isEmpty = false;
                if (!PayloadValidationUtils.isValid(RSocketResponder.this.mtu, payload, RSocketResponder.this.maxFrameLength)) {
                    payload.release();
                    this.cancel();
                    // The stream ends here; stop tracking it so the entry does not outlive it.
                    RSocketResponder.this.sendingSubscriptions.remove(streamId, this);
                    RSocketResponder.this.handleError(streamId, new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
                    return;
                }
                ByteBuf byteBuf = PayloadFrameCodec.encodeNextCompleteReleasingPayload(RSocketResponder.this.allocator, streamId, payload);
                RSocketResponder.this.sendProcessor.onNext(byteBuf);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                // Send the error only if the stream is still tracked (i.e. was not cancelled).
                if (RSocketResponder.this.sendingSubscriptions.remove(streamId, this)) {
                    RSocketResponder.this.handleError(streamId, throwable);
                }
            }

            @Override
            protected void hookOnComplete() {
                // Always stop tracking on completion; a COMPLETE frame is only needed when no
                // payload was emitted, because the payload frame already carried the completion.
                boolean wasTracked = RSocketResponder.this.sendingSubscriptions.remove(streamId, this);
                if (this.isEmpty && wasTracked) {
                    RSocketResponder.this.sendProcessor.onNext(PayloadFrameCodec.encodeComplete(RSocketResponder.this.allocator, streamId));
                }
            }
        };
        // Register before subscribing so the hooks can find the entry even if the Mono
        // terminates during subscribe().
        this.sendingSubscriptions.put(streamId, subscriber);
        response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
    }

    /**
     * Subscribes to a stream (or channel) response and translates its signals into NEXT, COMPLETE
     * and ERROR frames.
     *
     * @param streamId stream the frames belong to
     * @param response publisher of the outbound payloads
     * @param initialRequestN demand requested from {@code response} on subscription
     * @param requestChannel inbound processor when serving a REQUEST_CHANNEL, otherwise {@code null}
     */
    private void handleStream(final int streamId, Flux<Payload> response, final long initialRequestN, final @Nullable UnicastProcessor<Payload> requestChannel) {
        final BaseSubscriber<Payload> subscriber = new BaseSubscriber<Payload>() {

            @Override
            protected void hookOnSubscribe(Subscription s) {
                s.request(initialRequestN);
            }

            @Override
            protected void hookOnNext(Payload payload) {
                try {
                    if (!PayloadValidationUtils.isValid(RSocketResponder.this.mtu, payload, RSocketResponder.this.maxFrameLength)) {
                        payload.release();
                        this.cancelStream(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
                        return;
                    }
                    ByteBuf byteBuf = PayloadFrameCodec.encodeNextReleasingPayload(RSocketResponder.this.allocator, streamId, payload);
                    RSocketResponder.this.sendProcessor.onNext(byteBuf);
                } catch (Throwable e) {
                    this.cancelStream(e);
                }
            }

            /** Cancels the outbound stream and reports {@code t} to the peer as an ERROR frame. */
            private void cancelStream(Throwable t) {
                // Drop the inbound processor first: the inbound flux's doFinally only emits a
                // CANCEL frame when it can still remove the processor, so this suppresses it.
                if (requestChannel != null) {
                    RSocketResponder.this.channelProcessors.remove(streamId, requestChannel);
                }
                this.cancel();
                // The stream ends here; stop tracking it so the entry does not outlive it.
                RSocketResponder.this.sendingSubscriptions.remove(streamId, this);
                RSocketResponder.this.handleError(streamId, t);
            }

            @Override
            protected void hookOnComplete() {
                if (RSocketResponder.this.sendingSubscriptions.remove(streamId, this)) {
                    RSocketResponder.this.sendProcessor.onNext(PayloadFrameCodec.encodeComplete(RSocketResponder.this.allocator, streamId));
                }
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                if (RSocketResponder.this.sendingSubscriptions.remove(streamId, this)) {
                    if (requestChannel != null && !requestChannel.isDisposed() && RSocketResponder.this.channelProcessors.remove(streamId, requestChannel)) {
                        try {
                            requestChannel.dispose();
                        } catch (Throwable ignored) {
                            // Best effort: the ERROR frame below must still be sent.
                        }
                    }
                    RSocketResponder.this.handleError(streamId, throwable);
                }
            }
        };
        // Register before subscribing so the hooks can find the entry even if the Flux
        // terminates during subscribe().
        this.sendingSubscriptions.put(streamId, subscriber);
        response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
    }

    /**
     * Sets up a REQUEST_CHANNEL stream: creates the inbound processor, seeds it with the first
     * payload, and hands the inbound flux to the request handler. Downstream demand on the inbound
     * flux is forwarded to the peer as REQUEST_N frames.
     *
     * @param streamId stream id of the channel
     * @param payload first payload, carried by the REQUEST_CHANNEL frame itself
     * @param initialRequestN initial demand for the outbound direction
     */
    private void handleChannel(final int streamId, Payload payload, long initialRequestN) {
        final UnicastProcessor<Payload> frames = UnicastProcessor.create();
        this.channelProcessors.put(streamId, frames);
        Flux<Payload> payloads = frames.doOnRequest(new LongConsumer() {
            boolean first = true;

            @Override
            public void accept(long l) {
                long n;
                if (this.first) {
                    // The first element is already queued locally (see onNext below), so one
                    // unit of the first request does not need to be asked from the peer.
                    this.first = false;
                    n = l - 1L;
                } else {
                    n = l;
                }
                if (n > 0L) {
                    RSocketResponder.this.sendProcessor.onNext(RequestNFrameCodec.encode(RSocketResponder.this.allocator, streamId, n));
                }
            }
        }).doFinally(signalType -> {
            // Act only if this processor is still the registered one for the stream.
            if (this.channelProcessors.remove(streamId, frames)) {
                if (signalType == SignalType.CANCEL) {
                    this.sendProcessor.onNext(CancelFrameCodec.encode(this.allocator, streamId));
                } else if (signalType == SignalType.ON_ERROR) {
                    Subscription subscription = this.sendingSubscriptions.remove(streamId);
                    if (subscription != null) {
                        subscription.cancel();
                    }
                }
            }
        }).doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);

        frames.onNext(payload);
        if (this.responderRSocket != null) {
            this.handleStream(streamId, this.requestChannel(payload, payloads), initialRequestN, frames);
        } else {
            this.handleStream(streamId, this.requestChannel(payloads), initialRequestN, frames);
        }
    }

    /** Subscribes to a metadata-push result with unbounded demand; errors are ignored. */
    private void handleMetadataPush(Mono<Void> result) {
        result.subscribe(new BaseSubscriber<Void>() {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                // Intentionally ignored.
            }
        });
    }

    /**
     * Handles a CANCEL frame: forgets the stream in both maps, signals a
     * {@link CancellationException} to its inbound processor (if any) and cancels its outbound
     * subscription (if any).
     */
    private void handleCancelFrame(int streamId) {
        Subscription subscription = this.sendingSubscriptions.remove(streamId);
        Processor<Payload, Payload> processor = this.channelProcessors.remove(streamId);
        if (processor != null) {
            try {
                processor.onError(new CancellationException("Disposed"));
            } catch (Exception ignored) {
                // Best effort: the outbound subscription below must still be cancelled.
            }
        }
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /** Sends {@code t} to the peer as an ERROR frame on {@code streamId}. */
    private void handleError(int streamId, Throwable t) {
        this.sendProcessor.onNext(ErrorFrameCodec.encode(this.allocator, streamId, t));
    }

    /** Forwards the demand carried by a REQUEST_N frame to the stream's outbound subscription, if tracked. */
    private void handleRequestN(int streamId, ByteBuf frame) {
        Subscription subscription = this.sendingSubscriptions.get(streamId);
        if (subscription != null) {
            long n = RequestNFrameCodec.requestN(frame);
            subscription.request(n);
        }
    }
}

