/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import io.axoniq.axonserver.connector.event.EventStream;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.event.axon.GrpcBackedDomainEventData;
import org.axonframework.axonserver.connector.event.axon.GrpcMetaDataAwareSerializer;
import org.axonframework.common.ObjectUtils;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.EventUtils;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedDomainEventData;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.serialization.SerializedType;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.UnknownSerializedType;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TrackingEventStream} that pulls {@link EventWithToken} entries from an {@link EventStream} delegate,
 * converts them to tracked event data, and exposes them as upcast and deserialized {@link TrackedEventMessage}s.
 * <p>
 * The delegate is asked to invoke a callback whenever it has data available; that callback signals a
 * {@link Condition} on which a consumer blocked in {@link #hasNextAvailable(int, TimeUnit)} is waiting.
 * <p>
 * NOTE(review): {@code peekEvent} is a plain, unsynchronized field, so this class appears to be intended for use
 * by a single consuming thread at a time - confirm against the callers.
 */
public class EventBuffer
implements TrackingEventStream {
    private static final Logger logger = LoggerFactory.getLogger(EventBuffer.class);
    /** Upper bound, in milliseconds, for a single wait on the "data available" condition. */
    private static final int DEFAULT_POLLING_TIME_MILLIS = 500;
    private final Serializer serializer;
    private final EventStream delegate;
    private final boolean disableEventBlacklisting;
    private final Iterator<TrackedEventMessage<?>> eventStream;
    /** Event handed out by {@link #peek()} but not yet consumed through {@link #nextAvailable()}. */
    private TrackedEventMessage<?> peekEvent;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition dataAvailable = this.lock.newCondition();

    /**
     * Creates a buffer reading from the given {@code delegate}.
     *
     * @param delegate                 the stream the raw events are polled from
     * @param upcasterChain            the upcaster to apply; when {@code null}, {@link NoOpEventUpcaster#INSTANCE}
     *                                 is used instead
     * @param serializer               the serializer used to deserialize events and to resolve the serialized type
     *                                 of a payload class in {@link #blacklist(TrackedEventMessage)}
     * @param disableEventBlacklisting when {@code true}, {@link #blacklist(TrackedEventMessage)} does nothing
     */
    public EventBuffer(EventStream delegate, EventUpcaster upcasterChain, Serializer serializer, boolean disableEventBlacklisting) {
        this.serializer = serializer;
        this.delegate = delegate;
        this.disableEventBlacklisting = disableEventBlacklisting;
        EventUpcaster upcaster = ObjectUtils.getOrDefault(upcasterChain, NoOpEventUpcaster.INSTANCE);
        this.eventStream = EventUtils.upcastAndDeserializeTrackedEvents(
                StreamSupport.stream(new SimpleSpliterator<TrackedEventData<byte[]>>(this::poll), false),
                new GrpcMetaDataAwareSerializer(serializer),
                upcaster
        ).iterator();
        // Wake up any consumer waiting in waitForData(long) as soon as the delegate reports new data.
        delegate.onAvailable(() -> {
            this.lock.lock();
            try {
                this.dataAvailable.signalAll();
            } finally {
                this.lock.unlock();
            }
        });
    }

    /**
     * Takes the next entry from the delegate without blocking.
     *
     * @return the converted entry, or {@code null} when the delegate returned {@code null}
     */
    private TrackedEventData<byte[]> poll() {
        EventWithToken eventWithToken = this.delegate.nextIfAvailable();
        if (eventWithToken == null) {
            return null;
        }
        return this.convert(eventWithToken);
    }

    /**
     * Wraps the given entry's event in a {@link TrackedDomainEventData} carrying a
     * {@link GlobalSequenceTrackingToken} built from the entry's token.
     */
    private TrackedEventData<byte[]> convert(EventWithToken eventWithToken) {
        TrackingToken trackingToken = new GlobalSequenceTrackingToken(eventWithToken.getToken());
        return new TrackedDomainEventData<>(trackingToken, new GrpcBackedDomainEventData(eventWithToken.getEvent()));
    }

    /**
     * Asks the delegate to exclude the payload type of the given message, unless blacklisting was disabled at
     * construction time.
     * <p>
     * For a payload of type {@link UnknownSerializedType} the serialized type is taken from the payload itself;
     * otherwise it is resolved from the payload class through the serializer. The exclusion is sent to the
     * delegate exactly once per invocation.
     *
     * @param trackedEventMessage the message whose payload type should be excluded
     */
    public void blacklist(TrackedEventMessage<?> trackedEventMessage) {
        if (this.disableEventBlacklisting) {
            return;
        }
        SerializedType serializedType;
        if (UnknownSerializedType.class.equals(trackedEventMessage.getPayloadType())) {
            UnknownSerializedType unknownSerializedType = (UnknownSerializedType) trackedEventMessage.getPayload();
            serializedType = unknownSerializedType.serializedType();
        } else {
            serializedType = this.serializer.typeForClass(trackedEventMessage.getPayloadType());
        }
        this.delegate.excludePayloadType(serializedType.getName(), serializedType.getRevision());
    }

    /**
     * Returns the next event without consuming it. The event is cached until {@link #nextAvailable()} is called.
     *
     * @return the next event, or an empty {@link Optional} when none is currently available
     */
    public Optional<TrackedEventMessage<?>> peek() {
        if (this.peekEvent == null && this.eventStream.hasNext()) {
            this.peekEvent = this.eventStream.next();
        }
        return Optional.ofNullable(this.peekEvent);
    }

    /**
     * Checks whether an event is available, waiting up to the given timeout for one to arrive.
     * <p>
     * Availability is checked before any waiting takes place, so an event that is already held (either as the
     * peeked event or inside the iterator) is reported immediately.
     *
     * @param timeout  the maximum time to wait
     * @param timeUnit the unit of {@code timeout}
     * @return {@code true} when an event is available; {@code false} when the timeout elapsed without one, or
     *         when the waiting thread was interrupted (the interrupt flag is restored in that case)
     */
    public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) {
        long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        try {
            boolean available = this.peekEvent != null || this.eventStream.hasNext();
            while (!available) {
                long waitTime = deadline - System.currentTimeMillis();
                if (waitTime <= 0L) {
                    break;
                }
                this.waitForData(waitTime);
                // peekEvent is only assigned by peek(), so after waiting only the iterator can have changed.
                available = this.eventStream.hasNext();
            }
            return available;
        } catch (InterruptedException e) {
            logger.warn("Consumer thread was interrupted. Returning thread to event processor.", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Blocks until the delegate reports data, but never longer than the given timeout or
     * {@link #DEFAULT_POLLING_TIME_MILLIS}, whichever is smaller. Returns immediately when the delegate already
     * has an entry to peek at or when {@code timeout} is not positive.
     *
     * @param timeout the maximum time to wait, in milliseconds
     * @throws InterruptedException when the thread is interrupted while waiting
     */
    private void waitForData(long timeout) throws InterruptedException {
        if (this.delegate.peek() != null) {
            return;
        }
        if (timeout > 0L) {
            this.lock.lock();
            try {
                // Re-check while holding the lock: the availability callback signals under this same lock, so a
                // notification fired between the check above and the await below cannot be missed.
                if (this.delegate.peek() == null) {
                    this.dataAvailable.await(Math.min(DEFAULT_POLLING_TIME_MILLIS, timeout), TimeUnit.MILLISECONDS);
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Returns the next event, consuming it. A previously peeked event is returned first; otherwise the next
     * element is taken from the underlying iterator after waiting for one to become available.
     * <p>
     * The cached peek event is always cleared, also when taking the next element from the iterator fails
     * (for instance because the wait was interrupted and no element is present).
     *
     * @return the next event
     */
    public TrackedEventMessage<?> nextAvailable() {
        try {
            this.hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            return this.peekEvent == null ? this.eventStream.next() : this.peekEvent;
        } finally {
            this.peekEvent = null;
        }
    }

    /**
     * Closes the delegate stream.
     */
    public void close() {
        this.delegate.close();
    }

    /**
     * A non-splittable {@link Spliterator} of unknown size that takes each element from a {@link Supplier}.
     * A {@code null} from the supplier is reported as "no element" for that attempt.
     *
     * @param <T> the type of element supplied
     */
    private static class SimpleSpliterator<T>
    implements Spliterator<T> {
        private final Supplier<T> supplier;

        protected SimpleSpliterator(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T nextValue = this.supplier.get();
            if (nextValue == null) {
                return false;
            }
            action.accept(nextValue);
            return true;
        }

        @Override
        public Spliterator<T> trySplit() {
            // Splitting is not supported.
            return null;
        }

        @Override
        public long estimateSize() {
            // Size is unknown.
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            // Same value as the literal 1296 (0x10 | 0x100 | 0x400).
            return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
        }
    }
}

