/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.FlowControlledBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public abstract class AbstractBufferedStream<T, R>
extends FlowControlledBuffer<T, R>
implements ResultStream<T> {
    public static final Runnable NO_OP = () -> {};
    private final AtomicReference<Runnable> onAvailableCallback = new AtomicReference<Runnable>(NO_OP);

    public AbstractBufferedStream(String clientId, int bufferSize, int refillBatch) {
        super(clientId, bufferSize, refillBatch);
    }

    @Override
    public T next() throws InterruptedException {
        return this.take();
    }

    @Override
    public T nextIfAvailable() {
        return this.tryTakeNow();
    }

    @Override
    public T nextIfAvailable(long timeout, TimeUnit unit) throws InterruptedException {
        return this.tryTake(timeout, unit);
    }

    @Override
    public void onNext(T value) {
        super.onNext(value);
        this.onAvailableCallback.get().run();
    }

    @Override
    public void onError(Throwable t) {
        super.onError(t);
        this.onAvailableCallback.get().run();
    }

    @Override
    public void onCompleted() {
        super.onCompleted();
        this.onAvailableCallback.get().run();
    }

    @Override
    public T peek() {
        return super.peek();
    }

    @Override
    public void onAvailable(Runnable callback) {
        if (callback == null) {
            this.onAvailableCallback.set(NO_OP);
        } else {
            this.onAvailableCallback.set(callback);
            if (this.peek() != null) {
                callback.run();
            }
        }
    }

    @Override
    public boolean isClosed() {
        return super.isClosed();
    }

    @Override
    public void close() {
        this.outboundStream().onCompleted();
    }
}

