/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.ResourceLeakDetector;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketErrorException;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.core.Resume;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.resume.InMemoryResumableFramesStore;
import io.rsocket.test.ByteBufRepresentation;
import io.rsocket.test.LeaksTrackingByteBufAllocator;
import io.rsocket.test.SlowTest;
import io.rsocket.test.TestRSocket;
import io.rsocket.test.TriFunction;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.ByteBufPayload;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.Logger;
import reactor.util.Loggers;

public interface TransportTest {
    /** Logger shared by the tests and the helper classes nested in this interface. */
    Logger logger = Loggers.getLogger(TransportTest.class);

    /** Data used for the small payloads built by {@link #createTestPayload(int)}. */
    String MOCK_DATA = "test-data";

    /** Metadata used for the small payloads when metadata is requested. */
    String MOCK_METADATA = "metadata";

    /** Lower-cased content of the bundled gzip resource, loaded once at class initialization. */
    String LARGE_DATA = TransportTest.read("words.shakespeare.txt.gz");

    /**
     * Shared payload carrying {@link #LARGE_DATA} as both data and metadata. The tests in this
     * interface call {@code retain()} on it before every send.
     */
    Payload LARGE_PAYLOAD = ByteBufPayload.create(LARGE_DATA, LARGE_DATA);

    /**
     * Reads a gzip-compressed text resource from the class path and returns its lines,
     * lower-cased and joined with {@code "\n\r"}.
     *
     * <p>The text is decoded as UTF-8 and lower-cased with {@code Locale.ROOT}, so the result
     * does not depend on the platform default charset or locale.
     *
     * @param resourceName name of the resource, resolved through the class loader of
     *     {@code TransportTest}
     * @return the lower-cased content of the resource
     * @throws IllegalArgumentException if no resource with that name is found
     * @throws RuntimeException if the resource cannot be decompressed or read; the original
     *     failure is attached as the cause
     */
    public static String read(String resourceName) {
        java.io.InputStream resource = TransportTest.class.getClassLoader().getResourceAsStream(resourceName);
        if (resource == null) {
            // Fail with a readable message instead of a NullPointerException from GZIPInputStream.
            throw new IllegalArgumentException("Classpath resource not found: " + resourceName);
        }
        // The raw stream is listed as its own resource so that it is closed even when the
        // GZIPInputStream constructor rejects the header.
        try (java.io.InputStream raw = resource;
                BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(raw), java.nio.charset.StandardCharsets.UTF_8))) {
            return br.lines()
                    .map(line -> line.toLowerCase(java.util.Locale.ROOT))
                    .collect(Collectors.joining("\n\r"));
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to read classpath resource: " + resourceName, e);
        }
    }

    /** Turns on Reactor's operator debug hook before each test. */
    @BeforeEach
    default void setup() {
        Hooks.onOperatorDebug();
    }

    /**
     * Tears down the transport pair after each test and verifies that no buffers leaked.
     *
     * <p>Steps, in order: wait for the responder's interactions to terminate, dispose the pair
     * and wait for it to close, gracefully dispose the shared Reactor schedulers, then ask both
     * tracking allocators to assert that they have no leaks. Leak failures are collected as
     * suppressed exceptions on a single carrier exception, which is thrown if at least one
     * check failed. The operator-debug hook and the scheduler error hook are always reset.
     */
    @AfterEach
    default public void close() {
        try {
            logger.debug("------------------Awaiting communication to finish------------------");
            this.getTransportPair().responder.awaitAllInteractionTermination(this.getTimeout());
            logger.debug("---------------------Disposing Client And Server--------------------");
            this.getTransportPair().dispose();
            this.getTransportPair().awaitClosed(this.getTimeout());
            logger.debug("------------------------Disposing Schedulers-------------------------");
            // Each graceful disposal falls back to an empty Mono once the timeout elapses,
            // so a slow scheduler shutdown does not fail the teardown by itself.
            Schedulers.parallel().disposeGracefully().timeout(this.getTimeout(), Mono.empty()).block();
            Schedulers.boundedElastic().disposeGracefully().timeout(this.getTimeout(), Mono.empty()).block();
            Schedulers.single().disposeGracefully().timeout(this.getTimeout(), Mono.empty()).block();
            logger.debug("---------------------------Leaks Checking----------------------------");
            // Carrier exception: captures no stack trace of its own and reports the
            // suppressed leak failures as its message.
            RuntimeException throwable = new RuntimeException(){

                @Override
                public synchronized Throwable fillInStackTrace() {
                    return this;
                }

                @Override
                public String getMessage() {
                    return Arrays.toString(this.getSuppressed());
                }
            };
            // Both allocators are checked even if the first check fails.
            try {
                this.getTransportPair().byteBufAllocator2.assertHasNoLeaks();
            }
            catch (Throwable t) {
                throwable = Exceptions.addSuppressed((RuntimeException)throwable, (Throwable)t);
            }
            try {
                this.getTransportPair().byteBufAllocator1.assertHasNoLeaks();
            }
            catch (Throwable t) {
                throwable = Exceptions.addSuppressed((RuntimeException)throwable, (Throwable)t);
            }
            if (throwable.getSuppressed().length > 0) {
                throw throwable;
            }
        }
        finally {
            // Undo the global hooks installed by setup() and by the TransportPair constructor.
            Hooks.resetOnOperatorDebug();
            Schedulers.resetOnHandleError();
        }
    }

    /**
     * Builds a small payload whose metadata depends on {@code metadataPresent % 5}.
     *
     * @param metadataPresent selector: remainder 0 yields {@code null} metadata, remainder 1
     *     yields empty metadata, any other remainder yields {@link #MOCK_METADATA}
     * @return a new payload carrying {@link #MOCK_DATA} and the selected metadata
     */
    default Payload createTestPayload(int metadataPresent) {
        int variant = metadataPresent % 5;
        String metadata;
        if (variant == 0) {
            metadata = null;
        } else if (variant == 1) {
            metadata = "";
        } else {
            metadata = MOCK_METADATA;
        }
        return ByteBufPayload.create(MOCK_DATA, metadata);
    }

    /** Sends ten fire-and-forget requests and waits until the responder has observed ten. */
    @DisplayName("makes 10 fireAndForget requests")
    @Test
    default void fireAndForget10() {
        Flux<Void> requests =
                Flux.range(1, 10).flatMap(i -> getClient().fireAndForget(createTestPayload(i)));

        StepVerifier.create(requests).expectComplete().verify(getTimeout());

        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /**
     * Sends ten fire-and-forget requests carrying the shared large payload (retained once per
     * send) and waits until the responder has observed ten.
     */
    @DisplayName("makes 10 fireAndForget with Large Payload in Requests")
    @Test
    default void largePayloadFireAndForget10() {
        Flux<Void> requests =
                Flux.range(1, 10).flatMap(i -> getClient().fireAndForget(LARGE_PAYLOAD.retain()));

        StepVerifier.create(requests).expectComplete().verify(getTimeout());

        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /** Returns the client side of the transport pair under test. */
    default RSocket getClient() {
        return getTransportPair().getClient();
    }

    /**
     * Timeout that the tests in this interface pass to {@code StepVerifier.verify} and to the
     * await and teardown calls.
     */
    public Duration getTimeout();

    /** Supplies the client/server pair that the tests in this interface run against. */
    public TransportPair getTransportPair();

    /**
     * Sends ten metadata-push requests and waits until the responder has observed ten. Skipped
     * when the pair runs with resumability.
     */
    @DisplayName("makes 10 metadataPush requests")
    @Test
    default void metadataPush10() {
        Assumptions.assumeThat(getTransportPair().withResumability).isFalse();

        Flux<Void> pushes =
                Flux.range(1, 10)
                        .flatMap(i -> getClient().metadataPush(ByteBufPayload.create("", "test-metadata")));

        StepVerifier.create(pushes).expectComplete().verify(getTimeout());

        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /**
     * Sends ten metadata-push requests whose metadata is {@link #LARGE_DATA} and waits until the
     * responder has observed ten. Skipped when the pair runs with resumability.
     */
    @DisplayName("makes 10 metadataPush with Large Metadata in requests")
    @Test
    default void largePayloadMetadataPush10() {
        Assumptions.assumeThat(getTransportPair().withResumability).isFalse();

        Flux<Void> pushes =
                Flux.range(1, 10)
                        .flatMap(i -> getClient().metadataPush(ByteBufPayload.create("", LARGE_DATA)));

        StepVerifier.create(pushes).expectComplete().verify(getTimeout());

        getTransportPair().responder.awaitUntilObserved(10, getTimeout());
    }

    /**
     * Opens a channel with an empty request stream and expects it to fail with a
     * {@link CancellationException} whose message is "Empty Source".
     */
    @DisplayName("makes 1 requestChannel request with 0 payloads")
    @Test
    default void requestChannel0() {
        Flux<Payload> responses = getClient().requestChannel(Flux.empty());

        StepVerifier.create(responses)
                .expectErrorSatisfies(
                        t -> Assertions.assertThat(t)
                                .isInstanceOf(CancellationException.class)
                                .hasMessage("Empty Source"))
                .verify(getTimeout());
    }

    /** Opens a channel with a single request payload and consumes the responses to completion. */
    @DisplayName("makes 1 requestChannel request with 1 payloads")
    @Test
    default void requestChannel1() {
        Flux<Payload> responses =
                getClient()
                        .requestChannel(Mono.just(createTestPayload(0)))
                        .doOnNext(ReferenceCounted::release);

        StepVerifier.create(responses)
                .thenConsumeWhile(new PayloadPredicate(1))
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with 200,000 request payloads and consumes the responses, released on
     * arrival and rate-limited to 8, until completion.
     */
    @DisplayName("makes 1 requestChannel request with 200,000 payloads")
    @Test
    default void requestChannel200_000() {
        Flux<Payload> payloads = Flux.range(0, 200000).map(this::createTestPayload);

        Flux<Payload> responses =
                getClient()
                        .requestChannel(payloads)
                        .doOnNext(ReferenceCounted::release)
                        .limitRate(8);

        StepVerifier.create(responses)
                .thenConsumeWhile(new PayloadPredicate(200000))
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel that sends the shared large payload 50 times (retained once per send) and
     * consumes the responses to completion.
     */
    @DisplayName("makes 1 requestChannel request with 50 large payloads")
    @Test
    default void largePayloadRequestChannel50() {
        Flux<Payload> payloads = Flux.range(0, 50).map(ignored -> LARGE_PAYLOAD.retain());

        Flux<Payload> responses =
                getClient().requestChannel(payloads).doOnNext(ReferenceCounted::release);

        StepVerifier.create(responses)
                .thenConsumeWhile(new PayloadPredicate(50))
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with 20,000 request payloads and checks every response with
     * {@link #assertChannelPayload(Payload)} before releasing it. Every request is built with
     * selector 7, so each one carries {@link #MOCK_METADATA}.
     */
    @DisplayName("makes 1 requestChannel request with 20,000 payloads")
    @Test
    default void requestChannel20_000() {
        Flux<Payload> payloads = Flux.range(0, 20000).map(index -> createTestPayload(7));

        Flux<Payload> responses =
                getClient()
                        .requestChannel(payloads)
                        .doOnNext(this::assertChannelPayload)
                        .doOnNext(ReferenceCounted::release);

        StepVerifier.create(responses)
                .thenConsumeWhile(new PayloadPredicate(20000))
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Slow variant of the channel test: 2,000,000 request payloads, with responses released on
     * arrival and rate-limited to 8.
     */
    @DisplayName("makes 1 requestChannel request with 2,000,000 payloads")
    @SlowTest
    default void requestChannel2_000_000() {
        Flux<Payload> payloads = Flux.range(0, 2000000).map(this::createTestPayload);

        Flux<Payload> responses =
                getClient()
                        .requestChannel(payloads)
                        .doOnNext(ReferenceCounted::release)
                        .limitRate(8);

        StepVerifier.create(responses)
                .thenConsumeWhile(new PayloadPredicate(2000000))
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with three request payloads under an initial demand of three, then
     * asserts that exactly three elements were requested from the request stream.
     */
    @DisplayName("makes 1 requestChannel request with 3 payloads")
    @Test
    default void requestChannel3() {
        AtomicLong requested = new AtomicLong();
        Flux<Payload> payloads =
                Flux.range(0, 3).doOnRequest(requested::addAndGet).map(this::createTestPayload);

        Flux<Payload> responses =
                getClient().requestChannel(payloads).doOnNext(ReferenceCounted::release);

        StepVerifier.create(responses, 3L)
                .thenConsumeWhile(new PayloadPredicate(3))
                .expectComplete()
                .verify(getTimeout());

        Assertions.assertThat(requested.get()).isEqualTo(3L);
    }

    /**
     * Runs {@link #check(Flux)} 1024 times with a concurrency of 12 on a dedicated 12-thread
     * scheduler. Each subscription to the request stream emits 256 payloads tagged with the
     * subscription number. The scheduler is disposed gracefully whether or not the run succeeds.
     */
    @DisplayName("makes 1 requestChannel request with 256 payloads")
    @Test
    default void requestChannel256() {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Payload> payloads = Flux.defer(() -> {
            int subscription = subscriptions.getAndIncrement();
            return Flux.range(0, 256)
                    .map(i -> "S{" + subscription + "}: Data{" + i + "}")
                    .map(data -> ByteBufPayload.create(data));
        });

        Scheduler workers = Schedulers.fromExecutorService(Executors.newFixedThreadPool(12));
        try {
            Flux.range(0, 1024)
                    .flatMap(round -> Mono.fromRunnable(() -> check(payloads)).subscribeOn(workers), 12)
                    .blockLast();
        }
        finally {
            workers.disposeGracefully().block();
        }
    }

    /**
     * Sends {@code payloads} over one channel and verifies that the responses, released on
     * arrival and rate-limited to 8, run to completion within the timeout.
     *
     * @param payloads request stream to send over the channel
     */
    default void check(Flux<Payload> payloads) {
        Flux<Payload> responses =
                getClient()
                        .requestChannel(payloads)
                        .doOnNext(ReferenceCounted::release)
                        .limitRate(8);

        StepVerifier.create(responses)
                .thenConsumeWhile(new PayloadPredicate(256))
                .as("expected 256 items")
                .expectComplete()
                .verify(getTimeout());
    }

    /** Performs one request-response and checks the reply against the expected payload. */
    @DisplayName("makes 1 requestResponse request")
    @Test
    default void requestResponse1() {
        Mono<Payload> response =
                getClient()
                        .requestResponse(createTestPayload(1))
                        .doOnNext(this::assertPayload)
                        .doOnNext(ReferenceCounted::release);

        StepVerifier.create(response).expectNextCount(1L).expectComplete().verify(getTimeout());
    }

    /** Performs ten request-response calls and checks each reply against the expected payload. */
    @DisplayName("makes 10 requestResponse requests")
    @Test
    default void requestResponse10() {
        Flux<Payload> responses =
                Flux.range(1, 10)
                        .flatMap(i -> getClient()
                                .requestResponse(createTestPayload(i))
                                .doOnNext(this::assertPayload)
                                .doOnNext(ReferenceCounted::release));

        StepVerifier.create(responses).expectNextCount(10L).expectComplete().verify(getTimeout());
    }

    /** Performs 100 request-response calls, releasing each reply, and expects 100 replies. */
    @DisplayName("makes 100 requestResponse requests")
    @Test
    default void requestResponse100() {
        Flux<Payload> responses =
                Flux.range(1, 100)
                        .flatMap(i -> getClient()
                                .requestResponse(createTestPayload(i))
                                .doOnNext(ReferenceCounted::release));

        StepVerifier.create(responses).expectNextCount(100L).expectComplete().verify(getTimeout());
    }

    /**
     * Performs 50 request-response calls with the shared large payload (retained once per call)
     * and expects 50 replies.
     */
    @DisplayName("makes 50 requestResponse requests")
    @Test
    default void largePayloadRequestResponse50() {
        Flux<Payload> responses =
                Flux.range(1, 50)
                        .flatMap(i -> getClient()
                                .requestResponse(LARGE_PAYLOAD.retain())
                                .doOnNext(ReferenceCounted::release));

        StepVerifier.create(responses).expectNextCount(50L).expectComplete().verify(getTimeout());
    }

    /** Performs 10,000 request-response calls, releasing each reply, and expects 10,000 replies. */
    @DisplayName("makes 10,000 requestResponse requests")
    @Test
    default void requestResponse10_000() {
        Flux<Payload> responses =
                Flux.range(1, 10000)
                        .flatMap(i -> getClient()
                                .requestResponse(createTestPayload(i))
                                .doOnNext(ReferenceCounted::release));

        StepVerifier.create(responses).expectNextCount(10000L).expectComplete().verify(getTimeout());
    }

    /** Requests one stream and expects 10,000 responses, each checked and then released. */
    @DisplayName("makes 1 requestStream request and receives 10,000 responses")
    @Test
    default void requestStream10_000() {
        Flux<Payload> responses =
                getClient()
                        .requestStream(createTestPayload(3))
                        .doOnNext(this::assertPayload)
                        .doOnNext(ReferenceCounted::release);

        StepVerifier.create(responses).expectNextCount(10000L).expectComplete().verify(getTimeout());
    }

    /** Requests one stream, takes the first five responses, and expects completion after them. */
    @DisplayName("makes 1 requestStream request and receives 5 responses")
    @Test
    default void requestStream5() {
        Flux<Payload> firstFive =
                getClient()
                        .requestStream(createTestPayload(3))
                        .doOnNext(this::assertPayload)
                        .doOnNext(ReferenceCounted::release)
                        .take(5L);

        StepVerifier.create(firstFive).expectNextCount(5L).expectComplete().verify(getTimeout());
    }

    /**
     * Requests one stream limited to ten responses and consumes it in two batches of five.
     *
     * <p>The verifier is created with an initial demand of zero, so the two
     * {@code thenRequest(5)} steps are the only source of demand. With the single-argument
     * {@code StepVerifier.create}, demand is unbounded from the start and those steps have no
     * effect, so the stream would not be consumed incrementally.
     */
    @DisplayName("makes 1 requestStream request and consumes result incrementally")
    @Test
    default void requestStreamDelayedRequestN() {
        Flux<Payload> firstTen =
                getClient()
                        .requestStream(createTestPayload(3))
                        .take(10L)
                        .doOnNext(ReferenceCounted::release);

        StepVerifier.create(firstTen, 0L)
                .thenRequest(5L)
                .expectNextCount(5L)
                .thenRequest(5L)
                .expectNextCount(5L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Verifies that a response carries the data and metadata the transport pair expects.
     *
     * @param p payload to check
     * @throws IllegalStateException if the data or the metadata differs from the expected value
     */
    default void assertPayload(Payload p) {
        TransportPair<?, ?> pair = getTransportPair();
        // Metadata is only decoded when the data already matched, as in a short-circuit check.
        if (pair.expectedPayloadData().equals(p.getDataUtf8())
                && pair.expectedPayloadMetadata().equals(p.getMetadataUtf8())) {
            return;
        }
        throw new IllegalStateException("Unexpected payload");
    }

    /**
     * Verifies that a channel response carries {@link #MOCK_DATA} and {@link #MOCK_METADATA}.
     *
     * @param p payload to check
     * @throws IllegalStateException if the data or the metadata differs from the mock values
     */
    default void assertChannelPayload(Payload p) {
        if (MOCK_DATA.equals(p.getDataUtf8()) && MOCK_METADATA.equals(p.getMetadataUtf8())) {
            return;
        }
        throw new IllegalStateException("Unexpected payload");
    }

    /**
     * Stateful predicate that accepts the first {@code expectedCnt} payloads it is shown and
     * rejects every later one, logging the rejected payload's metadata and data.
     */
    public static class PayloadPredicate implements Predicate<Payload> {
        final int expectedCnt;
        int cnt;

        /**
         * @param expectedCnt number of payloads to accept before rejecting
         */
        public PayloadPredicate(int expectedCnt) {
            this.expectedCnt = expectedCnt;
        }

        /**
         * Counts the payload and reports whether it is still within the expected count.
         *
         * @param p payload being tested
         * @return {@code true} while fewer than {@code expectedCnt} payloads were seen before
         *     this one, {@code false} otherwise
         */
        @Override
        public boolean test(Payload p) {
            int seenBefore = cnt;
            cnt = seenBefore + 1;
            if (seenBefore < expectedCnt) {
                return true;
            }
            // Unexpected extra payload: dump its content to help diagnose the failure.
            String metadataDump = p.hasMetadata()
                    ? new ByteBufRepresentation().fallbackToStringOf(p.sliceMetadata())
                    : "Empty";
            String dataDump = new ByteBufRepresentation().fallbackToStringOf(p.sliceData());
            logger.info("Metadata: \n\r{}\n\rData:{}", metadataDump, dataDump);
            return false;
        }
    }

    public static class TransportPair<T, S extends Closeable>
    implements Disposable {
        // Data and metadata handed to the TestRSocket responder; also what
        // expectedPayloadData()/expectedPayloadMetadata() report to the tests.
        private static final String data = "hello world";
        private static final String metadata = "metadata";
        // Whether resume support (and the disconnecting connection wrapper) is enabled.
        private final boolean withResumability;
        // Randomly chosen per pair; the async interceptors are only installed when
        // resumability is off.
        private final boolean runClientWithAsyncInterceptors;
        private final boolean runServerWithAsyncInterceptors;
        // Leak-tracking allocators tagged "Client" and "Server"; they are only handed to the
        // transports when the Netty leak detector level is ADVANCED or PARANOID.
        private final LeaksTrackingByteBufAllocator byteBufAllocator1 = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1L), "Client");
        private final LeaksTrackingByteBufAllocator byteBufAllocator2 = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT, Duration.ofMinutes(1L), "Server");
        // Responder returned by the server's socket acceptor for every connection.
        private final TestRSocket responder;
        // Connected client socket and the bound server it talks to.
        private final RSocket client;
        private final S server;

        /**
         * Creates a pair without random fragmentation and without resumability.
         *
         * @param addressSupplier supplies the address passed to both transport factories
         * @param clientTransportSupplier builds the client transport from the address, the bound
         *     server and an allocator
         * @param serverTransportSupplier builds the server transport from the address and an
         *     allocator
         */
        public TransportPair(Supplier<T> addressSupplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> clientTransportSupplier, BiFunction<T, ByteBufAllocator, ServerTransport<S>> serverTransportSupplier) {
            this(addressSupplier, clientTransportSupplier, serverTransportSupplier, false);
        }

        /**
         * Creates a pair without resumability.
         *
         * @param addressSupplier supplies the address passed to both transport factories
         * @param clientTransportSupplier builds the client transport from the address, the bound
         *     server and an allocator
         * @param serverTransportSupplier builds the server transport from the address and an
         *     allocator
         * @param withRandomFragmentation whether both sides fragment frames at a random size
         */
        public TransportPair(Supplier<T> addressSupplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> clientTransportSupplier, BiFunction<T, ByteBufAllocator, ServerTransport<S>> serverTransportSupplier, boolean withRandomFragmentation) {
            this(addressSupplier, clientTransportSupplier, serverTransportSupplier, withRandomFragmentation, false);
        }

        /**
         * Binds a server and connects a client to it, blocking until both are ready.
         *
         * <p>Side effects on global Reactor state: installs a scheduler error hook that prints
         * stack traces and resets the scheduler factory.
         *
         * <p>Each side randomly decides whether to run with async interceptors; they are only
         * installed when resumability is off. With resumability on, every {@code SOURCE}
         * connection is wrapped so that it is disposed after a random delay, and both sides
         * use an in-memory resumable frames store.
         *
         * <p>If connecting the client fails, the already bound server is disposed before the
         * failure is rethrown, so a failed construction does not leave a server behind.
         *
         * @param addressSupplier supplies the address passed to both transport factories
         * @param clientTransportSupplier builds the client transport from the address, the bound
         *     server and an allocator
         * @param serverTransportSupplier builds the server transport from the address and an
         *     allocator
         * @param withRandomFragmentation whether both sides fragment frames at a random size in
         *     the range [256, 512)
         * @param withResumability whether resume support is enabled on both sides
         */
        public TransportPair(Supplier<T> addressSupplier, TriFunction<T, S, ByteBufAllocator, ClientTransport> clientTransportSupplier, BiFunction<T, ByteBufAllocator, ServerTransport<S>> serverTransportSupplier, boolean withRandomFragmentation, boolean withResumability) {
            ByteBufAllocator allocatorToSupply2;
            ByteBufAllocator allocatorToSupply1;
            Schedulers.onHandleError((t, e) -> e.printStackTrace());
            Schedulers.resetFactory();
            this.withResumability = withResumability;
            T address = addressSupplier.get();
            this.runClientWithAsyncInterceptors = ThreadLocalRandom.current().nextBoolean();
            this.runServerWithAsyncInterceptors = ThreadLocalRandom.current().nextBoolean();
            // The tracking allocators are only used when Netty's leak detection is strict
            // enough; otherwise both sides share the default allocator.
            if (ResourceLeakDetector.getLevel() == ResourceLeakDetector.Level.ADVANCED || ResourceLeakDetector.getLevel() == ResourceLeakDetector.Level.PARANOID) {
                logger.info("Using LeakTrackingByteBufAllocator");
                allocatorToSupply1 = this.byteBufAllocator1;
                allocatorToSupply2 = this.byteBufAllocator2;
            } else {
                allocatorToSupply1 = ByteBufAllocator.DEFAULT;
                allocatorToSupply2 = ByteBufAllocator.DEFAULT;
            }
            this.responder = new TestRSocket(data, metadata);

            // ---- server side ----
            RSocketServer rSocketServer = RSocketServer.create((setup, sendingSocket) -> Mono.just(this.responder)).payloadDecoder(PayloadDecoder.ZERO_COPY).interceptors(registry -> {
                if (this.runServerWithAsyncInterceptors && !withResumability) {
                    logger.info("Perform Integration Test with Async Interceptors Enabled For Server");
                    registry.forConnection((type, duplexConnection) -> new AsyncDuplexConnection(duplexConnection, "server")).forSocketAcceptor(delegate -> (connectionSetupPayload, sendingSocket) -> delegate.accept(connectionSetupPayload, sendingSocket).subscribeOn(Schedulers.parallel()));
                }
                if (withResumability) {
                    registry.forConnection((type, duplexConnection) -> type == DuplexConnectionInterceptor.Type.SOURCE ? new DisconnectingDuplexConnection("Server", duplexConnection, Duration.ofMillis(ThreadLocalRandom.current().nextInt(100, 1000))) : duplexConnection);
                }
            });
            if (withResumability) {
                rSocketServer.resume(new Resume().storeFactory(token -> new InMemoryResumableFramesStore("server", token, Integer.MAX_VALUE)));
            }
            if (withRandomFragmentation) {
                rSocketServer.fragment(ThreadLocalRandom.current().nextInt(256, 512));
            }
            this.server = rSocketServer.bind(serverTransportSupplier.apply(address, allocatorToSupply2)).block();

            // ---- client side ----
            RSocketConnector rSocketConnector = RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY).keepAlive(Duration.ofMillis(10L), Duration.ofHours(1L)).interceptors(registry -> {
                if (this.runClientWithAsyncInterceptors && !withResumability) {
                    logger.info("Perform Integration Test with Async Interceptors Enabled For Client");
                    registry.forConnection((type, duplexConnection) -> new AsyncDuplexConnection(duplexConnection, "client")).forSocketAcceptor(delegate -> (connectionSetupPayload, sendingSocket) -> delegate.accept(connectionSetupPayload, sendingSocket).subscribeOn(Schedulers.parallel()));
                }
                if (withResumability) {
                    registry.forConnection((type, duplexConnection) -> type == DuplexConnectionInterceptor.Type.SOURCE ? new DisconnectingDuplexConnection("Client", duplexConnection, Duration.ofMillis(ThreadLocalRandom.current().nextInt(10, 1500))) : duplexConnection);
                }
            });
            if (withResumability) {
                rSocketConnector.resume(new Resume().storeFactory(token -> new InMemoryResumableFramesStore("client", token, Integer.MAX_VALUE)));
            }
            if (withRandomFragmentation) {
                rSocketConnector.fragment(ThreadLocalRandom.current().nextInt(256, 512));
            }
            try {
                this.client = rSocketConnector.connect(clientTransportSupplier.apply(address, this.server, allocatorToSupply1)).doOnError(Throwable::printStackTrace).block();
            }
            catch (RuntimeException | Error connectFailure) {
                // The server is already bound at this point; release it, because the caller
                // never receives an instance it could dispose.
                try {
                    this.server.dispose();
                }
                catch (RuntimeException disposeFailure) {
                    connectFailure.addSuppressed(disposeFailure);
                }
                throw connectFailure;
            }
        }

        /**
         * Disposes the client side of the pair. The server is disposed by
         * {@link #awaitClosed(Duration)} once the client has terminated.
         */
        @Override
        public void dispose() {
            logger.info("terminating transport pair");
            client.dispose();
        }

        /** Returns the connected client socket. */
        RSocket getClient() {
            return client;
        }

        /** Returns the data string that the responder of this pair was created with. */
        public String expectedPayloadData() {
            return data;
        }

        /** Returns the metadata string that the responder of this pair was created with. */
        public String expectedPayloadMetadata() {
            return metadata;
        }

        /**
         * Blocks until the client has terminated, then disposes the server and waits for it to
         * terminate as well. Errors signalled by either close stage are swallowed, and every
         * stage is logged.
         *
         * @param timeout maximum time to block for the whole sequence
         */
        public void awaitClosed(Duration timeout) {
            logger.info("awaiting termination of transport pair");
            logger.info("wrappers combination: client{async=" + runClientWithAsyncInterceptors + "; resume=" + withResumability + "} server{async=" + runServerWithAsyncInterceptors + "; resume=" + withResumability + "}");

            // Stage 1: wait for the client to close (ignoring its error), then dispose the server.
            Mono<Void> clientClosedThenServerDisposed =
                    client.onClose()
                            .doOnSubscribe(s -> logger.info("Client termination stage=onSubscribe(" + s + ")"))
                            .doOnEach(s -> logger.info("Client termination stage=" + s))
                            .onErrorResume(t -> Mono.empty())
                            .doOnTerminate(() -> logger.info("Client terminated. Terminating Server"))
                            .then(Mono.fromRunnable(() -> server.dispose()));

            // Stage 2: wait for the server to close.
            Mono<Void> serverClosed =
                    server.onClose()
                            .doOnSubscribe(s -> logger.info("Server termination stage=onSubscribe(" + s + ")"))
                            .doOnEach(s -> logger.info("Server termination stage=" + s));

            clientClosedThenServerDisposed
                    .then(serverClosed)
                    .onErrorResume(t -> Mono.empty())
                    .block(timeout);

            logger.info("TransportPair has been terminated");
        }

        private static class ByteBufReleaserOperator
        implements CoreSubscriber<ByteBuf>,
        Subscription,
        Fuseable.QueueSubscription<ByteBuf> {
            CoreSubscriber<? super ByteBuf> actual;
            final Sinks.Empty<Void> closeableMonoSink = Sinks.unsafe().empty();
            Subscription s;

            public void onSubscribe(Subscription s) {
                if (Operators.validate((Subscription)this.s, (Subscription)s)) {
                    this.s = s;
                    this.actual.onSubscribe((Subscription)this);
                }
            }

            public void onNext(ByteBuf buf) {
                try {
                    this.actual.onNext((Object)buf);
                }
                finally {
                    buf.release();
                }
            }

            Mono<Void> onClose() {
                return this.closeableMonoSink.asMono();
            }

            public void onError(Throwable t) {
                this.actual.onError(t);
                this.closeableMonoSink.tryEmitError(t);
            }

            public void onComplete() {
                this.actual.onComplete();
                this.closeableMonoSink.tryEmitEmpty();
            }

            public void request(long n) {
                this.s.request(n);
            }

            public void cancel() {
                this.s.cancel();
                this.closeableMonoSink.tryEmitEmpty();
            }

            public int requestFusion(int requestedMode) {
                return 0;
            }

            public ByteBuf poll() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }

            public int size() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }

            public boolean isEmpty() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }

            public void clear() {
                throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
            }

            public String toString() {
                return "ByteBufReleaserOperator{isActualPresent=" + (this.actual != null) + ", isSubscriptionPresent=" + (this.s != null) + '}';
            }
        }

        private static class DisconnectingDuplexConnection
        implements DuplexConnection {
            private final String tag;
            final DuplexConnection source;
            final Duration delay;
            final Disposable.Swap disposables = Disposables.swap();
            boolean receivedFirst;

            DisconnectingDuplexConnection(String tag, DuplexConnection source, Duration delay) {
                this.tag = tag;
                this.source = source;
                this.delay = delay;
            }

            public void dispose() {
                this.disposables.dispose();
                this.source.dispose();
            }

            public Mono<Void> onClose() {
                return this.source.onClose().doOnTerminate(() -> logger.info("[" + this + "] Source Connection is done"));
            }

            public void sendFrame(int streamId, ByteBuf frame) {
                this.source.sendFrame(streamId, frame);
            }

            public void sendErrorAndClose(RSocketErrorException errorException) {
                this.source.sendErrorAndClose(errorException);
            }

            public Flux<ByteBuf> receive() {
                return this.source.receive().doOnSubscribe(__ -> logger.warn("Tag {}. Subscribing Connection[{}]", new Object[]{this.tag, this.source.hashCode()})).doOnNext(bb -> {
                    if (!this.receivedFirst) {
                        this.receivedFirst = true;
                        this.disposables.replace(Mono.delay((Duration)this.delay).takeUntilOther((Publisher)this.source.onClose()).subscribe(__ -> {
                            logger.warn("Tag {}. Disposing Connection[{}]", new Object[]{this.tag, this.source.hashCode()});
                            this.source.dispose();
                        }));
                    }
                });
            }

            public ByteBufAllocator alloc() {
                return this.source.alloc();
            }

            public SocketAddress remoteAddress() {
                return this.source.remoteAddress();
            }

            public String toString() {
                return "DisconnectingDuplexConnection{tag='" + this.tag + '\'' + ", source=" + this.source + ", disposables=" + this.disposables + '}';
            }
        }

        private static class AsyncDuplexConnection
        implements DuplexConnection {
            private final DuplexConnection duplexConnection;
            private String tag;
            private final ByteBufReleaserOperator bufReleaserOperator;

            public AsyncDuplexConnection(DuplexConnection duplexConnection, String tag) {
                this.duplexConnection = duplexConnection;
                this.tag = tag;
                this.bufReleaserOperator = new ByteBufReleaserOperator();
            }

            public void sendFrame(int streamId, ByteBuf frame) {
                this.duplexConnection.sendFrame(streamId, frame);
            }

            public void sendErrorAndClose(RSocketErrorException e) {
                this.duplexConnection.sendErrorAndClose(e);
            }

            public Flux<ByteBuf> receive() {
                return this.duplexConnection.receive().doOnTerminate(() -> logger.info("[" + this + "] Receive is done before PO")).subscribeOn(Schedulers.boundedElastic()).doOnNext(ByteBuf::retain).publishOn(Schedulers.boundedElastic(), Integer.MAX_VALUE).doOnTerminate(() -> logger.info("[" + this + "] Receive is done after PO")).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::safeRelease).transform(Operators.lift((__, actual) -> {
                    this.bufReleaserOperator.actual = actual;
                    return this.bufReleaserOperator;
                }));
            }

            public ByteBufAllocator alloc() {
                return this.duplexConnection.alloc();
            }

            public SocketAddress remoteAddress() {
                return this.duplexConnection.remoteAddress();
            }

            public Mono<Void> onClose() {
                return Mono.whenDelayError((Publisher[])new Publisher[]{this.duplexConnection.onClose().doOnTerminate(() -> logger.info("[" + this + "] Source Connection is done")), this.bufReleaserOperator.onClose().doOnTerminate(() -> logger.info("[" + this + "] BufferReleaser is done"))});
            }

            public void dispose() {
                this.duplexConnection.dispose();
            }

            public String toString() {
                return "AsyncDuplexConnection{duplexConnection=" + this.duplexConnection + ", tag='" + this.tag + '\'' + ", bufReleaserOperator=" + this.bufReleaserOperator + '}';
            }
        }
    }
}

