/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.fragmentation;

import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.fragmentation.FrameFragmenter;
import io.rsocket.fragmentation.FrameReassembler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A {@link DuplexConnection} decorator that fragments outbound frames and reassembles inbound
 * fragments.
 *
 * <p>Outbound frames are passed through a {@link FrameFragmenter}; inbound frames that carry the
 * "follows" flag are buffered per stream id in a {@link FrameReassembler} until a frame without
 * that flag arrives for the same stream, at which point the reassembled frame is emitted.
 *
 * <p>All access to the reassembler map is guarded by this instance's monitor.
 */
public class FragmentationDuplexConnection implements DuplexConnection {
    /**
     * Flag bit tested on inbound frames; a frame with this bit set is buffered instead of emitted.
     * NOTE(review): 0x80 is presumably the RSocket "Follows" flag - confirm against the constant
     * declared by the framing classes and reference that constant instead if one exists.
     */
    private static final int FOLLOWS_FLAG = 0x80;

    private final DuplexConnection source;
    /** Pending reassemblers keyed by stream id; guarded by {@code this}. */
    private final IntObjectHashMap<FrameReassembler> frameReassemblers = new IntObjectHashMap<>();
    private final FrameFragmenter frameFragmenter;

    /**
     * Wraps {@code source} with fragmentation support.
     *
     * @param source the connection that frames are delegated to
     * @param mtu value handed to the {@link FrameFragmenter} constructor (presumably the maximum
     *     fragment size - confirm against {@code FrameFragmenter})
     */
    public FragmentationDuplexConnection(DuplexConnection source, int mtu) {
        this.source = source;
        this.frameFragmenter = new FrameFragmenter(mtu);
    }

    /**
     * Resolves the default MTU from system properties.
     *
     * @return the integer value of {@code io.rsocket.fragmentation.mtu} (falling back to 1024)
     *     when {@code io.rsocket.fragmentation.enable} is {@code true}; otherwise {@code 0}
     */
    public static int getDefaultMTU() {
        if (Boolean.getBoolean("io.rsocket.fragmentation.enable")) {
            return Integer.getInteger("io.rsocket.fragmentation.mtu", 1024);
        }
        return 0;
    }

    /** Delegates to the wrapped connection. */
    @Override
    public double availability() {
        return this.source.availability();
    }

    /**
     * Sends every frame of {@code frames} through {@link #sendOne(Frame)}, one after another.
     *
     * @param frames the frames to send
     * @return a {@code Mono} that completes once all frames have been sent
     */
    @Override
    public Mono<Void> send(Publisher<Frame> frames) {
        return Flux.from(frames).concatMap(this::sendOne).then();
    }

    /**
     * Sends a single frame, fragmenting it first when the fragmenter says it should be.
     *
     * @param frame the frame to send
     * @return the completion signal of the underlying send
     */
    @Override
    public Mono<Void> sendOne(Frame frame) {
        if (this.frameFragmenter.shouldFragment(frame)) {
            return this.source.send((Publisher<Frame>) this.frameFragmenter.fragment(frame));
        }
        return this.source.sendOne(frame);
    }

    /**
     * Receives frames from the wrapped connection, buffering fragments and emitting only complete
     * frames.
     *
     * @return the inbound frames with fragments replaced by their reassembled frame
     */
    @Override
    public Flux<Frame> receive() {
        return this.source.receive().concatMap(this::handleInbound);
    }

    /** Delegates to the wrapped connection. */
    @Override
    public Mono<Void> close() {
        return this.source.close();
    }

    /**
     * Returns the wrapped connection's close signal; when it terminates, every pending
     * reassembler is disposed and the map is cleared.
     */
    @Override
    public Mono<Void> onClose() {
        return this.source.onClose().doFinally(signal -> {
            synchronized (this) {
                this.frameReassemblers.values().forEach(FrameReassembler::dispose);
                this.frameReassemblers.clear();
            }
        });
    }

    /**
     * Routes one inbound frame: buffers it when more fragments follow, completes a pending
     * reassembly when one exists for its stream, or passes it through untouched.
     */
    private Mono<Frame> handleInbound(Frame frame) {
        if ((frame.flags() & FOLLOWS_FLAG) != 0) {
            this.getFrameReassembler(frame).append(frame);
            return Mono.empty();
        }
        // Remove in a single locked step rather than containsKey-then-remove: the cleanup in
        // onClose() can clear the map between two separate lock acquisitions, which would make
        // remove() return null and the subsequent append() throw a NullPointerException.
        FrameReassembler pending = this.removeFrameReassembler(frame.getStreamId());
        if (pending == null) {
            return Mono.just(frame);
        }
        pending.append(frame);
        return Mono.just(pending.reassemble());
    }

    /** Returns the reassembler for the frame's stream, creating it from the frame if absent. */
    private synchronized FrameReassembler getFrameReassembler(Frame frame) {
        return this.frameReassemblers.computeIfAbsent(frame.getStreamId(), s -> new FrameReassembler(frame));
    }

    /** Removes and returns the reassembler for {@code streamId}, or {@code null} if none is pending. */
    private synchronized FrameReassembler removeFrameReassembler(int streamId) {
        return this.frameReassemblers.remove(streamId);
    }
}

