/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import io.axoniq.axonserver.grpc.event.EventWithToken;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.event.axon.GrpcBackedDomainEventData;
import org.axonframework.axonserver.connector.event.axon.GrpcMetaDataAwareSerializer;
import org.axonframework.common.ObjectUtils;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.EventUtils;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedDomainEventData;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TrackingEventStream} backed by an in-memory queue of serialized events.
 * <p>
 * Events are appended with {@link #push(EventWithToken)} and read through {@link #peek()},
 * {@link #hasNextAvailable(int, TimeUnit)} and {@link #nextAvailable()}. Queued entries are lazily
 * upcast and deserialized via {@link EventUtils#upcastAndDeserializeTrackedEvents} when they are read.
 * <p>
 * NOTE(review): the read-side state ({@code peekData}, {@code peekEvent}) is not synchronized, so this
 * class presumably expects a single reading thread; only the queue and the {@code closed} /
 * {@code exception} flags are safe to touch from another thread. Confirm against the callers.
 */
public class EventBuffer
implements TrackingEventStream {
    /**
     * Upper bound, in milliseconds, for a single blocking wait on the queue. Keeping the individual
     * waits short lets a reader that is blocked with a long timeout notice a failure reported through
     * {@link #fail(RuntimeException)} without having to wait for the next event to arrive.
     */
    private static final long MAX_POLL_INTERVAL_MILLIS = 200L;

    final Logger logger = LoggerFactory.getLogger(EventBuffer.class);
    /** Serialized events that were pushed but not yet handed to the deserializing stream. */
    private final BlockingQueue<TrackedEventData<byte[]>> events;
    /** Lazily upcasts and deserializes whatever {@link #poll()} supplies. */
    private final Iterator<TrackedEventMessage<?>> eventStream;
    /** Entry taken from the queue by {@link #waitForData(long)} that the stream has not consumed yet. */
    private TrackedEventData<byte[]> peekData;
    /** Deserialized event handed out by {@link #peek()} but not yet returned by {@link #nextAvailable()}. */
    private TrackedEventMessage<?> peekEvent;
    private Consumer<EventBuffer> closeCallback;
    /** Failure reported through {@link #fail(RuntimeException)}, rethrown once to the reader. */
    private volatile RuntimeException exception;
    private volatile boolean closed;
    private Consumer<Integer> consumeListener = i -> {};

    /**
     * Creates an empty buffer.
     *
     * @param upcasterChain upcaster applied to the raw events; {@code null} selects
     *                      {@link NoOpEventUpcaster#INSTANCE}
     * @param serializer    serializer used to deserialize the events; it is wrapped in a
     *                      {@link GrpcMetaDataAwareSerializer}
     */
    public EventBuffer(EventUpcaster upcasterChain, Serializer serializer) {
        this.events = new LinkedBlockingQueue<>();
        EventUpcaster effectiveUpcaster =
                ObjectUtils.getOrDefault(upcasterChain, (EventUpcaster) NoOpEventUpcaster.INSTANCE);
        this.eventStream = EventUtils.upcastAndDeserializeTrackedEvents(
                StreamSupport.stream(new SimpleSpliterator<TrackedEventData<byte[]>>(this::poll), false),
                new GrpcMetaDataAwareSerializer(serializer),
                effectiveUpcaster
        ).iterator();
    }

    /**
     * Supplies the next raw entry to the deserializing stream without blocking.
     *
     * @return the entry parked by {@link #waitForData(long)} if there is one, otherwise the head of the
     *         queue, or {@code null} when nothing is available
     */
    private TrackedEventData<byte[]> poll() {
        if (peekData != null) {
            // Already reported to the consume listener when waitForData took it from the queue.
            TrackedEventData<byte[]> parked = peekData;
            peekData = null;
            return parked;
        }
        TrackedEventData<byte[]> head = events.poll();
        if (head != null) {
            consumeListener.accept(1);
        }
        return head;
    }

    /**
     * Blocks until an entry arrives, the deadline passes, or {@link #MAX_POLL_INTERVAL_MILLIS} elapses,
     * whichever comes first. A received entry is parked in {@code peekData} and reported to the consume
     * listener. Callers are expected to invoke this in a loop.
     *
     * @param deadline absolute time, in {@link System#currentTimeMillis()} terms, to stop waiting at
     * @throws InterruptedException if the thread is interrupted while waiting on the queue
     */
    private void waitForData(long deadline) throws InterruptedException {
        long now = System.currentTimeMillis();
        if (peekData == null && now < deadline) {
            long waitMillis = Math.min(deadline - now, MAX_POLL_INTERVAL_MILLIS);
            peekData = events.poll(waitMillis, TimeUnit.MILLISECONDS);
            if (peekData != null) {
                consumeListener.accept(1);
            }
        }
    }

    /**
     * Registers the callback invoked when this buffer is closed, or when a producer is interrupted
     * while pushing. Replaces any previously registered callback.
     *
     * @param closeCallback callback receiving this buffer; may be {@code null} to unregister
     */
    public void registerCloseListener(Consumer<EventBuffer> closeCallback) {
        this.closeCallback = closeCallback;
    }

    /**
     * Registers the listener that is told each time an entry is taken from the queue. It is always
     * invoked with the value {@code 1}. Replaces any previously registered listener.
     *
     * @param consumeListener listener to notify; {@code null} installs a no-op listener
     */
    public void registerConsumeListener(Consumer<Integer> consumeListener) {
        this.consumeListener = consumeListener != null ? consumeListener : i -> {};
    }

    /**
     * Returns the next event without consuming it and without blocking.
     *
     * @return the next event, or an empty {@link Optional} if none is currently available
     */
    public Optional<TrackedEventMessage<?>> peek() {
        if (peekEvent == null && eventStream.hasNext()) {
            peekEvent = eventStream.next();
        }
        return Optional.ofNullable(peekEvent);
    }

    /**
     * Waits up to the given timeout for an event to become available.
     * <p>
     * If a failure was reported through {@link #fail(RuntimeException)} and no event is available, that
     * failure is rethrown once and then cleared. Interruption is not propagated: it is logged, the
     * thread's interrupt flag is restored and {@code false} is returned.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if an event can be read, {@code false} on timeout or interruption
     * @throws InterruptedException declared for interface compatibility; this implementation handles
     *                              interruption itself
     */
    public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) throws InterruptedException {
        long start = System.currentTimeMillis();
        long timeoutMillis = timeUnit.toMillis(timeout);
        // Saturate rather than overflow into a deadline that lies in the past.
        long deadline = timeoutMillis > Long.MAX_VALUE - start ? Long.MAX_VALUE : start + timeoutMillis;
        try {
            while (peekEvent == null && !eventStream.hasNext() && System.currentTimeMillis() < deadline) {
                RuntimeException failure = exception;
                if (failure != null) {
                    exception = null;
                    throw failure;
                }
                waitForData(deadline);
            }
            return peekEvent != null || eventStream.hasNext();
        }
        catch (InterruptedException e) {
            logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Returns the next event, waiting for one if necessary, and consumes it.
     *
     * @return the next event, or {@code null} if the wait ended without an event becoming available
     *         (the thread was interrupted or the internal timeout elapsed)
     */
    public TrackedEventMessage<?> nextAvailable() {
        try {
            if (!hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)) {
                // hasNextAvailable swallows interruption and reports it as "nothing available";
                // calling eventStream.next() here would fail with NoSuchElementException.
                return null;
            }
            return peekEvent != null ? peekEvent : eventStream.next();
        }
        catch (InterruptedException e) {
            logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e);
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            // Whatever was peeked has now been handed out (or there was nothing to hand out).
            peekEvent = null;
        }
    }

    /**
     * Marks the buffer closed so that later pushes are rejected, notifies the close listener if one is
     * registered, and discards all queued entries.
     */
    public void close() {
        closed = true;
        try {
            notifyCloseListener();
        }
        finally {
            // Drop the queued entries even when the listener throws.
            events.clear();
        }
    }

    /**
     * Appends an event to the buffer.
     *
     * @param event the event and its token as received from the server
     * @return {@code true} if the event was queued; {@code false} if the buffer is closed or the calling
     *         thread was interrupted, in which case the close listener is notified and the interrupt
     *         flag is restored
     */
    public boolean push(EventWithToken event) {
        if (closed) {
            logger.debug("Received event while closed: {}", event.getToken());
            return false;
        }
        try {
            TrackingToken trackingToken = new GlobalSequenceTrackingToken(event.getToken());
            events.put(new TrackedDomainEventData<byte[]>(
                    trackingToken, new GrpcBackedDomainEventData(event.getEvent())));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            notifyCloseListener();
            return false;
        }
        return true;
    }

    /**
     * Records a failure to be rethrown by {@link #hasNextAvailable(int, TimeUnit)} the next time it
     * finds no event available. A later call overwrites a failure that has not been rethrown yet.
     *
     * @param e the failure to report to the reader
     */
    public void fail(RuntimeException e) {
        this.exception = e;
    }

    /** Invokes the close listener with this buffer, if a listener has been registered. */
    private void notifyCloseListener() {
        Consumer<EventBuffer> callback = closeCallback;
        if (callback != null) {
            callback.accept(this);
        }
    }

    /**
     * Sequential, non-splittable {@link Spliterator} that pulls its elements from a {@link Supplier}.
     * A {@code null} from the supplier is reported as "no element available right now".
     *
     * @param <T> element type
     */
    private static class SimpleSpliterator<T>
    implements Spliterator<T> {
        private final Supplier<T> supplier;

        protected SimpleSpliterator(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T nextValue = supplier.get();
            if (nextValue == null) {
                return false;
            }
            action.accept(nextValue);
            return true;
        }

        /** @return always {@code null}; this spliterator cannot be split */
        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        /** @return {@link Long#MAX_VALUE}, i.e. the size is unknown */
        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            // Same value as the literal 1296 (0x10 | 0x100 | 0x400).
            return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
        }
    }
}

