/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.internal.UnboundedProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/**
 * Skeleton {@link DuplexConnection} that provides the close signal and the
 * outbound frame hand-off shared by concrete connections.
 *
 * <p>Subclasses supply the actual teardown logic in {@link #doOnClose()}; this
 * class guarantees that hook is wired to the {@link #onClose} signal.
 */
public abstract class BaseDuplexConnection
implements DuplexConnection {
    /** Close signal; its termination (or cancellation) triggers {@link #doOnClose()}. */
    protected MonoProcessor<Void> onClose = MonoProcessor.create();
    /** Processor that {@link #sendFrame(int, ByteBuf)} pushes outgoing frames into. */
    protected UnboundedProcessor sender = new UnboundedProcessor();

    /**
     * Subscribes to {@link #onClose} so that {@link #doOnClose()} is invoked once
     * the signal terminates for any reason.
     */
    public BaseDuplexConnection() {
        // doFinally fires on complete, error and cancel alike, so the hook runs
        // no matter how the close signal ends. The hook is only invoked later,
        // not during construction.
        this.onClose.doFinally(s -> this.doOnClose()).subscribe();
    }

    /**
     * Hands a frame to {@link #sender}.
     *
     * @param streamId id of the stream the frame belongs to; frames for stream
     *                 {@code 0} are submitted through the prioritized path
     *                 (presumably connection-level frames - confirm against the
     *                 protocol spec)
     * @param frame    the frame to send
     */
    @Override
    public void sendFrame(int streamId, ByteBuf frame) {
        if (streamId == 0) {
            this.sender.onNextPrioritized(frame);
        } else {
            this.sender.onNext(frame);
        }
    }

    /**
     * Teardown hook run when {@link #onClose} terminates. Implementations release
     * whatever resources the concrete connection holds.
     */
    protected abstract void doOnClose();

    /**
     * Returns the close signal of this connection.
     *
     * @return the {@link #onClose} processor, exposed as a {@link Mono}
     */
    @Override
    public final Mono<Void> onClose() {
        return this.onClose;
    }

    /**
     * Closes this connection by completing {@link #onClose}; the subscription made
     * in the constructor then invokes {@link #doOnClose()}.
     */
    @Override
    public final void dispose() {
        this.onClose.onComplete();
    }

    /**
     * Reports whether this connection has been closed.
     *
     * @return the result of {@code onClose.isDisposed()}
     */
    @Override
    public final boolean isDisposed() {
        return this.onClose.isDisposed();
    }
}

