/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event;

import com.google.protobuf.ByteString;
import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.event.AppendEventsTransaction;
import io.axoniq.axonserver.grpc.MetaDataValue;
import io.axoniq.axonserver.grpc.SerializedObject;
import io.axoniq.axonserver.grpc.event.Event;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.axonframework.axonserver.connector.MetadataConverter;
import org.axonframework.axonserver.connector.event.AxonServerMessageStream;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.TerminalEventMessage;
import org.axonframework.eventhandling.conversion.EventConverter;
import org.axonframework.eventhandling.processors.streaming.token.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.processors.streaming.token.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AggregateBasedConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.AggregateBasedEventStorageEngineUtils;
import org.axonframework.eventsourcing.eventstore.AggregateSequenceNumberPosition;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.EmptyAppendTransaction;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.Position;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.eventstreaming.EventCriterion;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.LegacyResources;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.Metadata;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.serialization.Converter;

public class AggregateBasedAxonServerEventStorageEngine
implements EventStorageEngine {
    private final AxonServerConnection connection;
    private final EventConverter converter;

    public AggregateBasedAxonServerEventStorageEngine(@Nonnull AxonServerConnection connection, @Nonnull EventConverter converter) {
        this.connection = Objects.requireNonNull(connection, "The connection must not be null.");
        this.converter = Objects.requireNonNull(converter, "The converter must not be null.");
    }

    public CompletableFuture<EventStorageEngine.AppendTransaction<?>> appendEvents(@Nonnull AppendCondition condition, @Nullable ProcessingContext context, @Nonnull List<TaggedEventMessage<?>> events) {
        try {
            AggregateBasedEventStorageEngineUtils.assertValidTags(events);
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        if (events.isEmpty()) {
            return CompletableFuture.completedFuture(EmptyAppendTransaction.INSTANCE);
        }
        final AggregateBasedConsistencyMarker consistencyMarker = AggregateBasedConsistencyMarker.from((AppendCondition)condition);
        final AggregateBasedEventStorageEngineUtils.AggregateSequencer aggregateSequencer = AggregateBasedEventStorageEngineUtils.AggregateSequencer.with((AggregateBasedConsistencyMarker)consistencyMarker);
        final AppendEventsTransaction tx = this.connection.eventChannel().startAppendEventsTransaction();
        try {
            events.forEach(taggedEvent -> {
                EventMessage event = taggedEvent.event();
                ByteString payloadData = ByteString.copyFrom((byte[])((byte[])event.payloadAs(byte[].class, (Converter)this.converter)));
                Event.Builder builder = Event.newBuilder().setPayload(SerializedObject.newBuilder().setData(payloadData).setType(event.type().name()).setRevision(event.type().version()).build()).setMessageIdentifier(event.identifier()).setTimestamp(event.timestamp().toEpochMilli());
                String aggregateIdentifier = AggregateBasedEventStorageEngineUtils.resolveAggregateIdentifier((Set)taggedEvent.tags());
                String aggregateType = AggregateBasedEventStorageEngineUtils.resolveAggregateType((Set)taggedEvent.tags());
                if (aggregateIdentifier != null && aggregateType != null && !taggedEvent.tags().isEmpty()) {
                    long nextSequence = aggregateSequencer.incrementAndGetSequenceOf(aggregateIdentifier);
                    builder.setAggregateIdentifier(aggregateIdentifier).setAggregateType(aggregateType).setAggregateSequenceNumber(nextSequence);
                }
                HashMap<String, MetaDataValue> modifiableMetadataMap = new HashMap<String, MetaDataValue>(builder.getMetaDataMap());
                this.buildMetadata(event.metadata(), modifiableMetadataMap);
                Event message = builder.build();
                tx.appendEvent(message);
            });
        }
        catch (Exception e) {
            tx.rollback();
            return CompletableFuture.failedFuture(e);
        }
        return CompletableFuture.completedFuture(new EventStorageEngine.AppendTransaction<AggregateBasedConsistencyMarker>(){

            public CompletableFuture<AggregateBasedConsistencyMarker> commit(@Nullable ProcessingContext context) {
                return ((CompletableFuture)tx.commit().exceptionallyCompose(e -> CompletableFuture.failedFuture(this.translateConflictException((Throwable)e)))).thenApply(r -> aggregateSequencer.forwarded());
            }

            public CompletableFuture<ConsistencyMarker> afterCommit(@Nonnull AggregateBasedConsistencyMarker marker, @Nullable ProcessingContext context) {
                return CompletableFuture.completedFuture(marker);
            }

            private Throwable translateConflictException(Throwable e) {
                Predicate<Throwable> isConflictException = ex -> {
                    StatusRuntimeException sre;
                    return ex instanceof StatusRuntimeException && Objects.equals((sre = (StatusRuntimeException)ex).getStatus().getCode(), Status.OUT_OF_RANGE.getCode());
                };
                return AggregateBasedEventStorageEngineUtils.translateConflictException((ConsistencyMarker)consistencyMarker, (Throwable)e, isConflictException);
            }

            public void rollback(@Nullable ProcessingContext context) {
                tx.rollback();
            }
        });
    }

    private void buildMetadata(Metadata metadata, Map<String, MetaDataValue> metadataMap) {
        metadata.forEach((k, v) -> metadataMap.put((String)k, MetaDataValue.newBuilder().setTextValue(v).build()));
    }

    public MessageStream<EventMessage> source(@Nonnull SourcingCondition condition, @Nullable ProcessingContext context) {
        CompletableFuture endOfStreams = new CompletableFuture();
        List<AggregateSource> aggregateSources = condition.criteria().flatten().stream().map(criterion -> this.aggregateSourceForCriterion(condition, (EventCriterion)criterion)).toList();
        return aggregateSources.stream().map(AggregateSource::source).reduce((MessageStream)MessageStream.empty().cast(), MessageStream::concatWith).onComplete(() -> endOfStreams.complete(null)).concatWith((MessageStream)MessageStream.fromFuture((CompletableFuture)endOfStreams.thenApply(event -> TerminalEventMessage.INSTANCE), unused -> Context.with((Context.ResourceKey)ConsistencyMarker.RESOURCE_KEY, (Object)AggregateBasedAxonServerEventStorageEngine.combineAggregateMarkers(aggregateSources.stream()))));
    }

    private AggregateSource aggregateSourceForCriterion(SourcingCondition condition, EventCriterion criterion) {
        AtomicReference<AggregateBasedConsistencyMarker> markerReference = new AtomicReference<AggregateBasedConsistencyMarker>();
        String aggregateIdentifier = AggregateBasedEventStorageEngineUtils.resolveAggregateIdentifier((Set)criterion.tags());
        AggregateEventStream aggregateStream = this.connection.eventChannel().openAggregateStream(aggregateIdentifier, AggregateSequenceNumberPosition.toSequenceNumber((Position)condition.start()));
        MessageStream source = MessageStream.fromStream((Stream)aggregateStream.asStream(), this::convertToMessage, event -> AggregateBasedAxonServerEventStorageEngine.setMarkerAndBuildContext(event.getAggregateIdentifier(), event.getAggregateSequenceNumber(), event.getAggregateType(), markerReference)).onComplete(() -> markerReference.compareAndSet(null, new AggregateBasedConsistencyMarker(aggregateIdentifier, 0L))).cast();
        return new AggregateSource(markerReference, (MessageStream<EventMessage>)source);
    }

    private static Context setMarkerAndBuildContext(String aggregateIdentifier, long sequenceNumber, String aggregateType, AtomicReference<AggregateBasedConsistencyMarker> markerReference) {
        markerReference.set(new AggregateBasedConsistencyMarker(aggregateIdentifier, sequenceNumber));
        return Context.with((Context.ResourceKey)LegacyResources.AGGREGATE_IDENTIFIER_KEY, (Object)aggregateIdentifier).withResource(LegacyResources.AGGREGATE_SEQUENCE_NUMBER_KEY, (Object)sequenceNumber).withResource(LegacyResources.AGGREGATE_TYPE_KEY, (Object)aggregateType);
    }

    private static ConsistencyMarker combineAggregateMarkers(Stream<AggregateSource> resultStream) {
        return resultStream.map(AggregateSource::markerReference).map(AtomicReference::get).map(marker -> marker).reduce(ConsistencyMarker::upperBound).orElseThrow();
    }

    public MessageStream<EventMessage> stream(@Nonnull StreamingCondition condition, @Nullable ProcessingContext context) {
        TrackingToken trackingToken = condition.position();
        if (trackingToken instanceof GlobalSequenceTrackingToken) {
            GlobalSequenceTrackingToken gtt = (GlobalSequenceTrackingToken)trackingToken;
            return new AxonServerMessageStream(this.connection.eventChannel().openStream(gtt.getGlobalIndex(), 32), this::convertToMessage);
        }
        throw new IllegalArgumentException("Tracking Token is not of expected type. Must be GlobalTrackingToken. Is: " + trackingToken.getClass().getName());
    }

    private EventMessage convertToMessage(Event event) {
        SerializedObject payload = event.getPayload();
        return new GenericEventMessage(event.getMessageIdentifier(), new MessageType(payload.getType(), payload.getRevision()), (Object)payload.getData().toByteArray(), (Map)this.getMetadata(event.getMetaDataMap()), Instant.ofEpochMilli(event.getTimestamp()));
    }

    private Metadata getMetadata(Map<String, MetaDataValue> metadataMap) {
        return new Metadata(MetadataConverter.convertMetadataValuesToGrpc(metadataMap));
    }

    public CompletableFuture<TrackingToken> firstToken(@Nullable ProcessingContext context) {
        return this.connection.eventChannel().getFirstToken().thenApply(GlobalSequenceTrackingToken::new);
    }

    public CompletableFuture<TrackingToken> latestToken(@Nullable ProcessingContext context) {
        return this.connection.eventChannel().getLastToken().thenApply(GlobalSequenceTrackingToken::new);
    }

    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant at, @Nullable ProcessingContext context) {
        return this.connection.eventChannel().getTokenAt(at.toEpochMilli()).thenApply(GlobalSequenceTrackingToken::new);
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("connection", (Object)this.connection);
        descriptor.describeProperty("converter", (Object)this.converter);
    }

    private record AggregateSource(AtomicReference<AggregateBasedConsistencyMarker> markerReference, MessageStream<EventMessage> source) {
    }
}

