/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameUtil;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class ClientServerInputMultiplexer
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger((String)"io.rsocket.FrameLogger");
    private static final InitializingInterceptorRegistry emptyInterceptorRegistry = new InitializingInterceptorRegistry();
    private final DuplexConnection setupConnection;
    private final DuplexConnection serverConnection;
    private final DuplexConnection clientConnection;
    private final DuplexConnection source;
    private final DuplexConnection clientServerConnection;
    private boolean setupReceived;

    public ClientServerInputMultiplexer(DuplexConnection source) {
        this(source, emptyInterceptorRegistry, false);
    }

    public ClientServerInputMultiplexer(DuplexConnection source, InitializingInterceptorRegistry registry, boolean isClient) {
        this.source = source;
        MonoProcessor setup = MonoProcessor.create();
        MonoProcessor server = MonoProcessor.create();
        MonoProcessor client = MonoProcessor.create();
        source = registry.initConnection(DuplexConnectionInterceptor.Type.SOURCE, source);
        this.setupConnection = registry.initConnection(DuplexConnectionInterceptor.Type.SETUP, new InternalDuplexConnection(source, setup));
        this.serverConnection = registry.initConnection(DuplexConnectionInterceptor.Type.SERVER, new InternalDuplexConnection(source, server));
        this.clientConnection = registry.initConnection(DuplexConnectionInterceptor.Type.CLIENT, new InternalDuplexConnection(source, client));
        this.clientServerConnection = new InternalDuplexConnection(source, client, server);
        source.receive().groupBy(frame -> {
            DuplexConnectionInterceptor.Type type;
            int streamId = FrameHeaderCodec.streamId(frame);
            if (streamId == 0) {
                switch (FrameHeaderCodec.frameType(frame)) {
                    case SETUP: 
                    case RESUME: 
                    case RESUME_OK: {
                        type = DuplexConnectionInterceptor.Type.SETUP;
                        this.setupReceived = true;
                        break;
                    }
                    case LEASE: 
                    case KEEPALIVE: 
                    case ERROR: {
                        type = isClient ? DuplexConnectionInterceptor.Type.CLIENT : DuplexConnectionInterceptor.Type.SERVER;
                        break;
                    }
                    default: {
                        type = isClient ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
                        break;
                    }
                }
            } else {
                type = (streamId & 1) == 0 ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
            }
            if (!isClient && type != DuplexConnectionInterceptor.Type.SETUP && !this.setupReceived) {
                frame.release();
                throw new IllegalStateException("SETUP or LEASE frame must be received before any others.");
            }
            return type;
        }).subscribe(group -> {
            switch ((DuplexConnectionInterceptor.Type)((Object)((Object)group.key()))) {
                case SETUP: {
                    setup.onNext(group);
                    break;
                }
                case SERVER: {
                    server.onNext(group);
                    break;
                }
                case CLIENT: {
                    client.onNext(group);
                }
            }
        }, ex -> {
            setup.onError(ex);
            server.onError(ex);
            client.onError(ex);
        });
    }

    public DuplexConnection asClientServerConnection() {
        return this.clientServerConnection;
    }

    public DuplexConnection asServerConnection() {
        return this.serverConnection;
    }

    public DuplexConnection asClientConnection() {
        return this.clientConnection;
    }

    public DuplexConnection asSetupConnection() {
        return this.setupConnection;
    }

    public void dispose() {
        this.source.dispose();
    }

    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    @Override
    public Mono<Void> onClose() {
        return this.source.onClose();
    }

    private static class InternalDuplexConnection
    implements DuplexConnection {
        private final DuplexConnection source;
        private final MonoProcessor<Flux<ByteBuf>>[] processors;
        private final boolean debugEnabled;

        @SafeVarargs
        public InternalDuplexConnection(DuplexConnection source, MonoProcessor<Flux<ByteBuf>> ... processors) {
            this.source = source;
            this.processors = processors;
            this.debugEnabled = LOGGER.isDebugEnabled();
        }

        @Override
        public Mono<Void> send(Publisher<ByteBuf> frame) {
            if (this.debugEnabled) {
                frame = Flux.from(frame).doOnNext(f -> LOGGER.debug("sending -> " + FrameUtil.toString(f)));
            }
            return this.source.send((Publisher<ByteBuf>)frame);
        }

        @Override
        public Mono<Void> sendOne(ByteBuf frame) {
            if (this.debugEnabled) {
                LOGGER.debug("sending -> " + FrameUtil.toString(frame));
            }
            return this.source.sendOne(frame);
        }

        @Override
        public Flux<ByteBuf> receive() {
            return Flux.fromArray((Object[])this.processors).flatMap(p -> p.flatMapMany(f -> {
                if (this.debugEnabled) {
                    return f.doOnNext(frame -> LOGGER.debug("receiving -> " + FrameUtil.toString(frame)));
                }
                return f;
            }));
        }

        @Override
        public ByteBufAllocator alloc() {
            return this.source.alloc();
        }

        public void dispose() {
            this.source.dispose();
        }

        public boolean isDisposed() {
            return this.source.isDisposed();
        }

        @Override
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override
        public double availability() {
            return this.source.availability();
        }
    }
}

