/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.event.DcbEventChannel;
import io.axoniq.axonserver.grpc.event.dcb.AppendEventsResponse;
import io.axoniq.axonserver.grpc.event.dcb.SourceEventsRequest;
import io.axoniq.axonserver.grpc.event.dcb.SourceEventsResponse;
import io.axoniq.axonserver.grpc.event.dcb.StreamEventsRequest;
import io.axoniq.axonserver.grpc.event.dcb.StreamEventsResponse;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.axonframework.axonserver.connector.event.ConditionConverter;
import org.axonframework.axonserver.connector.event.SourcingEventMessageStream;
import org.axonframework.axonserver.connector.event.StreamingEventMessageStream;
import org.axonframework.axonserver.connector.event.TaggedEventConverter;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.AppendEventsTransactionRejectedException;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.EmptyAppendTransaction;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.GlobalIndexConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.EventMessage;
import org.axonframework.messaging.eventhandling.conversion.EventConverter;
import org.axonframework.messaging.eventhandling.processing.streaming.token.GlobalSequenceTrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken;
import org.axonframework.messaging.eventstreaming.StreamingCondition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerEventStorageEngine
implements EventStorageEngine {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final AxonServerConnection connection;
    private final TaggedEventConverter converter;

    public AxonServerEventStorageEngine(@Nonnull AxonServerConnection connection, @Nonnull EventConverter converter) {
        this.connection = Objects.requireNonNull(connection, "The Axon Server connection cannot be null.");
        this.converter = new TaggedEventConverter(converter);
    }

    public CompletableFuture<EventStorageEngine.AppendTransaction<?>> appendEvents(@Nonnull AppendCondition condition, @Nullable ProcessingContext context, @Nonnull List<TaggedEventMessage<?>> events) {
        if (events.isEmpty()) {
            return CompletableFuture.completedFuture(EmptyAppendTransaction.INSTANCE);
        }
        DcbEventChannel.AppendEventsTransaction appendTransaction = this.eventChannel().startTransaction(ConditionConverter.convertAppendCondition(condition));
        events.stream().map(this.converter::convertTaggedEventMessage).forEach(taggedEvent -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Appended event [{}] with timestamp [{}].", (Object)taggedEvent.getEvent().getIdentifier(), (Object)taggedEvent.getEvent().getTimestamp());
            }
            appendTransaction.append(taggedEvent);
        });
        return CompletableFuture.completedFuture(new AxonServerAppendTransaction(appendTransaction));
    }

    public MessageStream<EventMessage> source(@Nonnull SourcingCondition condition, @Nullable ProcessingContext context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start sourcing events with condition [{}].", (Object)condition);
        }
        SourceEventsRequest sourcingRequest = ConditionConverter.convertSourcingCondition(condition);
        ResultStream sourcingStream = this.eventChannel().source(sourcingRequest);
        return new SourcingEventMessageStream((ResultStream<SourceEventsResponse>)sourcingStream, this.converter);
    }

    public MessageStream<EventMessage> stream(@Nonnull StreamingCondition condition, @Nullable ProcessingContext context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start streaming events with condition [{}].", (Object)condition);
        }
        StreamEventsRequest streamingRequest = ConditionConverter.convertStreamingCondition(condition);
        ResultStream stream = this.eventChannel().stream(streamingRequest);
        return new StreamingEventMessageStream((ResultStream<StreamEventsResponse>)stream, this.converter);
    }

    public CompletableFuture<TrackingToken> firstToken(@Nullable ProcessingContext context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation firstToken() is invoked.");
        }
        return this.eventChannel().tail().thenApply(response -> new GlobalSequenceTrackingToken(response.getSequence()));
    }

    public CompletableFuture<TrackingToken> latestToken(@Nullable ProcessingContext context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation latestToken() is invoked.");
        }
        return this.eventChannel().head().thenApply(response -> new GlobalSequenceTrackingToken(response.getSequence()));
    }

    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant at, @Nullable ProcessingContext context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation tokenAt() is invoked with Instant [{}].", (Object)at);
        }
        return this.eventChannel().getSequenceAt(at).thenApply(response -> new GlobalSequenceTrackingToken(response.getSequence()));
    }

    private DcbEventChannel eventChannel() {
        return this.connection.dcbEventChannel();
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("connection", (Object)this.connection);
        descriptor.describeProperty("converter", (Object)this.converter);
    }

    private record AxonServerAppendTransaction(DcbEventChannel.AppendEventsTransaction appendTransaction) implements EventStorageEngine.AppendTransaction<AppendEventsResponse>
    {
        public CompletableFuture<AppendEventsResponse> commit(@Nullable ProcessingContext context) {
            logger.debug("Committing append event transaction...");
            return this.appendTransaction.commit().exceptionallyCompose(throwable -> {
                logger.warn("Committing append transaction failed.", throwable);
                return CompletableFuture.failedFuture((Throwable)new AppendEventsTransactionRejectedException(throwable.getMessage()));
            });
        }

        public CompletableFuture<ConsistencyMarker> afterCommit(@Nonnull AppendEventsResponse appendResponse, @Nullable ProcessingContext context) {
            long marker = appendResponse.getConsistencyMarker();
            logger.debug("Committing append transaction succeeded with marker [{}].", (Object)marker);
            return CompletableFuture.completedFuture(new GlobalIndexConsistencyMarker(marker));
        }

        public void rollback(@Nullable ProcessingContext context) {
            this.appendTransaction.rollback();
        }
    }
}

