/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl.buffer;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.impl.CloseableReadonlyBuffer;
import io.axoniq.axonserver.connector.impl.DisposableReadonlyBuffer;
import io.axoniq.axonserver.grpc.ErrorMessage;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A {@link DisposableReadonlyBuffer} that delegates to a {@link CloseableReadonlyBuffer} and issues
 * requests on a {@link FlowControl} as elements are polled.
 *
 * <p>The first call to {@link #poll()} requests as many elements as the delegate buffer reports as its
 * capacity. After that, every element successfully polled triggers a request for one more element, as
 * long as this buffer does not report itself as closed.
 *
 * @param <T> the type of element held by the buffer
 */
public class FlowControlledDisposableReadonlyBuffer<T>
implements DisposableReadonlyBuffer<T> {
    private final FlowControl flowControl;
    private final CloseableReadonlyBuffer<T> buffer;
    /** Flipped to {@code true} by the first {@link #poll()}, so the initial request is issued only once. */
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Creates a buffer that reads from {@code buffer} and sends its requests and cancellation to
     * {@code flowControl}.
     *
     * @param flowControl the handle used to request more elements and to cancel on {@link #dispose()}
     * @param buffer      the delegate buffer elements are read from
     */
    public FlowControlledDisposableReadonlyBuffer(FlowControl flowControl, CloseableReadonlyBuffer<T> buffer) {
        this.flowControl = flowControl;
        this.buffer = buffer;
    }

    /**
     * Polls the delegate buffer for the next element.
     *
     * <p>The very first invocation first requests {@link #capacity()} elements from the flow control.
     * When an element is returned and the buffer is not closed, one additional element is requested.
     *
     * @return the next element of the delegate buffer, or an empty {@code Optional} if it returned none
     */
    @Override
    public Optional<T> poll() {
        this.replenishIfNotStarted();
        // Keep the element type as T: Optional<Object> is not assignable to or from Optional<T>.
        Optional<T> element = this.buffer.poll();
        element.ifPresent(ignored -> this.markConsumed());
        return element;
    }

    /**
     * Reports whether the delegate buffer is empty.
     *
     * @return the result of the delegate's {@code isEmpty()}
     */
    @Override
    public boolean isEmpty() {
        return this.buffer.isEmpty();
    }

    /**
     * Returns the capacity of the delegate buffer, which is also the size of the initial request.
     *
     * @return the result of the delegate's {@code capacity()}
     */
    @Override
    public int capacity() {
        return this.buffer.capacity();
    }

    /**
     * Registers the given callback on the delegate buffer.
     *
     * @param onAvailable the callback handed to the delegate's {@code onAvailable}
     */
    @Override
    public void onAvailable(Runnable onAvailable) {
        this.buffer.onAvailable(onAvailable);
    }

    /**
     * Cancels the flow control. The delegate buffer itself is not touched by this method.
     */
    @Override
    public void dispose() {
        this.flowControl.cancel();
    }

    /**
     * Reports whether the delegate buffer is closed.
     *
     * @return the result of the delegate's {@code closed()}
     */
    @Override
    public boolean closed() {
        return this.buffer.closed();
    }

    /**
     * Returns the error reported by the delegate buffer, if any.
     *
     * @return the result of the delegate's {@code error()}
     */
    @Override
    public Optional<ErrorMessage> error() {
        return this.buffer.error();
    }

    /**
     * Requests the initial batch of {@code buffer.capacity()} elements, but only for the caller that wins
     * the {@code false -> true} transition of {@link #started}.
     */
    private void replenishIfNotStarted() {
        if (this.started.compareAndSet(false, true)) {
            this.flowControl.request(this.buffer.capacity());
        }
    }

    /**
     * Requests one more element to replace the one just consumed, unless this buffer is closed.
     */
    private void markConsumed() {
        if (!this.closed()) {
            this.flowControl.request(1L);
        }
    }
}

