/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.test;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link RSocket} implementation for tests that answers requests with payloads built from a fixed
 * {@code data}/{@code metadata} pair and keeps two counters:
 *
 * <ul>
 *   <li><b>observed</b> interactions - incremented once per handler invocation and never
 *       decremented;
 *   <li><b>active</b> interactions - incremented per handler invocation and decremented through a
 *       {@code doFinally} hook attached to the publisher the handler returns.
 * </ul>
 *
 * <p>Both counters are incremented when the handler method is called, not when the returned
 * publisher is subscribed to. The {@code await*} methods poll these counters until a condition
 * holds or a timeout elapses.
 */
public class TestRSocket implements RSocket {
    /** Pause between two consecutive polls of a counter in the {@code await*} methods. */
    private static final long POLL_INTERVAL_NANOS = 100L;

    /** Number of payloads emitted by {@link #requestStream(Payload)}. */
    private static final int STREAM_LENGTH = 10000;

    private final String data;
    private final String metadata;
    private final AtomicLong observedInteractions = new AtomicLong();
    private final AtomicLong activeInteractions = new AtomicLong();

    /**
     * @param data data part of every payload this instance produces
     * @param metadata metadata part of every payload this instance produces
     */
    public TestRSocket(String data, String metadata) {
        this.data = data;
        this.metadata = metadata;
    }

    /**
     * Releases the request and replies with a single payload built from the configured data and
     * metadata.
     *
     * @param payload the request; released before this method returns
     * @return a {@link Mono} emitting the response payload
     */
    public Mono<Payload> requestResponse(Payload payload) {
        this.activeInteractions.getAndIncrement();
        payload.release();
        this.observedInteractions.getAndIncrement();
        // The response payload is created here (at call time), exactly once per invocation.
        return Mono.just(ByteBufPayload.create(this.data, this.metadata))
            .doFinally(signalType -> this.activeInteractions.getAndDecrement());
    }

    /**
     * Releases the request and replies with a stream of {@value #STREAM_LENGTH} payloads, each
     * built from the configured data and metadata.
     *
     * @param payload the request; released before this method returns
     * @return a {@link Flux} of response payloads
     */
    public Flux<Payload> requestStream(Payload payload) {
        this.activeInteractions.getAndIncrement();
        payload.release();
        this.observedInteractions.getAndIncrement();
        return Flux.range(1, STREAM_LENGTH)
            .map(index -> ByteBufPayload.create(this.data, this.metadata))
            .doFinally(signalType -> this.activeInteractions.getAndDecrement());
    }

    /**
     * Releases the pushed payload and returns an empty result.
     *
     * @param payload the pushed metadata; released before this method returns
     * @return an empty {@link Mono}
     */
    public Mono<Void> metadataPush(Payload payload) {
        this.activeInteractions.getAndIncrement();
        payload.release();
        this.observedInteractions.getAndIncrement();
        // Explicit <Void> witness: target typing does not reach through the chained doFinally call.
        return Mono.<Void>empty()
            .doFinally(signalType -> this.activeInteractions.getAndDecrement());
    }

    /**
     * Releases the request and returns an empty result.
     *
     * @param payload the request; released before this method returns
     * @return an empty {@link Mono}
     */
    public Mono<Void> fireAndForget(Payload payload) {
        this.activeInteractions.getAndIncrement();
        payload.release();
        this.observedInteractions.getAndIncrement();
        // Explicit <Void> witness: target typing does not reach through the chained doFinally call.
        return Mono.<Void>empty()
            .doFinally(signalType -> this.activeInteractions.getAndDecrement());
    }

    /**
     * Passes the incoming payloads through unchanged. Unlike the other handlers, nothing is
     * released here.
     *
     * @param payloads the inbound payload stream
     * @return a {@link Flux} re-emitting {@code payloads}
     */
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        this.activeInteractions.getAndIncrement();
        this.observedInteractions.getAndIncrement();
        return Flux.from(payloads)
            .doFinally(signalType -> this.activeInteractions.getAndDecrement());
    }

    /**
     * Polls until the active-interaction counter is no longer positive or the timeout elapses.
     *
     * @param duration maximum time to wait; a negative value is treated as zero, and a value too
     *     large to express in nanoseconds is capped at {@link Long#MAX_VALUE} nanoseconds
     * @return {@code true} if the counter reached exactly zero in time; {@code false} on timeout,
     *     or if the counter was observed below zero
     */
    public boolean awaitAllInteractionTermination(Duration duration) {
        final long deadline = deadlineAfter(duration);
        long activeNow;
        while ((activeNow = this.activeInteractions.get()) > 0L) {
            if (hasPassed(deadline)) {
                return false;
            }
            // parkNanos may return early; the loop re-checks the counter and deadline regardless.
            LockSupport.parkNanos(POLL_INTERVAL_NANOS);
        }
        return activeNow == 0L;
    }

    /**
     * Polls until at least {@code interactions} handler invocations have been observed or the
     * timeout elapses.
     *
     * @param interactions minimum number of observed interactions to wait for
     * @param duration maximum time to wait; a negative value is treated as zero, and a value too
     *     large to express in nanoseconds is capped at {@link Long#MAX_VALUE} nanoseconds
     * @return {@code true} if the threshold was reached in time, {@code false} on timeout
     */
    public boolean awaitUntilObserved(int interactions, Duration duration) {
        final long deadline = deadlineAfter(duration);
        long observed;
        while ((observed = this.observedInteractions.get()) < (long) interactions) {
            if (hasPassed(deadline)) {
                return false;
            }
            // parkNanos may return early; the loop re-checks the counter and deadline regardless.
            LockSupport.parkNanos(POLL_INTERVAL_NANOS);
        }
        return observed >= (long) interactions;
    }

    /**
     * Computes a deadline on the {@link System#nanoTime()} time line.
     *
     * <p>The timeout is saturated instead of letting {@link Duration#toNanos()} throw, and the sum
     * is allowed to wrap around; {@link #hasPassed(long)} compares by difference, which stays
     * correct across wrap-around.
     */
    private static long deadlineAfter(Duration timeout) {
        long timeoutNanos;
        if (timeout.isNegative()) {
            timeoutNanos = 0L;
        } else {
            try {
                timeoutNanos = timeout.toNanos();
            } catch (ArithmeticException tooLarge) {
                // Longer than a long of nanoseconds can express: wait as long as representable.
                timeoutNanos = Long.MAX_VALUE;
            }
        }
        return System.nanoTime() + timeoutNanos;
    }

    /** Tells whether {@code deadline} (from {@link #deadlineAfter(Duration)}) has been reached. */
    private static boolean hasPassed(long deadline) {
        // Difference-based comparison, as prescribed for System.nanoTime() values.
        return deadline - System.nanoTime() <= 0L;
    }
}

