/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.impl.FlowControlledBuffer;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.connector.impl.StreamTimeoutException;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import java.util.concurrent.TimeUnit;

/**
 * {@link AggregateEventStream} implementation built on top of a {@link FlowControlledBuffer} of
 * {@link Event}s.
 *
 * <p>The stream keeps at most one look-ahead event: {@link #hasNext()} may pull an event out of the
 * buffer and park it, and the following {@link #next()} call hands that parked event out before
 * touching the buffer again.
 *
 * <p>The instance holds unsynchronized mutable state (the look-ahead event and the last sequence
 * number), so it is presumably meant to be consumed by a single thread -- confirm against callers.
 */
public class BufferedAggregateEventStream
        extends FlowControlledBuffer<Event, GetAggregateEventsRequest>
        implements AggregateEventStream {

    /**
     * Marker event (aggregate sequence number {@code -1729}) returned by {@link #terminalMessage()}.
     * NOTE(review): presumably the superclass uses it to recognise the end of the stream -- confirm.
     */
    private static final Event TERMINAL_MESSAGE =
            Event.newBuilder().setAggregateSequenceNumber(-1729L).build();

    /**
     * Timeout, in milliseconds, passed to {@code tryTake} by {@link #hasNext()}. Read once from the
     * {@code AGGREGATE_TAKE_EVENT_TIMEOUT_MILLIS} system property; defaults to {@code 10000}.
     */
    private static final int TAKE_TIMEOUT_MILLIS =
            Integer.parseInt(System.getProperty("AGGREGATE_TAKE_EVENT_TIMEOUT_MILLIS", "10000"));

    /** Event already taken from the buffer by {@link #hasNext()} but not yet returned by {@link #next()}. */
    private Event lookahead;

    /** Aggregate sequence number of the last event returned by {@link #next()}; {@code -1} until then. */
    private long lastSeenSequenceNumber = -1L;

    /**
     * Creates a stream with a buffer size of {@code 512} and a refill batch of {@code 16}.
     */
    public BufferedAggregateEventStream() {
        this(512, 16);
    }

    /**
     * Creates a stream with the given buffer dimensions, which are forwarded unchanged to the
     * {@link FlowControlledBuffer} constructor.
     *
     * @param bufferSize  buffer size handed to the superclass
     * @param refillBatch refill batch handed to the superclass
     */
    public BufferedAggregateEventStream(int bufferSize, int refillBatch) {
        super("unused", bufferSize, refillBatch);
    }

    /**
     * Returns the next event: the look-ahead event if {@link #hasNext()} parked one, otherwise
     * whatever {@code take()} delivers.
     *
     * <p>When the result is non-null its aggregate sequence number is remembered for use in the
     * timeout message of {@link #hasNext()}.
     *
     * @return the next event, or {@code null} if {@code take()} returned {@code null}
     * @throws InterruptedException if {@code take()} is interrupted
     */
    @Override
    public Event next() throws InterruptedException {
        Event result = lookahead;
        if (result == null) {
            // Nothing parked by hasNext(): go to the buffer directly.
            result = take();
        } else {
            // Consume the parked event so it is handed out only once.
            lookahead = null;
        }
        if (result != null) {
            lastSeenSequenceNumber = result.getAggregateSequenceNumber();
        }
        return result;
    }

    /**
     * Reports whether another event is available, fetching and parking one from the buffer if none
     * is parked yet.
     *
     * <p>If the thread is interrupted while waiting, the stream is cancelled, the interrupt flag is
     * restored and {@code false} is returned.
     *
     * @return {@code true} if an event is parked and will be returned by the next {@link #next()}
     *         call; {@code false} if none was obtained and no error result is recorded
     * @throws RuntimeException      if {@code tryTake} signals a {@link StreamTimeoutException}
     * @throws StreamClosedException if no event was obtained and the buffer reports an error result
     */
    @Override
    public boolean hasNext() {
        if (lookahead == null) {
            try {
                lookahead = tryTake(TAKE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, true);
            } catch (InterruptedException e) {
                cancel();
                Thread.currentThread().interrupt();
                return false;
            } catch (StreamTimeoutException e) {
                throw new RuntimeException(String.format("Was unable to load aggregate due to timeout while waiting for events. Last sequence number received: %d", lastSeenSequenceNumber), e);
            }
            if (lookahead == null) {
                // No event came back: distinguish a failed stream from one that simply has no more events.
                Throwable failure = getErrorResult();
                if (failure != null) {
                    throw new StreamClosedException(failure);
                }
                return false;
            }
        }
        return true;
    }

    /**
     * Cancels the outbound stream with the message {@code "Request cancelled by client"} and no cause.
     */
    @Override
    public void cancel() {
        outboundStream().cancel("Request cancelled by client", null);
    }

    /**
     * Always returns {@code null}, regardless of the given flow control instruction.
     * NOTE(review): presumably this request type has no flow control message to send -- confirm
     * against {@link FlowControlledBuffer}.
     *
     * @param flowControl ignored
     * @return {@code null}
     */
    @Override
    protected GetAggregateEventsRequest buildFlowControlMessage(FlowControl flowControl) {
        return null;
    }

    /**
     * Returns the shared marker event of this class.
     *
     * @return the {@code TERMINAL_MESSAGE} constant
     */
    @Override
    protected Event terminalMessage() {
        return TERMINAL_MESSAGE;
    }
}

