/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.fragmentation;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.fragmentation.FrameFragmenter;
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class FragmentationDuplexConnection
extends ReassemblyDuplexConnection
implements DuplexConnection {
    public static final int MIN_MTU_SIZE = 64;
    private static final Logger logger = LoggerFactory.getLogger(FragmentationDuplexConnection.class);
    final DuplexConnection delegate;
    final int mtu;
    final String type;

    public FragmentationDuplexConnection(DuplexConnection delegate, int mtu, int maxInboundPayloadSize, String type) {
        super(delegate, maxInboundPayloadSize);
        Objects.requireNonNull(delegate, "delegate must not be null");
        this.delegate = delegate;
        this.mtu = FragmentationDuplexConnection.assertMtu(mtu);
        this.type = type;
    }

    private boolean shouldFragment(FrameType frameType, int readableBytes) {
        return frameType.isFragmentable() && readableBytes > this.mtu;
    }

    public static int assertMtu(int mtu) {
        if (mtu > 0 && mtu < 64 || mtu < 0) {
            String msg = String.format("The smallest allowed mtu size is %d bytes, provided: %d", 64, mtu);
            throw new IllegalArgumentException(msg);
        }
        return mtu;
    }

    @Override
    public Mono<Void> send(Publisher<ByteBuf> frames) {
        return this.delegate.send((Publisher<ByteBuf>)Flux.from(frames).concatMap(frame -> {
            int readableBytes;
            FrameType frameType = FrameHeaderCodec.frameType(frame);
            if (!this.shouldFragment(frameType, readableBytes = frame.readableBytes())) {
                return Flux.just((Object)frame);
            }
            return this.logFragments((Flux<ByteBuf>)Flux.from(FrameFragmenter.fragmentFrame(this.alloc(), this.mtu, frame, frameType)));
        }));
    }

    @Override
    public Mono<Void> sendOne(ByteBuf frame) {
        int readableBytes;
        FrameType frameType = FrameHeaderCodec.frameType(frame);
        if (!this.shouldFragment(frameType, readableBytes = frame.readableBytes())) {
            return this.delegate.sendOne(frame);
        }
        Flux<ByteBuf> fragments = Flux.from(FrameFragmenter.fragmentFrame(this.alloc(), this.mtu, frame, frameType));
        fragments = this.logFragments(fragments);
        return this.delegate.send((Publisher<ByteBuf>)fragments);
    }

    protected Flux<ByteBuf> logFragments(Flux<ByteBuf> fragments) {
        if (logger.isDebugEnabled()) {
            fragments = fragments.doOnNext(byteBuf -> logger.debug("{} - stream id {} - frame type {} - \n {}", new Object[]{this.type, FrameHeaderCodec.streamId(byteBuf), FrameHeaderCodec.frameType(byteBuf), ByteBufUtil.prettyHexDump((ByteBuf)byteBuf)}));
        }
        return fragments;
    }
}

