/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.Future;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameLengthFlyweight;
import io.rsocket.transport.netty.SendPublisher;
import java.util.Objects;
import java.util.Queue;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.FutureMono;

public final class TcpDuplexConnection
implements DuplexConnection {
    private final Connection connection;
    private final Disposable channelClosed;
    private final ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
    private final boolean encodeLength;

    public TcpDuplexConnection(Connection connection) {
        this(connection, true);
    }

    public TcpDuplexConnection(Connection connection, boolean encodeLength) {
        this.encodeLength = encodeLength;
        this.connection = Objects.requireNonNull(connection, "connection must not be null");
        this.channelClosed = FutureMono.from((Future)connection.channel().closeFuture()).doFinally(s -> {
            if (!this.isDisposed()) {
                this.dispose();
            }
        }).subscribe();
    }

    public void dispose() {
        this.connection.dispose();
    }

    public boolean isDisposed() {
        return this.connection.isDisposed();
    }

    public Mono<Void> onClose() {
        return this.connection.onDispose().doFinally(s -> {
            if (!this.channelClosed.isDisposed()) {
                this.channelClosed.dispose();
            }
        });
    }

    public Flux<ByteBuf> receive() {
        return this.connection.inbound().receive().map(this::decode);
    }

    public Mono<Void> send(Publisher<ByteBuf> frames) {
        return Flux.from(frames).transform(frameFlux -> {
            if (frameFlux instanceof Fuseable.QueueSubscription) {
                Fuseable.QueueSubscription queueSubscription = (Fuseable.QueueSubscription)frameFlux;
                queueSubscription.requestFusion(2);
                return new SendPublisher<ByteBuf>((Queue<ByteBuf>)queueSubscription, (Publisher<ByteBuf>)frameFlux, this.connection.channel(), this::encode, ByteBuf::readableBytes);
            }
            return new SendPublisher<ByteBuf>((Publisher<ByteBuf>)frameFlux, this.connection.channel(), this::encode, ByteBuf::readableBytes);
        }).then();
    }

    private ByteBuf encode(ByteBuf frame) {
        if (this.encodeLength) {
            return FrameLengthFlyweight.encode((ByteBufAllocator)this.allocator, (int)frame.readableBytes(), (ByteBuf)frame).retain();
        }
        return frame;
    }

    private ByteBuf decode(ByteBuf frame) {
        if (this.encodeLength) {
            return FrameLengthFlyweight.frame((ByteBuf)frame).retain();
        }
        return frame;
    }
}

