/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.grpc.event.dcb.SequencedEvent;
import io.axoniq.axonserver.grpc.event.dcb.SourceEventsResponse;
import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.Optional;
import org.axonframework.axonserver.connector.event.TaggedEventConverter;
import org.axonframework.common.annotation.Internal;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TerminalEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.GlobalIndexConsistencyMarker;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SourcingEventMessageStream
implements MessageStream<EventMessage> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final ResultStream<SourceEventsResponse> stream;
    private final TaggedEventConverter converter;

    public SourcingEventMessageStream(@Nonnull ResultStream<SourceEventsResponse> stream, @Nonnull TaggedEventConverter converter) {
        this.stream = Objects.requireNonNull(stream, "The source result stream cannot be null.");
        this.converter = Objects.requireNonNull(converter, "The converter cannot be null.");
    }

    public Optional<MessageStream.Entry<EventMessage>> next() {
        SourceEventsResponse next = (SourceEventsResponse)this.stream.nextIfAvailable();
        if (next == null) {
            logger.debug("Reached the end of the source result stream.");
            return Optional.empty();
        }
        if (next.hasConsistencyMarker()) {
            logger.debug("Reached the consistency marker message of the source result stream.");
            return SourcingEventMessageStream.convertToMarkerEntry(next.getConsistencyMarker());
        }
        return this.convertToEventEntry(next.getEvent());
    }

    private Optional<MessageStream.Entry<EventMessage>> convertToEventEntry(SequencedEvent event) {
        EventMessage eventMessage = this.converter.convertEvent(event.getEvent());
        GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(event.getSequence() + 1L);
        Context context = Context.with((Context.ResourceKey)TrackingToken.RESOURCE_KEY, (Object)token);
        return Optional.of(new SimpleEntry((Message)eventMessage, context));
    }

    private static Optional<MessageStream.Entry<EventMessage>> convertToMarkerEntry(long marker) {
        Context context = ConsistencyMarker.addToContext((Context)Context.empty(), (ConsistencyMarker)new GlobalIndexConsistencyMarker(marker));
        return Optional.of(new SimpleEntry((Message)TerminalEventMessage.INSTANCE, context));
    }

    public Optional<MessageStream.Entry<EventMessage>> peek() {
        SourceEventsResponse peeked = (SourceEventsResponse)this.stream.peek();
        if (peeked == null) {
            logger.debug("Peeked the end of the source result stream.");
            return Optional.empty();
        }
        if (peeked.hasConsistencyMarker()) {
            logger.debug("Peeked the consistency marker message of the source result stream.");
            return SourcingEventMessageStream.convertToMarkerEntry(peeked.getConsistencyMarker());
        }
        return this.convertToEventEntry(peeked.getEvent());
    }

    public void onAvailable(@Nonnull Runnable callback) {
        this.stream.onAvailable(callback);
    }

    public Optional<Throwable> error() {
        return this.stream.getError();
    }

    public boolean isCompleted() {
        return this.stream.isClosed();
    }

    public boolean hasNextAvailable() {
        return this.stream.peek() != null;
    }

    public void close() {
        this.stream.close();
    }
}

