/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.impl.DisposableReadonlyBuffer;
import io.axoniq.axonserver.connector.impl.buffer.RoundRobinMultiReadonlyBuffer;
import io.axoniq.axonserver.grpc.ErrorMessage;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class FlowControlledReplyChannelWriter<T>
implements FlowControl {
    private final DisposableReadonlyBuffer<T> buffer;
    private final ReplyChannel<T> replyChannel;
    private final AtomicLong requestedRef = new AtomicLong();
    private final AtomicBoolean flowGate = new AtomicBoolean();
    private final AtomicBoolean completed = new AtomicBoolean();

    public FlowControlledReplyChannelWriter(List<? extends DisposableReadonlyBuffer<T>> sources, ReplyChannel<T> destination) {
        this(new RoundRobinMultiReadonlyBuffer(sources), destination);
    }

    public FlowControlledReplyChannelWriter(DisposableReadonlyBuffer<T> source, ReplyChannel<T> destination) {
        this.buffer = source;
        this.replyChannel = destination;
        source.onAvailable(this::stream);
    }

    @Override
    public void request(long requested) {
        if (requested <= 0L) {
            return;
        }
        this.requestedRef.getAndUpdate(current -> {
            try {
                return Math.addExact(requested, current);
            }
            catch (ArithmeticException e) {
                return Long.MAX_VALUE;
            }
        });
        this.stream();
    }

    @Override
    public void cancel() {
        this.buffer.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stream() {
        do {
            if (!this.flowGate.compareAndSet(false, true)) {
                return;
            }
            try {
                long currentPermits = this.requestedRef.get();
                long sent = this.send(currentPermits);
                this.requestedRef.getAndAccumulate(sent, (current, s) -> current - s);
            }
            finally {
                this.flowGate.set(false);
            }
        } while (this.requestedRef.get() > 0L && !this.buffer.isEmpty());
    }

    private long send(long maxMessages) {
        this.completeIfBufferIsClosedAndEmpty();
        int i = 0;
        while ((long)i < maxMessages) {
            Optional element = this.buffer.poll();
            if (!element.isPresent()) {
                this.completeIfBufferIsClosedAndEmpty();
                return i;
            }
            this.replyChannel.send(element.get());
            ++i;
        }
        return maxMessages;
    }

    private void completeIfBufferIsClosedAndEmpty() {
        if (this.buffer.closed() && this.buffer.isEmpty() && this.completed.compareAndSet(false, true)) {
            Optional<ErrorMessage> error = this.buffer.error();
            if (error.isPresent()) {
                this.replyChannel.completeWithError(error.get());
            } else {
                this.replyChannel.complete();
            }
        }
    }
}

