/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.impl.FlowControlledStream;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.connector.impl.StreamTimeoutException;
import io.axoniq.axonserver.connector.impl.SynchronizedRequestStream;
import io.grpc.stub.ClientCallStreamObserver;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A {@link FlowControlledStream} that stores every value it receives in an in-memory queue so that
 * subclasses can read them later through the {@code take}/{@code tryTake}/{@code peek} family of
 * methods.
 * <p>
 * End of stream (either {@link #onCompleted()} or {@link #onError(Throwable)}) is represented inside
 * the queue by a subclass-defined sentinel, see {@link #terminalMessage()}. The sentinel is never
 * handed to readers: depending on the method used, they either get {@code null} or a
 * {@link StreamClosedException}.
 * <p>
 * The backing queue is a {@link LinkedBlockingQueue} created without a capacity, so it is not bounded
 * by this class itself. NOTE(review): the {@code bufferSize}/{@code refillBatch} constructor arguments
 * are only forwarded to the superclass; presumably they drive permit-based flow control there -
 * confirm against {@code FlowControlledStream}.
 *
 * @param <T> the type of the values buffered by this stream
 * @param <R> the type of the messages sent on the request stream
 */
public abstract class FlowControlledBuffer<T, R> extends FlowControlledStream<T, R> {

    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<T>();
    private final AtomicReference<Throwable> errorResult = new AtomicReference<Throwable>();

    /**
     * Creates a buffer, forwarding all arguments unchanged to the superclass constructor.
     *
     * @param clientId    passed to the superclass
     * @param bufferSize  passed to the superclass
     * @param refillBatch passed to the superclass
     */
    public FlowControlledBuffer(String clientId, int bufferSize, int refillBatch) {
        super(clientId, bufferSize, refillBatch);
    }

    /**
     * Provides the sentinel value that marks the end of the stream inside the buffer. It is compared
     * against buffered values using {@code equals}, so it must be distinguishable from every regular
     * value.
     *
     * @return the sentinel; must not be {@code null}
     */
    protected abstract T terminalMessage();

    /**
     * Appends the given value to the buffer.
     *
     * @param value the value to buffer
     * @throws NullPointerException if {@code value} is {@code null}
     */
    public void onNext(T value) {
        Objects.requireNonNull(value, "Next value of buffer is not allowed to be null");
        this.buffer.offer(value);
    }

    /**
     * Records the given error as the error result and appends the terminal message to the buffer, so
     * that readers see the end of the stream after consuming the values buffered before it.
     *
     * @param t the error that ended the stream
     */
    public void onError(Throwable t) {
        this.errorResult.set(t);
        this.buffer.offer(this.getAndValidateTerminalMessage());
    }

    /**
     * Appends the terminal message to the buffer. The error result is left untouched.
     */
    public void onCompleted() {
        this.buffer.offer(this.getAndValidateTerminalMessage());
    }

    /**
     * Fetches the subclass-provided terminal message and rejects a {@code null} sentinel with a
     * descriptive message.
     */
    private T getAndValidateTerminalMessage() {
        T message = this.terminalMessage();
        Objects.requireNonNull(message, "Result of terminalMessage is not allowed to be null");
        return message;
    }

    /**
     * Records an {@link AxonServerException} stating that the stream was closed on client request as
     * the error result.
     * <p>
     * This method only sets the error result; it does not add the terminal message to the buffer and
     * does not interact with the request stream.
     */
    public void close() {
        this.errorResult.set(new AxonServerException(ErrorCategory.OTHER, "Stream closed on client request", ""));
    }

    /**
     * Returns the error recorded by {@link #onError(Throwable)} or {@link #close()}.
     *
     * @return the most recently recorded error, or {@code null} if none has been recorded
     */
    public Throwable getErrorResult() {
        return this.errorResult.get();
    }

    /**
     * Wraps the given request stream in a {@link SynchronizedRequestStream} and hands the wrapper to
     * the superclass.
     *
     * @param requestStream the request stream supplied by the caller
     */
    @Override
    public void beforeStart(ClientCallStreamObserver<R> requestStream) {
        SynchronizedRequestStream<R> synchronizedRequestStream = new SynchronizedRequestStream<R>(requestStream);
        super.beforeStart(synchronizedRequestStream);
    }

    /**
     * Removes and returns the head of the buffer without waiting.
     *
     * @return the next value, or {@code null} if the buffer is empty or the terminal message was reached
     */
    protected T tryTakeNow() {
        return this.markConsumedIfPresent(this.validate(this.buffer.poll(), true));
    }

    /**
     * Removes and returns the head of the buffer, waiting up to the given timeout for one to arrive.
     * Equivalent to {@link #tryTake(long, TimeUnit, boolean)} with {@code exceptionOnTimeout} set to
     * {@code false}.
     *
     * @param timeout  how long to wait
     * @param timeUnit the unit of {@code timeout}
     * @return the next value, or {@code null} on timeout or when the terminal message was reached
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected T tryTake(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return this.tryTake(timeout, timeUnit, false);
    }

    /**
     * Removes and returns the head of the buffer, waiting up to the given timeout for one to arrive.
     *
     * @param timeout            how long to wait
     * @param timeUnit           the unit of {@code timeout}
     * @param exceptionOnTimeout whether a timeout is reported with an exception instead of {@code null}
     * @return the next value, or {@code null} when the terminal message was reached or, if
     *         {@code exceptionOnTimeout} is {@code false}, when the timeout elapsed
     * @throws InterruptedException   if the thread is interrupted while waiting
     * @throws StreamTimeoutException if the timeout elapsed and {@code exceptionOnTimeout} is {@code true}
     */
    protected T tryTake(long timeout, TimeUnit timeUnit, boolean exceptionOnTimeout) throws InterruptedException {
        T polled = this.buffer.poll(timeout, timeUnit);
        if (polled == null && exceptionOnTimeout) {
            throw new StreamTimeoutException("Timeout while trying to peek next event from the stream");
        }
        return this.markConsumedIfPresent(this.validate(polled, true));
    }

    /**
     * Removes and returns the head of the buffer, waiting indefinitely for one to arrive.
     *
     * @return the next value, or {@code null} when the terminal message was reached
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected T tryTake() throws InterruptedException {
        return this.markConsumedIfPresent(this.validate(this.buffer.take(), true));
    }

    /**
     * Removes and returns the head of the buffer, waiting indefinitely for one to arrive.
     *
     * @return the next value, never the terminal message
     * @throws InterruptedException  if the thread is interrupted while waiting
     * @throws StreamClosedException if the terminal message was reached; it is constructed with the
     *                               recorded error result, which is {@code null} when none was recorded
     */
    protected T take() throws InterruptedException {
        T taken = this.validate(this.buffer.take(), false);
        this.markConsumed();
        return taken;
    }

    /**
     * Returns the head of the buffer without removing it.
     *
     * @return the next value, or {@code null} if the buffer is empty or its head is the terminal message
     */
    protected T peek() {
        return this.validate(this.buffer.peek(), true);
    }

    /**
     * Invokes the inherited {@code markConsumed()} when a real value was obtained, then returns the
     * value unchanged.
     */
    private T markConsumedIfPresent(T taken) {
        if (taken != null) {
            this.markConsumed();
        }
        return taken;
    }

    /**
     * Filters the terminal message out of what is handed to readers.
     *
     * @param candidate      the value read from the buffer, possibly {@code null}
     * @param nullOnTerminal whether reaching the terminal message yields {@code null} ({@code true})
     *                       or a {@link StreamClosedException} ({@code false})
     * @return {@code candidate} if it is not the terminal message, otherwise {@code null}
     */
    private T validate(T candidate, boolean nullOnTerminal) {
        T terminal = this.getAndValidateTerminalMessage();
        if (!terminal.equals(candidate)) {
            return candidate;
        }
        // A removing read may just have consumed the sentinel; put it back so that subsequent reads
        // and isClosed() keep observing the end of the stream.
        if (this.buffer.isEmpty()) {
            this.buffer.offer(terminal);
        }
        if (nullOnTerminal) {
            return null;
        }
        throw new StreamClosedException(this.errorResult.get());
    }

    /**
     * Reports whether the head of the buffer is the terminal message. While regular values are still
     * queued ahead of the terminal message this returns {@code false}.
     *
     * @return {@code true} if the next element in the buffer is the terminal message
     */
    protected boolean isClosed() {
        return this.getAndValidateTerminalMessage().equals(this.buffer.peek());
    }
}

