/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import com.google.protobuf.ByteString;
import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.event.AppendEventsTransaction;
import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.EventStream;
import io.axoniq.axonserver.grpc.event.Event;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.event.axon.EventBuffer;
import org.axonframework.axonserver.connector.event.axon.GrpcBackedDomainEventData;
import org.axonframework.axonserver.connector.event.axon.GrpcMetaDataAwareSerializer;
import org.axonframework.axonserver.connector.event.axon.QueryResultStream;
import org.axonframework.axonserver.connector.util.GrpcMetaDataConverter;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.EventStreamUtils;
import org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.AbstractEventStore;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStoreException;
import org.axonframework.eventsourcing.snapshotting.SnapshotFilter;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerEventStore
extends AbstractEventStore {
    /** Logger used by this event store and by its nested storage engine. */
    private static final Logger logger = LoggerFactory.getLogger(AxonServerEventStore.class);

    /**
     * Creates a new {@link Builder} with which an {@link AxonServerEventStore} can be configured and built.
     *
     * @return a new builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Instantiates the event store by handing the given builder to the superclass constructor.
     *
     * @param builder the builder carrying the configuration for this event store
     */
    protected AxonServerEventStore(Builder builder) {
        super(builder);
    }

    /**
     * Opens a tracking event stream by delegating to the configured storage engine.
     *
     * @param trackingToken the token to start from; passed through unchanged to the storage engine
     * @return the stream opened by the storage engine
     */
    public TrackingEventStream openStream(TrackingToken trackingToken) {
        AxonIQEventStorageEngine engine = storageEngine();
        return engine.openStream(trackingToken);
    }

    /**
     * Forwards an ad-hoc query to the storage engine. The storage engine in this file currently always throws
     * {@link UnsupportedOperationException} from its {@code query} method.
     *
     * @param query       the query text, passed through unchanged
     * @param liveUpdates passed through unchanged to the storage engine
     * @return whatever the storage engine returns
     */
    public QueryResultStream query(String query, boolean liveUpdates) {
        AxonIQEventStorageEngine engine = storageEngine();
        return engine.query(query, liveUpdates);
    }

    /**
     * Returns the superclass' storage engine narrowed to {@link AxonIQEventStorageEngine}.
     * <p>
     * The cast fails with a {@link ClassCastException} when an engine of another type was supplied through
     * {@link Builder#storageEngine(EventStorageEngine)}.
     *
     * @return the storage engine, cast to the Axon Server specific implementation
     */
    @Override
    protected AxonIQEventStorageEngine storageEngine() {
        return (AxonIQEventStorageEngine) super.storageEngine();
    }

    /**
     * Creates a {@link StreamableMessageSource} backed by a new storage engine instance that was created for the
     * given context.
     *
     * @param context the context name handed to the storage engine when creating the context-specific instance
     * @return a message source delegating to the context-specific storage engine
     */
    public StreamableMessageSource<TrackedEventMessage<?>> createStreamableMessageSourceForContext(String context) {
        AxonIQEventStorageEngine contextEngine = storageEngine().createInstanceForContext(context);
        return new AxonServerMessageSource(contextEngine);
    }

    private static class AxonServerMessageSource
    implements StreamableMessageSource<TrackedEventMessage<?>> {
        private final AxonIQEventStorageEngine eventStorageEngine;

        AxonServerMessageSource(AxonIQEventStorageEngine eventStorageEngine) {
            this.eventStorageEngine = eventStorageEngine;
        }

        public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingToken) {
            return this.eventStorageEngine.openStream(trackingToken);
        }

        public TrackingToken createTailToken() {
            return this.eventStorageEngine.createTailToken();
        }

        public TrackingToken createHeadToken() {
            return this.eventStorageEngine.createHeadToken();
        }

        public TrackingToken createTokenAt(Instant dateTime) {
            return this.eventStorageEngine.createTokenAt(dateTime);
        }
    }

    private static class AxonIQEventStorageEngine
    extends AbstractEventStorageEngine {
        // Sentinel value (-42) that readEvents passes as first sequence number; readEventData compares against it,
        // together with snapshotFilterSet, to pick which openAggregateStream variant to call.
        private static final int ALLOW_SNAPSHOTS_MAGIC_VALUE = -42;
        // Key under which the append transaction is stored as a resource of the root unit of work.
        // It is prefixed with this instance's string representation.
        private final String APPEND_EVENT_TRANSACTION = this + "/APPEND_EVENT_TRANSACTION";
        private final AxonServerConfiguration configuration;
        private final AxonServerConnectionManager connectionManager;
        // Converts message meta data values when mapping an event message to its gRPC form.
        private final GrpcMetaDataConverter converter;
        // True when a non-null snapshot filter was handed to the builder.
        private final boolean snapshotFilterSet;
        // Wrappers around the configured serializers, created in the constructor.
        private final Serializer snapshotSerializer;
        private final Serializer eventSerializer;
        // Kept so createInstanceForContext can build another engine from the same settings.
        private final Builder builder;
        // Name of the context passed to connectionManager.getConnection(...) on every call.
        private final String context;

        /**
         * Creates a new builder for an {@link AxonIQEventStorageEngine}.
         *
         * @return a new builder instance
         */
        private static Builder builder() {
            return new Builder();
        }

        /**
         * Creates an engine for the context named by the builder's {@link AxonServerConfiguration}.
         *
         * @param builder the builder holding the engine's configuration; its configuration must be non-null here,
         *                since it is dereferenced before any other constructor logic runs
         */
        private AxonIQEventStorageEngine(Builder builder) {
            this(builder, builder.configuration.getContext());
        }

        /**
         * Creates an engine from the given builder that talks to the given context.
         *
         * @param builder the builder holding the engine's configuration; also retained for later re-use
         * @param context the context name used when obtaining connections
         */
        private AxonIQEventStorageEngine(Builder builder, String context) {
            super(builder);
            this.builder = builder;
            this.context = context;
            this.configuration = builder.configuration;
            this.connectionManager = builder.connectionManager;
            this.converter = builder.converter;
            this.snapshotFilterSet = builder.snapshotFilterSet;
            // Wrap the serializers configured on the superclass.
            this.snapshotSerializer = new GrpcMetaDataAwareSerializer(getSnapshotSerializer());
            this.eventSerializer = new GrpcMetaDataAwareSerializer(getEventSerializer());
        }

        /**
         * Builds another engine from the builder this engine was created with, targeting the given context.
         *
         * @param context the context name the new engine should use
         * @return a new engine instance sharing this engine's builder
         */
        private AxonIQEventStorageEngine createInstanceForContext(String context) {
            Builder sharedBuilder = this.builder;
            return new AxonIQEventStorageEngine(sharedBuilder, context);
        }

        /**
         * Appends the given events to an append-events transaction.
         * <p>
         * When a unit of work is started, one transaction is stored as a resource of the root unit of work (created
         * on first use) and is committed or rolled back through handlers registered on that root unit of work.
         * Without a unit of work, a new transaction is started and committed before this method returns.
         *
         * @param events     the events to append
         * @param serializer the serializer used to serialize each event
         */
        protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer) {
            AppendEventsTransaction transaction;
            if (CurrentUnitOfWork.isStarted()) {
                transaction = CurrentUnitOfWork.get().root().getOrComputeResource(APPEND_EVENT_TRANSACTION, key -> {
                    AppendEventsTransaction created =
                            connectionManager.getConnection(context).eventChannel().startAppendEventsTransaction();
                    // Tie the transaction's outcome to the outcome of the root unit of work.
                    CurrentUnitOfWork.get().root().onRollback(unitOfWork -> created.rollback());
                    CurrentUnitOfWork.get().root().onCommit(unitOfWork -> commit(created));
                    return created;
                });
            } else {
                transaction = connectionManager.getConnection(context).eventChannel().startAppendEventsTransaction();
            }

            for (EventMessage<?> event : events) {
                transaction.appendEvent(map(event, serializer));
            }

            // No unit of work will trigger the commit, so do it right away.
            if (!CurrentUnitOfWork.isStarted()) {
                commit(transaction);
            }
        }

        /**
         * Commits the given transaction and waits for completion for at most the configured commit timeout
         * (in milliseconds).
         *
         * @param appendEventTransaction the transaction to commit
         * @throws EventStoreException when the commit fails with a non-runtime cause or the wait is interrupted;
         *                             a {@link RuntimeException} cause is rethrown as-is, and a timeout is
         *                             reported as {@code org.axonframework.messaging.ExecutionException}
         */
        private void commit(AppendEventsTransaction appendEventTransaction) {
            try {
                appendEventTransaction.commit().get(configuration.getCommitTimeout(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag before reporting the failure.
                Thread.currentThread().interrupt();
                throw new EventStoreException(interrupted.getMessage(), interrupted);
            } catch (TimeoutException timedOut) {
                throw new org.axonframework.messaging.ExecutionException("Timeout while executing request", timedOut);
            } catch (ExecutionException failed) {
                Throwable cause = failed.getCause();
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                throw new EventStoreException(failed.getMessage(), cause);
            }
        }

        /**
         * Converts the given event message into its gRPC {@link Event} representation.
         * <p>
         * Aggregate identifier, sequence number and type are only copied for {@link GenericDomainEventMessage}
         * instances. The payload is serialized to bytes; a {@code null} revision is written as an empty string.
         *
         * @param eventMessage the message to convert
         * @param serializer   the serializer used for the payload
         * @return the built gRPC event
         */
        public Event map(EventMessage<?> eventMessage, Serializer serializer) {
            Event.Builder builder = Event.newBuilder();
            if (eventMessage instanceof GenericDomainEventMessage) {
                GenericDomainEventMessage<?> domainEvent = (GenericDomainEventMessage<?>) eventMessage;
                builder.setAggregateIdentifier(domainEvent.getAggregateIdentifier());
                builder.setAggregateSequenceNumber(domainEvent.getSequenceNumber());
                builder.setAggregateType(domainEvent.getType());
            }

            SerializedObject<byte[]> serializedPayload = eventMessage.serializePayload(serializer, byte[].class);
            builder.setMessageIdentifier(eventMessage.getIdentifier());
            io.axoniq.axonserver.grpc.SerializedObject.Builder payload =
                    io.axoniq.axonserver.grpc.SerializedObject.newBuilder()
                            .setType(serializedPayload.getType().getName())
                            .setRevision(ObjectUtils.getOrDefault(serializedPayload.getType().getRevision(), ""))
                            .setData(ByteString.copyFrom(serializedPayload.getData()));
            builder.setPayload(payload);
            builder.setTimestamp(eventMessage.getTimestamp().toEpochMilli());

            eventMessage.getMetaData()
                        .forEach((key, value) -> builder.putMetaData(key, converter.convertToMetaDataValue(value)));
            return builder.build();
        }

        /**
         * Sends the given snapshot to the event channel without waiting for the result. The outcome is only
         * logged: failures and unsuccessful confirmations at WARN, success at INFO.
         *
         * @param snapshot   the snapshot message to store
         * @param serializer the serializer used to serialize the snapshot
         */
        protected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer) {
            Event snapshotEvent = map(snapshot, serializer);
            connectionManager.getConnection(context)
                             .eventChannel()
                             .appendSnapshot(snapshotEvent)
                             .whenComplete((confirmation, error) -> {
                                 if (error != null) {
                                     logger.warn("Error occurred while creating a snapshot", error);
                                     return;
                                 }
                                 if (confirmation == null) {
                                     // Nothing to report on.
                                     return;
                                 }
                                 if (confirmation.getSuccess()) {
                                     logger.info("Snapshot created");
                                 } else {
                                     logger.warn("Snapshot creation failed for unknown reason. Check server logs for details.");
                                 }
                             });
        }

        /**
         * Opens an aggregate stream for the given aggregate and wraps every entry in a
         * {@link GrpcBackedDomainEventData}.
         * <ul>
         * <li>A positive {@code firstSequenceNumber} opens the stream from that sequence number.</li>
         * <li>The sentinel {@code ALLOW_SNAPSHOTS_MAGIC_VALUE}, when no snapshot filter is set, opens the stream
         * with the boolean flag set to {@code true}.</li>
         * <li>Any other value opens the stream with only the aggregate identifier.</li>
         * </ul>
         *
         * @param aggregateIdentifier the identifier of the aggregate to read
         * @param firstSequenceNumber the first sequence number to read, or the sentinel value
         * @return a stream of the aggregate's event data
         */
        protected Stream<? extends DomainEventData<?>> readEventData(String aggregateIdentifier, long firstSequenceNumber) {
            logger.debug("Reading events for aggregate id {}", aggregateIdentifier);
            EventChannel eventChannel = connectionManager.getConnection(context).eventChannel();

            AggregateEventStream aggregateStream;
            if (firstSequenceNumber > 0L) {
                aggregateStream = eventChannel.openAggregateStream(aggregateIdentifier, firstSequenceNumber);
            } else if (firstSequenceNumber == ALLOW_SNAPSHOTS_MAGIC_VALUE && !snapshotFilterSet) {
                aggregateStream = eventChannel.openAggregateStream(aggregateIdentifier, true);
            } else {
                aggregateStream = eventChannel.openAggregateStream(aggregateIdentifier);
            }
            return aggregateStream.asStream().map(GrpcBackedDomainEventData::new);
        }

        /**
         * Opens an event stream on the event channel and wraps it in an {@link EventBuffer}.
         *
         * @param trackingToken {@code null} (translated to position {@code -1}) or a
         *                      {@link GlobalSequenceTrackingToken} whose global index is used as position;
         *                      any other token type fails the assertion
         * @return the buffer wrapping the opened stream
         */
        public TrackingEventStream openStream(TrackingToken trackingToken) {
            Assert.isTrue(trackingToken == null || trackingToken instanceof GlobalSequenceTrackingToken,
                          () -> "Invalid tracking token type. Must be GlobalSequenceTrackingToken.");

            long nextToken = -1L;
            if (trackingToken != null) {
                nextToken = ((GlobalSequenceTrackingToken) trackingToken).getGlobalIndex();
            }

            EventChannel eventChannel = connectionManager.getConnection(context).eventChannel();
            int initialPermits = configuration.getEventFlowControl().getInitialNrOfPermits().intValue();
            int newPermits = configuration.getEventFlowControl().getNrOfNewPermits().intValue();
            EventStream stream = eventChannel.openStream(nextToken, initialPermits, newPermits);

            return new EventBuffer(stream, upcasterChain, eventSerializer, configuration.isDisableEventBlacklisting());
        }

        /**
         * Ad-hoc querying is not implemented by this engine.
         *
         * @param query       ignored
         * @param liveUpdates ignored
         * @return never returns normally
         * @throws UnsupportedOperationException always
         */
        public QueryResultStream query(String query, boolean liveUpdates) {
            throw new UnsupportedOperationException("Not supported in this connector, yet");
        }

        /**
         * Reads the events of the given aggregate, upcasting and deserializing each entry.
         * <p>
         * The read is requested with {@code ALLOW_SNAPSHOTS_MAGIC_VALUE} as first sequence number. Entries flagged
         * as snapshot are deserialized with the snapshot serializer, all others with the event serializer. The
         * highest sequence number seen so far is tracked while the stream is consumed and exposed through the
         * returned stream's last-sequence-number supplier.
         *
         * @param aggregateIdentifier the identifier of the aggregate to read
         * @return the aggregate's domain event stream
         */
        public DomainEventStream readEvents(String aggregateIdentifier) {
            AtomicLong lastSequenceNumber = new AtomicLong();
            // Properly typed (no raw Stream) so the result of readEventData can be assigned without a cast.
            Stream<? extends DomainEventData<?>> input =
                    readEventData(aggregateIdentifier, ALLOW_SNAPSHOTS_MAGIC_VALUE)
                            .peek(entry -> lastSequenceNumber.getAndUpdate(
                                    seq -> Math.max(seq, entry.getSequenceNumber())));

            return DomainEventStream.of(
                    input.flatMap(entry -> upcastAndDeserializeDomainEvent(
                                 entry, isSnapshot(entry) ? snapshotSerializer : eventSerializer))
                         .filter(Objects::nonNull),
                    lastSequenceNumber::get
            );
        }

        /**
         * Upcasts and deserializes a single event data entry using this engine's upcaster chain.
         *
         * @param domainEventData the entry to process
         * @param serializer      the serializer used for deserialization
         * @return the resulting messages as a stream
         */
        private Stream<? extends DomainEventMessage<?>> upcastAndDeserializeDomainEvent(DomainEventData<?> domainEventData, Serializer serializer) {
            return EventStreamUtils.upcastAndDeserializeDomainEvents(Stream.of(domainEventData), serializer, upcasterChain)
                                   .asStream();
        }

        /**
         * Tells whether the given entry is a snapshot.
         *
         * @param domainEventData the entry to inspect
         * @return the entry's snapshot flag when it is a {@link GrpcBackedDomainEventData}, {@code false} otherwise
         */
        private boolean isSnapshot(DomainEventData<?> domainEventData) {
            return domainEventData instanceof GrpcBackedDomainEventData
                    && ((GrpcBackedDomainEventData) domainEventData).isSnapshot();
        }

        /**
         * Asks the event channel for the highest sequence number of the given aggregate, waiting at most the
         * configured commit timeout (in milliseconds).
         *
         * @param aggregateIdentifier the identifier of the aggregate
         * @return the highest sequence number, or empty when the answer is {@code null} or negative
         * @throws EventStoreException when the request fails, times out or the wait is interrupted
         */
        public Optional<Long> lastSequenceNumberFor(String aggregateIdentifier) {
            try {
                Long highest = connectionManager.getConnection(context)
                                                .eventChannel()
                                                .findHighestSequence(aggregateIdentifier)
                                                .get(configuration.getCommitTimeout(), TimeUnit.MILLISECONDS);
                if (highest != null && highest >= 0L) {
                    return Optional.of(highest);
                }
                return Optional.empty();
            } catch (ExecutionException | TimeoutException failure) {
                throw new EventStoreException(failure.getMessage(), failure);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag before reporting the failure.
                Thread.currentThread().interrupt();
                throw new EventStoreException(interrupted.getMessage(), interrupted);
            }
        }

        /**
         * Creates a token from the first token reported by the event channel, waiting at most the configured
         * commit timeout (in milliseconds).
         *
         * @return a {@link GlobalSequenceTrackingToken} for the reported value, or {@code null} when the value is
         *         {@code null} or negative
         * @throws EventStoreException when the request fails, times out or the wait is interrupted
         */
        public TrackingToken createTailToken() {
            try {
                Long firstToken = connectionManager.getConnection(context)
                                                   .eventChannel()
                                                   .getFirstToken()
                                                   .get(configuration.getCommitTimeout(), TimeUnit.MILLISECONDS);
                if (firstToken != null && firstToken >= 0L) {
                    return new GlobalSequenceTrackingToken(firstToken.longValue());
                }
                return null;
            } catch (ExecutionException | TimeoutException failure) {
                throw new EventStoreException(failure.getMessage(), failure);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag before reporting the failure.
                Thread.currentThread().interrupt();
                throw new EventStoreException(interrupted.getMessage(), interrupted);
            }
        }

        /**
         * Creates a token from the last token reported by the event channel, waiting at most the configured
         * commit timeout (in milliseconds).
         *
         * @return a {@link GlobalSequenceTrackingToken} for the reported value, or {@code null} when the value is
         *         {@code null} or negative
         * @throws EventStoreException when the request fails, times out or the wait is interrupted
         */
        public TrackingToken createHeadToken() {
            try {
                Long lastToken = connectionManager.getConnection(context)
                                                  .eventChannel()
                                                  .getLastToken()
                                                  .get(configuration.getCommitTimeout(), TimeUnit.MILLISECONDS);
                if (lastToken != null && lastToken >= 0L) {
                    return new GlobalSequenceTrackingToken(lastToken.longValue());
                }
                return null;
            } catch (ExecutionException | TimeoutException failure) {
                throw new EventStoreException(failure.getMessage(), failure);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag before reporting the failure.
                Thread.currentThread().interrupt();
                throw new EventStoreException(interrupted.getMessage(), interrupted);
            }
        }

        /**
         * Creates a token from the value the event channel reports for the given instant (sent as epoch
         * milliseconds), waiting at most the configured commit timeout (in milliseconds).
         *
         * @param instant the point in time to look up
         * @return a {@link GlobalSequenceTrackingToken} for the reported value, or {@code null} when the value is
         *         {@code null} or negative
         * @throws EventStoreException when the request fails, times out or the wait is interrupted
         */
        public TrackingToken createTokenAt(Instant instant) {
            try {
                Long tokenAtInstant = connectionManager.getConnection(context)
                                                       .eventChannel()
                                                       .getTokenAt(instant.toEpochMilli())
                                                       .get(configuration.getCommitTimeout(), TimeUnit.MILLISECONDS);
                if (tokenAtInstant != null && tokenAtInstant >= 0L) {
                    return new GlobalSequenceTrackingToken(tokenAtInstant.longValue());
                }
                return null;
            } catch (ExecutionException | TimeoutException failure) {
                throw new EventStoreException(failure.getMessage(), failure);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag before reporting the failure.
                Thread.currentThread().interrupt();
                throw new EventStoreException(interrupted.getMessage(), interrupted);
            }
        }

        /**
         * Token-based reading of raw event data is not provided by this engine; tracked reading in this class
         * goes through {@code openStream(TrackingToken)} instead.
         *
         * @param trackingToken ignored
         * @param mayBlock      ignored
         * @return never returns normally
         * @throws UnsupportedOperationException always
         */
        protected Stream<? extends TrackedEventData<?>> readEventData(TrackingToken trackingToken, boolean mayBlock) {
            throw new UnsupportedOperationException("This method is not optimized for the AxonIQ Event Store and should not be used");
        }

        /**
         * Returns a lazily populated stream of snapshot entries for the given aggregate.
         * <p>
         * When no snapshot filter was configured, an empty stream is returned. Otherwise snapshots are fetched
         * from the event channel in batches: each batch is requested with the current sequence number bound
         * (initially {@link Long#MAX_VALUE}) and the configured snapshot prefetch size. After handing out a
         * snapshot, the bound is lowered to that snapshot's sequence number minus one, so the next batch is
         * requested with the lower bound. The stream ends once a fetch yields nothing or the bound drops below zero.
         *
         * @param aggregateIdentifier the identifier of the aggregate whose snapshots are read
         * @return a sequential stream of snapshot entries, possibly empty
         */
        protected Stream<? extends DomainEventData<?>> readSnapshotData(final String aggregateIdentifier) {
            if (!this.snapshotFilterSet) {
                return Stream.empty();
            }
            // Same bit pattern as the previous literal 4369 (0x1111), spelled out with the named flags.
            final int characteristics = java.util.Spliterator.NONNULL
                    | java.util.Spliterator.ORDERED
                    | java.util.Spliterator.DISTINCT
                    | java.util.Spliterator.CONCURRENT;
            return StreamSupport.stream(new Spliterators.AbstractSpliterator<DomainEventData<?>>(Long.MAX_VALUE, characteristics) {
                // Bound passed to the next loadSnapshots call; lowered after each snapshot handed out.
                private long sequenceNumber = Long.MAX_VALUE;
                // Snapshots fetched from the server but not yet passed downstream.
                private final List<DomainEventData<byte[]>> prefetched = new ArrayList<>();

                @Override
                public boolean tryAdvance(Consumer<? super DomainEventData<?>> action) {
                    if (this.prefetched.isEmpty() && this.sequenceNumber >= 0L) {
                        connectionManager.getConnection(context)
                                         .eventChannel()
                                         .loadSnapshots(aggregateIdentifier, this.sequenceNumber, configuration.getSnapshotPrefetch())
                                         .asStream()
                                         .map(GrpcBackedDomainEventData::new)
                                         .forEach(this.prefetched::add);
                    }
                    if (this.prefetched.isEmpty()) {
                        return false;
                    }
                    DomainEventData<byte[]> snapshot = this.prefetched.remove(0);
                    this.sequenceNumber = snapshot.getSequenceNumber() - 1L;
                    action.accept(snapshot);
                    return true;
                }
            }, false);
        }

        private static class Builder
        extends AbstractEventStorageEngine.Builder {
            private boolean snapshotFilterSet;
            private AxonServerConfiguration configuration;
            private AxonServerConnectionManager connectionManager;
            private GrpcMetaDataConverter converter;

            private Builder() {
            }

            public Builder snapshotSerializer(Serializer snapshotSerializer) {
                if (snapshotSerializer != null) {
                    super.snapshotSerializer(snapshotSerializer);
                }
                return this;
            }

            public Builder upcasterChain(EventUpcaster upcasterChain) {
                if (upcasterChain != null) {
                    super.upcasterChain(upcasterChain);
                }
                return this;
            }

            public Builder persistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
                super.persistenceExceptionResolver(persistenceExceptionResolver);
                return this;
            }

            public Builder eventSerializer(Serializer eventSerializer) {
                if (eventSerializer != null) {
                    super.eventSerializer(eventSerializer);
                }
                return this;
            }

            @Deprecated
            public Builder snapshotFilter(Predicate<? super DomainEventData<?>> snapshotFilter) {
                if (snapshotFilter != null) {
                    super.snapshotFilter(snapshotFilter);
                    this.snapshotFilterSet = true;
                }
                return this;
            }

            public Builder snapshotFilter(SnapshotFilter snapshotFilter) {
                if (snapshotFilter != null) {
                    super.snapshotFilter(snapshotFilter);
                    this.snapshotFilterSet = true;
                }
                return this;
            }

            private Builder configuration(AxonServerConfiguration configuration) {
                BuilderUtils.assertNonNull((Object)configuration, (String)"AxonServerConfiguration may not be null");
                this.configuration = configuration;
                return this;
            }

            private Builder eventStoreClient(AxonServerConnectionManager eventStoreClient) {
                BuilderUtils.assertNonNull((Object)eventStoreClient, (String)"AxonServerEventStoreClient may not be null");
                this.connectionManager = eventStoreClient;
                return this;
            }

            private Builder converter(GrpcMetaDataConverter converter) {
                BuilderUtils.assertNonNull((Object)converter, (String)"GrpcMetaDataConverter may not be null");
                this.converter = converter;
                return this;
            }

            private AxonIQEventStorageEngine build() {
                return new AxonIQEventStorageEngine(this);
            }

            protected void validate() throws AxonConfigurationException {
                BuilderUtils.assertNonNull((Object)this.configuration, (String)"The AxonServerConfiguration is a hard requirement and should be provided");
                BuilderUtils.assertNonNull((Object)this.connectionManager, (String)"The AxonServerEventStoreClient is a hard requirement and should be provided");
                BuilderUtils.assertNonNull((Object)this.converter, (String)"The GrpcMetaDataConverter is a hard requirement and should be provided");
            }
        }
    }

    public static class Builder
    extends AbstractEventStore.Builder {
        private AxonServerConfiguration configuration;
        private AxonServerConnectionManager axonServerConnectionManager;
        private Supplier<Serializer> snapshotSerializer = XStreamSerializer::defaultSerializer;
        private Supplier<Serializer> eventSerializer = XStreamSerializer::defaultSerializer;
        private EventUpcaster upcasterChain = NoOpEventUpcaster.INSTANCE;
        private SnapshotFilter snapshotFilter;

        public Builder storageEngine(EventStorageEngine storageEngine) {
            super.storageEngine(storageEngine);
            return this;
        }

        public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        public Builder configuration(AxonServerConfiguration configuration) {
            BuilderUtils.assertNonNull((Object)configuration, (String)"AxonServerConfiguration may not be null");
            this.configuration = configuration;
            return this;
        }

        public Builder platformConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull((Object)axonServerConnectionManager, (String)"PlatformConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        public Builder snapshotSerializer(Serializer snapshotSerializer) {
            BuilderUtils.assertNonNull((Object)snapshotSerializer, (String)"The Snapshot Serializer may not be null");
            this.snapshotSerializer = () -> snapshotSerializer;
            return this;
        }

        public Builder eventSerializer(Serializer eventSerializer) {
            BuilderUtils.assertNonNull((Object)eventSerializer, (String)"The Event Serializer may not be null");
            this.eventSerializer = () -> eventSerializer;
            return this;
        }

        @Deprecated
        public Builder snapshotFilter(Predicate<? super DomainEventData<?>> snapshotFilter) {
            return this.snapshotFilter(snapshotFilter::test);
        }

        public Builder snapshotFilter(SnapshotFilter snapshotFilter) {
            BuilderUtils.assertNonNull((Object)snapshotFilter, (String)"The Snapshot filter may not be null");
            this.snapshotFilter = snapshotFilter;
            return this;
        }

        public Builder upcasterChain(EventUpcaster upcasterChain) {
            BuilderUtils.assertNonNull((Object)upcasterChain, (String)"EventUpcaster may not be null");
            this.upcasterChain = upcasterChain;
            return this;
        }

        public AxonServerEventStore build() {
            if (this.storageEngine == null) {
                this.buildStorageEngine();
            }
            return new AxonServerEventStore(this);
        }

        private void buildStorageEngine() {
            BuilderUtils.assertNonNull((Object)this.configuration, (String)"The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.axonServerConnectionManager, (String)"The PlatformConnectionManager is a hard requirement and should be provided");
            super.storageEngine((EventStorageEngine)AxonIQEventStorageEngine.builder().snapshotSerializer(this.snapshotSerializer.get()).upcasterChain(this.upcasterChain).snapshotFilter(this.snapshotFilter).eventSerializer(this.eventSerializer.get()).configuration(this.configuration).eventStoreClient(this.axonServerConnectionManager).converter(new GrpcMetaDataConverter(this.eventSerializer.get())).build());
        }

        protected void validate() throws AxonConfigurationException {
            super.validate();
        }
    }
}

