/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.FrameType;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.exceptions.ApplicationException;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

/**
 * Responder side of a connection: reads frames from a {@link DuplexConnection}, dispatches them
 * to a user supplied {@link RSocket} request handler, and writes the resulting frames back
 * through a single outbound {@link UnboundedProcessor}.
 *
 * <p>Two maps keyed by stream id track in-flight work: {@code sendingSubscriptions} holds the
 * subscriptions of responses being produced, and {@code channelProcessors} holds the processors
 * that feed inbound channel payloads to the handler. Both maps are only touched while holding
 * this object's monitor.
 */
class RSocketServer implements RSocket {
    /**
     * Flag value set on every request-response reply frame.
     * NOTE(review): presumably the protocol's COMPLETE flag bit (0x40) - confirm against the
     * frame header definition.
     */
    private static final int FLAGS_COMPLETE = 64;

    /** Flag value added to a reply frame when the payload reports that it has metadata. */
    private static final int FLAGS_METADATA = 256;

    private final DuplexConnection connection;
    private final RSocket requestHandler;
    private final Function<Frame, ? extends Payload> frameDecoder;
    private final Consumer<Throwable> errorConsumer;
    private final IntObjectHashMap<Subscription> sendingSubscriptions;
    private final IntObjectHashMap<UnicastProcessor<Payload>> channelProcessors;
    private final UnboundedProcessor<Frame> sendProcessor;
    private Disposable receiveDisposable;

    /**
     * Wires the outbound, inbound and close pipelines of the connection.
     *
     * @param connection connection to read frames from and write frames to
     * @param requestHandler handler that serves the decoded requests
     * @param frameDecoder converts an inbound frame into the payload handed to the handler
     * @param errorConsumer receives every error observed while serving the connection
     */
    RSocketServer(DuplexConnection connection, RSocket requestHandler, Function<Frame, ? extends Payload> frameDecoder, Consumer<Throwable> errorConsumer) {
        this.connection = connection;
        this.requestHandler = requestHandler;
        this.frameDecoder = frameDecoder;
        this.errorConsumer = errorConsumer;
        this.sendingSubscriptions = new IntObjectHashMap<>();
        this.channelProcessors = new IntObjectHashMap<>();
        this.sendProcessor = new UnboundedProcessor<>();

        // Outbound: everything pushed into sendProcessor is written to the connection.
        connection
            .send(this.sendProcessor)
            .doOnError(this::handleSendProcessorError)
            .doFinally(this::handleSendProcessorCancel)
            .subscribe();

        // Inbound: a failure while handling one frame is reported and swallowed so that it
        // does not terminate the receive loop.
        this.receiveDisposable = connection
            .receive()
            .flatMapSequential(frame -> this.handleFrame(frame).onErrorResume(t -> {
                errorConsumer.accept(t);
                return Mono.empty();
            }))
            .doOnError(errorConsumer)
            .then()
            .subscribe();

        // Close: release per-stream state and stop reading once the connection terminates.
        this.connection
            .onClose()
            .doOnError(errorConsumer)
            .doFinally(s -> {
                this.cleanup();
                this.receiveDisposable.dispose();
            })
            .subscribe();
    }

    /**
     * Cancels all in-flight streams when the outbound pipeline fails.
     *
     * @param t the send failure (unused; it is not reported from here)
     */
    private void handleSendProcessorError(Throwable t) {
        this.cancelOutstandingStreams();
    }

    /**
     * Cancels all in-flight streams when the outbound pipeline terminates for any reason other
     * than an error; the error case is already covered by {@link #handleSendProcessorError}.
     *
     * @param t the signal that terminated the outbound pipeline
     */
    private void handleSendProcessorCancel(SignalType t) {
        if (SignalType.ON_ERROR == t) {
            return;
        }
        this.cancelOutstandingStreams();
    }

    /**
     * Cancels every tracked response subscription and channel processor without removing them
     * from the maps.
     *
     * <p>The map values are copied while holding the lock because {@code values()} is a view of
     * the map, and cancelling a subscription can remove its entry (see the {@code doFinally}
     * hooks on the response pipelines), which would otherwise modify the map mid-iteration.
     */
    private void cancelOutstandingStreams() {
        List<Subscription> subscriptions;
        List<UnicastProcessor<Payload>> processors;
        synchronized (this) {
            subscriptions = new ArrayList<>(this.sendingSubscriptions.values());
            processors = new ArrayList<>(this.channelProcessors.values());
        }
        for (Subscription subscription : subscriptions) {
            this.cancelQuietly(subscription);
        }
        for (Subscription processor : processors) {
            this.cancelQuietly(processor);
        }
    }

    /** Cancels one subscription, routing any failure to the error consumer instead of throwing. */
    private void cancelQuietly(Subscription subscription) {
        try {
            subscription.cancel();
        }
        catch (Throwable e) {
            this.errorConsumer.accept(e);
        }
    }

    /** Delegates to the request handler; a synchronous throw becomes an error {@code Mono}. */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return this.requestHandler.fireAndForget(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw becomes an error {@code Mono}. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return this.requestHandler.requestResponse(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw becomes an error {@code Flux}. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        try {
            return this.requestHandler.requestStream(payload);
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw becomes an error {@code Flux}. */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        try {
            return this.requestHandler.requestChannel(payloads);
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw becomes an error {@code Mono}. */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.requestHandler.metadataPush(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Closes the underlying connection. */
    @Override
    public Mono<Void> close() {
        return this.connection.close();
    }

    /** Returns the close signal of the underlying connection. */
    @Override
    public Mono<Void> onClose() {
        return this.connection.onClose();
    }

    /** Cancels and forgets all in-flight streams, then closes the request handler. */
    private void cleanup() {
        this.cleanUpSendingSubscriptions();
        this.cleanUpChannelProcessors();
        this.requestHandler.close().subscribe();
    }

    /**
     * Empties the response subscription map and cancels everything that was in it.
     *
     * <p>The entries are copied and the map cleared under the lock; the cancels run afterwards
     * so that no foreign code executes while the monitor is held and so that removal callbacks
     * triggered by a cancel cannot disturb the iteration. A failing cancel is reported to the
     * error consumer so the remaining subscriptions are still cancelled.
     */
    private void cleanUpSendingSubscriptions() {
        List<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = new ArrayList<>(this.sendingSubscriptions.values());
            this.sendingSubscriptions.clear();
        }
        for (Subscription subscription : subscriptions) {
            this.cancelQuietly(subscription);
        }
    }

    /**
     * Empties the channel processor map and cancels everything that was in it; same locking and
     * error handling as {@link #cleanUpSendingSubscriptions()}.
     */
    private void cleanUpChannelProcessors() {
        List<UnicastProcessor<Payload>> processors;
        synchronized (this) {
            processors = new ArrayList<>(this.channelProcessors.values());
            this.channelProcessors.clear();
        }
        for (Subscription processor : processors) {
            this.cancelQuietly(processor);
        }
    }

    /**
     * Dispatches one inbound frame by type.
     *
     * <p>The frame is released in the {@code finally} block, i.e. before the returned
     * {@code Mono} is subscribed. Handlers must therefore read everything they need from the
     * frame before returning, not inside a deferred callback.
     *
     * @param frame the inbound frame; released by this method
     * @return a {@code Mono} representing the handling of the frame
     */
    private Mono<Void> handleFrame(Frame frame) {
        try {
            int streamId = frame.getStreamId();
            switch (frame.getType()) {
                case FIRE_AND_FORGET:
                    return this.handleFireAndForget(streamId, this.fireAndForget(this.frameDecoder.apply(frame)));
                case REQUEST_RESPONSE:
                    return this.handleRequestResponse(streamId, this.requestResponse(this.frameDecoder.apply(frame)));
                case CANCEL:
                    return this.handleCancelFrame(streamId);
                case KEEPALIVE:
                    return this.handleKeepAliveFrame(frame);
                case REQUEST_N:
                    return this.handleRequestN(streamId, frame);
                case REQUEST_STREAM:
                    return this.handleStream(streamId, this.requestStream(this.frameDecoder.apply(frame)), Frame.Request.initialRequestN(frame));
                case REQUEST_CHANNEL:
                    return this.handleChannel(streamId, frame);
                case METADATA_PUSH:
                    return this.metadataPush(this.frameDecoder.apply(frame));
                case PAYLOAD:
                case LEASE:
                    // Accepted but ignored.
                    return Mono.empty();
                case NEXT: {
                    UnicastProcessor<Payload> receiver = this.getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onNext(this.frameDecoder.apply(frame));
                    }
                    return Mono.empty();
                }
                case COMPLETE: {
                    UnicastProcessor<Payload> receiver = this.getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onComplete();
                    }
                    return Mono.empty();
                }
                case ERROR: {
                    UnicastProcessor<Payload> receiver = this.getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onError(new ApplicationException(Frame.Error.message(frame)));
                    }
                    return Mono.empty();
                }
                case NEXT_COMPLETE: {
                    UnicastProcessor<Payload> receiver = this.getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onNext(this.frameDecoder.apply(frame));
                        receiver.onComplete();
                    }
                    return Mono.empty();
                }
                case SETUP:
                    return this.handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                default:
                    return this.handleError(streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frame.getType()));
            }
        }
        finally {
            frame.release();
        }
    }

    /**
     * Tracks a fire-and-forget invocation for its lifetime and reports its errors; nothing is
     * written back to the connection.
     */
    private Mono<Void> handleFireAndForget(int streamId, Mono<Void> result) {
        return result
            .doOnSubscribe(subscription -> this.addSubscription(streamId, subscription))
            .doOnError(this.errorConsumer)
            .doFinally(signalType -> this.removeSubscription(streamId))
            .ignoreElement();
    }

    /**
     * Converts the handler's single response into a NEXT_COMPLETE frame, or its failure into an
     * error frame, and queues it for sending.
     */
    private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
        return response
            .doOnSubscribe(subscription -> this.addSubscription(streamId, subscription))
            .map(payload -> {
                try {
                    int flags = FLAGS_COMPLETE;
                    if (payload.hasMetadata()) {
                        flags = Frame.setFlag(flags, FLAGS_METADATA);
                    }
                    return Frame.PayloadFrame.from(streamId, FrameType.NEXT_COMPLETE, payload, flags);
                }
                finally {
                    // Release even if frame construction throws, so the payload is not leaked.
                    payload.release();
                }
            })
            .doOnError(this.errorConsumer)
            .onErrorResume(t -> Mono.just(Frame.Error.from(streamId, t)))
            .doOnNext(this.sendProcessor::onNext)
            .doFinally(signalType -> this.removeSubscription(streamId))
            .then();
    }

    /**
     * Subscribes to a response stream and queues its payloads as NEXT frames followed by a
     * COMPLETE frame, or an error frame on failure. Demand is limited through a
     * {@link LimitableRequestPublisher}, which is registered under the stream id and seeded
     * with {@code initialRequestN}.
     *
     * @return always an empty {@code Mono}; the stream runs on its own subscription
     */
    private Mono<Void> handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
        response
            .map(payload -> {
                try {
                    return Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload);
                }
                finally {
                    // Release even if frame construction throws, so the payload is not leaked.
                    payload.release();
                }
            })
            .transform(frameFlux -> {
                LimitableRequestPublisher<Frame> frames = LimitableRequestPublisher.wrap(frameFlux);
                synchronized (this) {
                    this.sendingSubscriptions.put(streamId, frames);
                }
                frames.increaseRequestLimit(initialRequestN);
                return frames;
            })
            // Build the COMPLETE frame only when the stream actually completes; creating it up
            // front would leave an unsent frame behind whenever the stream errors or is
            // cancelled.
            .concatWith(Mono.fromSupplier(() -> Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)))
            .onErrorResume(t -> Mono.just(Frame.Error.from(streamId, t)))
            .doOnNext(this.sendProcessor::onNext)
            .doFinally(signalType -> this.removeSubscription(streamId))
            .subscribe();
        return Mono.empty();
    }

    /**
     * Starts a channel: registers a processor that receives the requester's payloads, seeds it
     * with the payload of the first frame, and serves the handler's response as a stream.
     * Cancel, error and demand signals on the inbound side are sent to the peer as frames.
     */
    private Mono<Void> handleChannel(int streamId, Frame firstFrame) {
        UnicastProcessor<Payload> inbound = UnicastProcessor.create();
        this.addChannelProcessor(streamId, inbound);
        Flux<Payload> payloads = inbound
            .doOnCancel(() -> this.sendProcessor.onNext(Frame.Cancel.from(streamId)))
            .doOnError(t -> this.sendProcessor.onNext(Frame.Error.from(streamId, t)))
            .doOnRequest(l -> this.sendProcessor.onNext(Frame.RequestN.from(streamId, l)))
            .doFinally(signalType -> this.removeChannelProcessor(streamId));
        inbound.onNext(this.frameDecoder.apply(firstFrame));
        return this.handleStream(streamId, this.requestChannel(payloads), Frame.Request.initialRequestN(firstFrame));
    }

    /**
     * Answers a keep-alive frame that has the respond flag set by queueing a keep-alive frame
     * carrying the same data, with the respond flag cleared.
     *
     * <p>The frame is inspected immediately instead of inside a deferred callback because
     * {@link #handleFrame} releases it as soon as this method returns. A synchronous failure is
     * returned as an error {@code Mono} so that it follows the same per-frame error path as
     * before.
     * NOTE(review): assumes {@code Frame.Keepalive.from} copies {@code data} before returning -
     * confirm.
     */
    private Mono<Void> handleKeepAliveFrame(Frame frame) {
        try {
            if (Frame.Keepalive.hasRespondFlag(frame)) {
                ByteBuf data = Unpooled.wrappedBuffer(frame.getData());
                this.sendProcessor.onNext(Frame.Keepalive.from(data, false));
            }
            return Mono.empty();
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** On subscription, removes the response subscription of the stream, if any, and cancels it. */
    private Mono<Void> handleCancelFrame(int streamId) {
        return Mono.fromRunnable(() -> {
            Subscription subscription;
            synchronized (this) {
                subscription = this.sendingSubscriptions.remove(streamId);
            }
            if (subscription != null) {
                subscription.cancel();
            }
        });
    }

    /** On subscription, reports the error locally and queues an error frame for the stream. */
    private Mono<Void> handleError(int streamId, Throwable t) {
        return Mono.fromRunnable(() -> {
            this.errorConsumer.accept(t);
            this.sendProcessor.onNext(Frame.Error.from(streamId, t));
        });
    }

    /**
     * Forwards additional demand from the peer to the response subscription of the stream, if
     * one is registered. A request of {@code Integer.MAX_VALUE} or more is forwarded as
     * {@code Long.MAX_VALUE}.
     */
    private Mono<Void> handleRequestN(int streamId, Frame frame) {
        Subscription subscription = this.getSubscription(streamId);
        if (subscription != null) {
            int n = Frame.RequestN.requestN(frame);
            subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : (long)n);
        }
        return Mono.empty();
    }

    /** Registers the response subscription of a stream. */
    private synchronized void addSubscription(int streamId, Subscription subscription) {
        this.sendingSubscriptions.put(streamId, subscription);
    }

    /** Returns the response subscription of a stream, or {@code null} if none is registered. */
    @Nullable
    private synchronized Subscription getSubscription(int streamId) {
        return this.sendingSubscriptions.get(streamId);
    }

    /** Forgets the response subscription of a stream. */
    private synchronized void removeSubscription(int streamId) {
        this.sendingSubscriptions.remove(streamId);
    }

    /** Registers the inbound processor of a channel. */
    private synchronized void addChannelProcessor(int streamId, UnicastProcessor<Payload> processor) {
        this.channelProcessors.put(streamId, processor);
    }

    /** Returns the inbound processor of a channel, or {@code null} if none is registered. */
    @Nullable
    private synchronized UnicastProcessor<Payload> getChannelProcessor(int streamId) {
        return this.channelProcessors.get(streamId);
    }

    /** Forgets the inbound processor of a channel. */
    private synchronized void removeChannelProcessor(int streamId) {
        this.channelProcessors.remove(streamId);
    }
}

