/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.test.LeaksTrackingByteBufAllocator;
import java.net.SocketAddress;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/**
 * {@link DuplexConnection} implementation backed by in-memory sinks instead of a real transport.
 *
 * <p>Frames handed to {@link #sendFrame(int, ByteBuf)} are pushed into the {@code outbound} sink,
 * which is subscribed to by the subscriber supplied at construction time. Frames emitted into the
 * {@code inbound} sink are exposed through {@link #receive()}. Nothing in this class emits into
 * {@code inbound}; the field is package-private, presumably so that test code can feed frames in
 * directly.
 */
public class TestDuplexConnection implements DuplexConnection {
    /** Allocator returned by {@link #alloc()} and used to encode payload frames. */
    final ByteBufAllocator allocator;
    /** Frames travelling towards the consumer of {@link #receive()}. */
    final Sinks.Many<ByteBuf> inbound = Sinks.unsafe().many().unicast().onBackpressureError();
    /** Frames written through {@link #sendFrame(int, ByteBuf)}. */
    final Sinks.Many<ByteBuf> outbound = Sinks.unsafe().many().unicast().onBackpressureError();
    /** Completed by {@link #dispose()}; backs {@link #onClose()}. */
    final Sinks.One<Void> close = Sinks.one();

    /**
     * Creates the connection and immediately subscribes {@code outboundSubscriber} to the outbound
     * frame stream.
     *
     * @param outboundSubscriber receives every frame passed to {@link #sendFrame(int, ByteBuf)}
     * @param trackLeaks when {@code true}, the default allocator is wrapped with
     *     {@link LeaksTrackingByteBufAllocator#instrument(ByteBufAllocator)}; otherwise
     *     {@link ByteBufAllocator#DEFAULT} is used as is
     */
    public TestDuplexConnection(CoreSubscriber<? super ByteBuf> outboundSubscriber, boolean trackLeaks) {
        this.outbound.asFlux().subscribe(outboundSubscriber);
        this.allocator = trackLeaks
                ? LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT)
                : ByteBufAllocator.DEFAULT;
    }

    /**
     * Completes the inbound and outbound sinks and then signals {@link #onClose()}.
     *
     * <p>All three emissions use the {@code tryEmit*} variants and their results are ignored.
     */
    public void dispose() {
        this.inbound.tryEmitComplete();
        this.outbound.tryEmitComplete();
        this.close.tryEmitEmpty();
    }

    /**
     * @return a {@link Mono} view of the {@code close} sink, which {@link #dispose()} completes
     */
    public Mono<Void> onClose() {
        return this.close.asMono();
    }

    /**
     * No-op in this implementation: the error is neither emitted on any sink nor does the
     * connection get disposed.
     *
     * @param errorException ignored
     */
    public void sendErrorAndClose(RSocketErrorException errorException) {
    }

    /**
     * @return the inbound frames, lifted through {@link ByteBufReleaserOperator} so that each
     *     buffer is released right after it has been delivered downstream
     */
    public Flux<ByteBuf> receive() {
        return this.inbound.asFlux().transform(Operators.lift(ByteBufReleaserOperator::create));
    }

    /**
     * @return the allocator selected in the constructor
     */
    public ByteBufAllocator alloc() {
        return this.allocator;
    }

    /**
     * @return a new placeholder {@link SocketAddress} on every call whose {@code toString()} is
     *     {@code "Test"}
     */
    public SocketAddress remoteAddress() {
        return new SocketAddress() {

            @Override
            public String toString() {
                return "Test";
            }
        };
    }

    /**
     * Pushes {@code frame} into the outbound sink.
     *
     * <p>The result of {@code tryEmitNext} is not inspected, so a rejected emission is silently
     * dropped here and the frame is not released by this method.
     *
     * @param streamId not used by this implementation
     * @param frame the frame to hand to the outbound subscriber
     */
    public void sendFrame(int streamId, ByteBuf frame) {
        this.outbound.tryEmitNext(frame);
    }

    /**
     * Encodes a payload frame with {@link PayloadFrameCodec} and forwards it to
     * {@link #sendFrame(int, ByteBuf)}.
     *
     * @param streamId stream id written into the frame
     * @param data payload data
     * @param metadata payload metadata, may be {@code null}
     * @param complete forwarded to the encoder as its fourth argument
     */
    public void sendPayloadFrame(int streamId, ByteBuf data, @Nullable ByteBuf metadata, boolean complete) {
        // The third and fifth encoder arguments are fixed to false and true respectively.
        ByteBuf payloadFrame =
                PayloadFrameCodec.encode(this.allocator, streamId, false, complete, true, metadata, data);
        this.sendFrame(streamId, payloadFrame);
    }

    /**
     * Pass-through operator that releases every {@link ByteBuf} after the downstream subscriber's
     * {@code onNext} has returned. All other signals are forwarded unchanged.
     *
     * <p>Fusion is always declined ({@link #requestFusion(int)} returns {@code 0}) and the queue
     * methods throw {@link UnsupportedOperationException}.
     */
    static class ByteBufReleaserOperator
    implements CoreSubscriber<ByteBuf>,
    Subscription,
    Fuseable.QueueSubscription<ByteBuf> {
        /** Message used by every unsupported queue method of this operator. */
        private static final String QUEUE_UNSUPPORTED = "Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.";

        /** Downstream subscriber. */
        final CoreSubscriber<? super ByteBuf> actual;
        /** Upstream subscription, assigned in {@link #onSubscribe(Subscription)}. */
        Subscription s;

        /**
         * Factory with the shape expected by {@code Operators.lift}.
         *
         * @param scannable ignored
         * @param actual downstream subscriber to wrap
         * @return a new operator delegating to {@code actual}
         */
        static CoreSubscriber<? super ByteBuf> create(Scannable scannable, CoreSubscriber<? super ByteBuf> actual) {
            return new ByteBufReleaserOperator(actual);
        }

        /**
         * @param actual downstream subscriber that receives all signals
         */
        public ByteBufReleaserOperator(CoreSubscriber<? super ByteBuf> actual) {
            this.actual = actual;
        }

        /**
         * Stores the upstream subscription once {@code Operators.validate} accepts it and hands
         * this operator downstream as the subscription.
         */
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = s;
                this.actual.onSubscribe(this);
            }
        }

        /**
         * Delivers {@code buf} downstream and then releases it.
         */
        public void onNext(ByteBuf buf) {
            this.actual.onNext(buf);
            // Released only after downstream returned; a consumer that needs the buffer beyond
            // onNext has to retain it itself.
            buf.release();
        }

        /** Forwards the error downstream. */
        public void onError(Throwable t) {
            this.actual.onError(t);
        }

        /** Forwards completion downstream. */
        public void onComplete() {
            this.actual.onComplete();
        }

        /** Forwards the demand to the upstream subscription. */
        public void request(long n) {
            this.s.request(n);
        }

        /** Cancels the upstream subscription. */
        public void cancel() {
            this.s.cancel();
        }

        /**
         * @return always {@code 0}, regardless of {@code requestedMode}
         */
        public int requestFusion(int requestedMode) {
            return 0;
        }

        /** @throws UnsupportedOperationException always */
        public ByteBuf poll() {
            throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
        }

        /** @throws UnsupportedOperationException always */
        public int size() {
            throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
        }

        /** @throws UnsupportedOperationException always */
        public boolean isEmpty() {
            throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
        }

        /** @throws UnsupportedOperationException always */
        public void clear() {
            throw new UnsupportedOperationException(QUEUE_UNSUPPORTED);
        }
    }
}

