/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.transformation.event.stream;

import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.EventStream;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class TokenRangeEvents
implements Iterable<EventWithToken> {
    private final Supplier<EventChannel> eventChannel;
    private final long firstToken;
    private final long lastToken;

    public TokenRangeEvents(Supplier<EventChannel> eventChannel, long firstToken, long lastToken) {
        this.eventChannel = eventChannel;
        this.firstToken = firstToken;
        this.lastToken = lastToken;
    }

    private static EventWithToken next(EventStream eventStream) {
        try {
            return (EventWithToken)eventStream.next();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            eventStream.close();
            throw new RuntimeException(e);
        }
    }

    @Override
    public Iterator<EventWithToken> iterator() {
        return this.events(this.firstToken, this.lastToken);
    }

    private Iterator<EventWithToken> events(long firstToken, final long lastToken) {
        final EventStream eventStream = this.eventChannel.get().openStream(firstToken, 10);
        final AtomicReference<EventWithToken> nextRef = new AtomicReference<EventWithToken>(TokenRangeEvents.next(eventStream));
        return new Iterator<EventWithToken>(){

            @Override
            public boolean hasNext() {
                return ((EventWithToken)nextRef.get()).getToken() <= lastToken;
            }

            @Override
            public EventWithToken next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                if (eventStream.isClosed()) {
                    throw new StreamClosedException(eventStream.getError().orElse(null));
                }
                EventWithToken current = (EventWithToken)nextRef.get();
                if (current.getToken() == lastToken) {
                    nextRef.set(current.toBuilder().setToken(lastToken + 1L).build());
                    eventStream.close();
                } else {
                    nextRef.set(TokenRangeEvents.next(eventStream));
                }
                return current;
            }
        };
    }
}

