/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.internal;

import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.framing.FrameType;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.PluginRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class ClientServerInputMultiplexer
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger((String)"io.rsocket.FrameLogger");
    private final DuplexConnection streamZeroConnection;
    private final DuplexConnection serverConnection;
    private final DuplexConnection clientConnection;
    private final DuplexConnection source;

    public ClientServerInputMultiplexer(DuplexConnection source, PluginRegistry plugins) {
        this.source = source;
        MonoProcessor streamZero = MonoProcessor.create();
        MonoProcessor server = MonoProcessor.create();
        MonoProcessor client = MonoProcessor.create();
        source = plugins.applyConnection(DuplexConnectionInterceptor.Type.SOURCE, source);
        this.streamZeroConnection = plugins.applyConnection(DuplexConnectionInterceptor.Type.STREAM_ZERO, new InternalDuplexConnection(source, (MonoProcessor<Flux<Frame>>)streamZero));
        this.serverConnection = plugins.applyConnection(DuplexConnectionInterceptor.Type.SERVER, new InternalDuplexConnection(source, (MonoProcessor<Flux<Frame>>)server));
        this.clientConnection = plugins.applyConnection(DuplexConnectionInterceptor.Type.CLIENT, new InternalDuplexConnection(source, (MonoProcessor<Flux<Frame>>)client));
        source.receive().groupBy(frame -> {
            int streamId = frame.getStreamId();
            DuplexConnectionInterceptor.Type type = streamId == 0 ? (frame.getType() == FrameType.SETUP ? DuplexConnectionInterceptor.Type.STREAM_ZERO : DuplexConnectionInterceptor.Type.CLIENT) : ((streamId & 1) == 0 ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT);
            return type;
        }).subscribe(group -> {
            switch ((DuplexConnectionInterceptor.Type)((Object)((Object)group.key()))) {
                case STREAM_ZERO: {
                    streamZero.onNext(group);
                    break;
                }
                case SERVER: {
                    server.onNext(group);
                    break;
                }
                case CLIENT: {
                    client.onNext(group);
                }
            }
        }, t -> {
            LOGGER.error("test", t);
            this.dispose();
        });
    }

    public DuplexConnection asServerConnection() {
        return this.serverConnection;
    }

    public DuplexConnection asClientConnection() {
        return this.clientConnection;
    }

    public DuplexConnection asStreamZeroConnection() {
        return this.streamZeroConnection;
    }

    public void dispose() {
        this.source.dispose();
    }

    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    @Override
    public Mono<Void> onClose() {
        return this.source.onClose();
    }

    private static class InternalDuplexConnection
    implements DuplexConnection {
        private final DuplexConnection source;
        private final MonoProcessor<Flux<Frame>> processor;
        private final boolean debugEnabled;

        public InternalDuplexConnection(DuplexConnection source, MonoProcessor<Flux<Frame>> processor) {
            this.source = source;
            this.processor = processor;
            this.debugEnabled = LOGGER.isDebugEnabled();
        }

        @Override
        public Mono<Void> send(Publisher<Frame> frame) {
            if (this.debugEnabled) {
                frame = Flux.from(frame).doOnNext(f -> LOGGER.debug("sending -> " + f.toString()));
            }
            return this.source.send((Publisher<Frame>)frame);
        }

        @Override
        public Mono<Void> sendOne(Frame frame) {
            if (this.debugEnabled) {
                LOGGER.debug("sending -> " + frame.toString());
            }
            return this.source.sendOne(frame);
        }

        @Override
        public Flux<Frame> receive() {
            return this.processor.flatMapMany(f -> {
                if (this.debugEnabled) {
                    return f.doOnNext(frame -> LOGGER.debug("receiving -> " + frame.toString()));
                }
                return f;
            });
        }

        public void dispose() {
            this.source.dispose();
        }

        public boolean isDisposed() {
            return this.source.isDisposed();
        }

        @Override
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override
        public double availability() {
            return this.source.availability();
        }
    }
}

