/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.event.axon;

import com.google.protobuf.ByteString;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.axoniq.axonserver.grpc.event.GetAggregateSnapshotsRequest;
import io.axoniq.axonserver.grpc.event.GetEventsRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsResponse;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrResponse;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.event.AppendEventTransaction;
import org.axonframework.axonserver.connector.event.AxonServerEventStoreClient;
import org.axonframework.axonserver.connector.event.axon.EventBuffer;
import org.axonframework.axonserver.connector.event.axon.GrpcBackedDomainEventData;
import org.axonframework.axonserver.connector.event.axon.GrpcMetaDataAwareSerializer;
import org.axonframework.axonserver.connector.event.axon.QueryResultBuffer;
import org.axonframework.axonserver.connector.event.axon.QueryResultStream;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.axonserver.connector.util.GrpcMetaDataConverter;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.EventStreamUtils;
import org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.AbstractEventStore;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStoreException;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerEventStore
extends AbstractEventStore {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerEventStore.class);

    protected AxonServerEventStore(Builder builder) {
        super((AbstractEventStore.Builder)builder);
    }

    public static Builder builder() {
        return new Builder();
    }

    public TrackingEventStream openStream(TrackingToken trackingToken) {
        return this.storageEngine().openStream(trackingToken);
    }

    public QueryResultStream query(String query, boolean liveUpdates) {
        return this.storageEngine().query(query, liveUpdates);
    }

    protected AxonIQEventStorageEngine storageEngine() {
        return (AxonIQEventStorageEngine)super.storageEngine();
    }

    private static class AxonIQEventStorageEngine
    extends AbstractEventStorageEngine {
        private static final int ALLOW_SNAPSHOTS_MAGIC_VALUE = -42;
        private final String APPEND_EVENT_TRANSACTION = (Object)((Object)this) + "/APPEND_EVENT_TRANSACTION";
        private final AxonServerConfiguration configuration;
        private final AxonServerEventStoreClient eventStoreClient;
        private final GrpcMetaDataConverter converter;
        private final boolean snapshotFilterSet;

        private AxonIQEventStorageEngine(Builder builder) {
            super((AbstractEventStorageEngine.Builder)builder);
            this.snapshotFilterSet = builder.snapshotFilterSet;
            this.configuration = builder.configuration;
            this.eventStoreClient = builder.eventStoreClient;
            this.converter = builder.converter;
        }

        private static Builder builder() {
            return new Builder();
        }

        protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer) {
            AppendEventTransaction sender = CurrentUnitOfWork.isStarted() ? (AppendEventTransaction)CurrentUnitOfWork.get().root().getOrComputeResource(this.APPEND_EVENT_TRANSACTION, k -> {
                AppendEventTransaction appendEventTransaction = this.eventStoreClient.createAppendEventConnection();
                CurrentUnitOfWork.get().root().onRollback(u -> appendEventTransaction.rollback(u.getExecutionResult().getExceptionResult()));
                CurrentUnitOfWork.get().root().onCommit(u -> this.commit(appendEventTransaction));
                return appendEventTransaction;
            }) : this.eventStoreClient.createAppendEventConnection();
            for (EventMessage<?> eventMessage : events) {
                sender.append(this.map(eventMessage, serializer));
            }
            if (!CurrentUnitOfWork.isStarted()) {
                this.commit(sender);
            }
        }

        private void commit(AppendEventTransaction appendEventTransaction) {
            try {
                appendEventTransaction.commit();
            }
            catch (ExecutionException e) {
                throw ErrorCode.convert(e.getCause());
            }
            catch (TimeoutException e) {
                throw ErrorCode.convert(e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw ErrorCode.convert(e);
            }
        }

        public Event map(EventMessage<?> eventMessage, Serializer serializer) {
            Event.Builder builder = Event.newBuilder();
            if (eventMessage instanceof GenericDomainEventMessage) {
                builder.setAggregateIdentifier(((GenericDomainEventMessage)eventMessage).getAggregateIdentifier()).setAggregateSequenceNumber(((GenericDomainEventMessage)eventMessage).getSequenceNumber()).setAggregateType(((GenericDomainEventMessage)eventMessage).getType());
            }
            SerializedObject serializedPayload = eventMessage.serializePayload(serializer, byte[].class);
            builder.setMessageIdentifier(eventMessage.getIdentifier()).setPayload(io.axoniq.axonserver.grpc.SerializedObject.newBuilder().setType(serializedPayload.getType().getName()).setRevision((String)ObjectUtils.getOrDefault((Object)serializedPayload.getType().getRevision(), (Object)"")).setData(ByteString.copyFrom((byte[])((byte[])serializedPayload.getData())))).setTimestamp(eventMessage.getTimestamp().toEpochMilli());
            eventMessage.getMetaData().forEach((k, v) -> builder.putMetaData((String)k, this.converter.convertToMetaDataValue(v)));
            return builder.build();
        }

        protected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer) {
            try {
                this.eventStoreClient.appendSnapshot(this.map((EventMessage<?>)snapshot, serializer)).whenComplete((c, e) -> {
                    if (e != null) {
                        logger.warn("Error occurred while creating a snapshot", e);
                    } else if (c != null) {
                        if (c.getSuccess()) {
                            logger.info("Snapshot created");
                        } else {
                            logger.warn("Snapshot creation failed for unknown reason. Check server logs for details.");
                        }
                    }
                });
            }
            catch (Throwable e2) {
                throw ErrorCode.convert(e2);
            }
        }

        protected Stream<? extends DomainEventData<?>> readEventData(String aggregateIdentifier, long firstSequenceNumber) {
            logger.debug("Reading events for aggregate id {}", (Object)aggregateIdentifier);
            GetAggregateEventsRequest.Builder request = GetAggregateEventsRequest.newBuilder().setAggregateId(aggregateIdentifier);
            if (firstSequenceNumber > 0L) {
                request.setInitialSequence(firstSequenceNumber);
            } else if (firstSequenceNumber == -42L && !this.snapshotFilterSet) {
                request.setAllowSnapshots(true);
            }
            try {
                return this.eventStoreClient.listAggregateEvents(request.build()).map(GrpcBackedDomainEventData::new);
            }
            catch (Exception e) {
                throw ErrorCode.convert(e);
            }
        }

        public TrackingEventStream openStream(TrackingToken trackingToken) {
            Assert.isTrue((trackingToken == null || trackingToken instanceof GlobalSequenceTrackingToken ? 1 : 0) != 0, () -> "Invalid tracking token type. Must be GlobalSequenceTrackingToken.");
            long nextToken = trackingToken == null ? 0L : ((GlobalSequenceTrackingToken)trackingToken).getGlobalIndex() + 1L;
            final EventBuffer consumer = new EventBuffer(this.upcasterChain, this.getEventSerializer());
            logger.info("open stream: {}", (Object)nextToken);
            StreamObserver<GetEventsRequest> requestStream = this.eventStoreClient.listEvents(new StreamObserver<EventWithToken>(){

                public void onNext(EventWithToken eventWithToken) {
                    logger.debug("Received event with token: {}", (Object)eventWithToken.getToken());
                    consumer.push(eventWithToken);
                }

                public void onError(Throwable throwable) {
                    consumer.fail((RuntimeException)new EventStoreException("Error while reading events from the server", throwable));
                }

                public void onCompleted() {
                    consumer.fail((RuntimeException)new EventStoreException("Error while reading events from the server", (Throwable)new RuntimeException("Connection closed by server")));
                }
            });
            FlowControllingStreamObserver<GetEventsRequest> observer = new FlowControllingStreamObserver<GetEventsRequest>(requestStream, this.configuration, t -> GetEventsRequest.newBuilder().setNumberOfPermits(t.getPermits()).build(), t -> false);
            GetEventsRequest request = GetEventsRequest.newBuilder().setTrackingToken(nextToken).setClient(this.configuration.getClientName()).setComponent(this.configuration.getComponentName()).setNumberOfPermits(this.configuration.getInitialNrOfPermits().intValue()).build();
            observer.onNext(request);
            consumer.registerCloseListener(eventConsumer -> observer.onCompleted());
            consumer.registerConsumeListener(observer::markConsumed);
            return consumer;
        }

        public QueryResultStream query(String query, boolean liveUpdates) {
            final QueryResultBuffer consumer = new QueryResultBuffer();
            logger.debug("query: {}", (Object)query);
            StreamObserver<QueryEventsRequest> requestStream = this.eventStoreClient.query(new StreamObserver<QueryEventsResponse>(){

                public void onNext(QueryEventsResponse eventWithToken) {
                    consumer.push(eventWithToken);
                }

                public void onError(Throwable throwable) {
                    logger.info("Failed to receive events - {}", (Object)throwable.getMessage());
                    consumer.fail(new EventStoreException("Error while reading query results from the server", throwable));
                }

                public void onCompleted() {
                    consumer.close();
                }
            });
            FlowControllingStreamObserver<QueryEventsRequest> observer = new FlowControllingStreamObserver<QueryEventsRequest>(requestStream, this.configuration, t -> QueryEventsRequest.newBuilder().setNumberOfPermits(t.getPermits()).build(), t -> false);
            observer.onNext(QueryEventsRequest.newBuilder().setQuery(query).setNumberOfPermits(this.configuration.getInitialNrOfPermits().intValue()).setLiveEvents(liveUpdates).build());
            consumer.registerCloseListener(eventConsumer -> observer.onCompleted());
            consumer.registerConsumeListener(observer::markConsumed);
            return consumer;
        }

        public DomainEventStream readEvents(String aggregateIdentifier) {
            Stream<DomainEventData<?>> input = this.readEventData(aggregateIdentifier, -42L);
            return DomainEventStream.of(input.map(this::upcastAndDeserializeDomainEvent).filter(Objects::nonNull));
        }

        private DomainEventMessage<?> upcastAndDeserializeDomainEvent(DomainEventData<?> domainEventData) {
            DomainEventStream upcastedStream = EventStreamUtils.upcastAndDeserializeDomainEvents(Stream.of(domainEventData), (Serializer)new GrpcMetaDataAwareSerializer(this.isSnapshot(domainEventData) ? this.getSnapshotSerializer() : this.getEventSerializer()), (EventUpcaster)this.upcasterChain);
            return upcastedStream.hasNext() ? upcastedStream.next() : null;
        }

        private boolean isSnapshot(DomainEventData<?> domainEventData) {
            if (domainEventData instanceof GrpcBackedDomainEventData) {
                GrpcBackedDomainEventData grpcBackedDomainEventData = (GrpcBackedDomainEventData)domainEventData;
                return grpcBackedDomainEventData.isSnapshot();
            }
            return false;
        }

        public Optional<Long> lastSequenceNumberFor(String aggregateIdentifier) {
            try {
                ReadHighestSequenceNrResponse lastSequenceNumber = this.eventStoreClient.lastSequenceNumberFor(aggregateIdentifier).get();
                return lastSequenceNumber.getToSequenceNr() < 0L ? Optional.empty() : Optional.of(lastSequenceNumber.getToSequenceNr());
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        public TrackingToken createTailToken() {
            try {
                io.axoniq.axonserver.grpc.event.TrackingToken token = this.eventStoreClient.getFirstToken().get();
                if (token.getToken() < 0L) {
                    return null;
                }
                return new GlobalSequenceTrackingToken(token.getToken() - 1L);
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        public TrackingToken createHeadToken() {
            try {
                io.axoniq.axonserver.grpc.event.TrackingToken token = this.eventStoreClient.getLastToken().get();
                return new GlobalSequenceTrackingToken(token.getToken());
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        public TrackingToken createTokenAt(Instant instant) {
            try {
                io.axoniq.axonserver.grpc.event.TrackingToken token = this.eventStoreClient.getTokenAt(instant).get();
                if (token.getToken() < 0L) {
                    return null;
                }
                return new GlobalSequenceTrackingToken(token.getToken() - 1L);
            }
            catch (Throwable e) {
                throw ErrorCode.convert(e);
            }
        }

        protected Stream<? extends TrackedEventData<?>> readEventData(TrackingToken trackingToken, boolean mayBlock) {
            throw new UnsupportedOperationException("This method is not optimized for the AxonIQ Event Store and should not be used");
        }

        protected Stream<? extends DomainEventData<?>> readSnapshotData(final String aggregateIdentifier) {
            if (!this.snapshotFilterSet) {
                return Stream.empty();
            }
            return StreamSupport.stream(new Spliterators.AbstractSpliterator<DomainEventData<?>>(Long.MAX_VALUE, 4369){
                private long sequenceNumber;
                private List<DomainEventData> prefetched;
                {
                    super(x0, x1);
                    this.sequenceNumber = Long.MAX_VALUE;
                    this.prefetched = new ArrayList<DomainEventData>();
                }

                @Override
                public boolean tryAdvance(Consumer<? super DomainEventData<?>> action) {
                    if (this.prefetched.isEmpty() && this.sequenceNumber >= 0L) {
                        GetAggregateSnapshotsRequest request = GetAggregateSnapshotsRequest.newBuilder().setAggregateId(aggregateIdentifier).setMaxResults(configuration.getSnapshotPrefetch()).setMaxSequence(this.sequenceNumber).build();
                        try {
                            eventStoreClient.listAggregateSnapshots(request).map(GrpcBackedDomainEventData::new).forEach(e -> this.prefetched.add((DomainEventData)e));
                        }
                        catch (ExecutionException e2) {
                            throw ErrorCode.convert(e2);
                        }
                        catch (InterruptedException e3) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                    if (this.prefetched.isEmpty()) {
                        return false;
                    }
                    DomainEventData snapshot = this.prefetched.remove(0);
                    this.sequenceNumber = snapshot.getSequenceNumber() - 1L;
                    action.accept(snapshot);
                    return true;
                }
            }, false);
        }

        private static class Builder
        extends AbstractEventStorageEngine.Builder {
            private boolean snapshotFilterSet;
            private AxonServerConfiguration configuration;
            private AxonServerEventStoreClient eventStoreClient;
            private GrpcMetaDataConverter converter;

            private Builder() {
            }

            public Builder snapshotSerializer(Serializer snapshotSerializer) {
                if (snapshotSerializer != null) {
                    super.snapshotSerializer(snapshotSerializer);
                }
                return this;
            }

            public Builder upcasterChain(EventUpcaster upcasterChain) {
                if (upcasterChain != null) {
                    super.upcasterChain(upcasterChain);
                }
                return this;
            }

            public Builder persistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
                super.persistenceExceptionResolver(persistenceExceptionResolver);
                return this;
            }

            public Builder eventSerializer(Serializer eventSerializer) {
                if (eventSerializer != null) {
                    super.eventSerializer(eventSerializer);
                }
                return this;
            }

            public Builder snapshotFilter(Predicate<? super DomainEventData<?>> snapshotFilter) {
                if (snapshotFilter != null) {
                    super.snapshotFilter(snapshotFilter);
                    this.snapshotFilterSet = true;
                }
                return this;
            }

            private Builder configuration(AxonServerConfiguration configuration) {
                BuilderUtils.assertNonNull((Object)configuration, (String)"AxonServerConfiguration may not be null");
                this.configuration = configuration;
                return this;
            }

            private Builder eventStoreClient(AxonServerEventStoreClient eventStoreClient) {
                BuilderUtils.assertNonNull((Object)eventStoreClient, (String)"AxonServerEventStoreClient may not be null");
                this.eventStoreClient = eventStoreClient;
                return this;
            }

            private Builder converter(GrpcMetaDataConverter converter) {
                BuilderUtils.assertNonNull((Object)converter, (String)"GrpcMetaDataConverter may not be null");
                this.converter = converter;
                return this;
            }

            private AxonIQEventStorageEngine build() {
                return new AxonIQEventStorageEngine(this);
            }

            protected void validate() throws AxonConfigurationException {
                BuilderUtils.assertNonNull((Object)this.configuration, (String)"The AxonServerConfiguration is a hard requirement and should be provided");
                BuilderUtils.assertNonNull((Object)this.eventStoreClient, (String)"The AxonServerEventStoreClient is a hard requirement and should be provided");
                BuilderUtils.assertNonNull((Object)this.converter, (String)"The GrpcMetaDataConverter is a hard requirement and should be provided");
            }
        }
    }

    public static class Builder
    extends AbstractEventStore.Builder {
        private AxonServerConfiguration configuration;
        private AxonServerConnectionManager axonServerConnectionManager;
        private Serializer snapshotSerializer = XStreamSerializer.builder().build();
        private Serializer eventSerializer = XStreamSerializer.builder().build();
        private EventUpcaster upcasterChain = NoOpEventUpcaster.INSTANCE;
        private Predicate<? super DomainEventData<?>> snapshotFilter;

        public Builder storageEngine(EventStorageEngine storageEngine) {
            super.storageEngine(storageEngine);
            return this;
        }

        public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        public Builder configuration(AxonServerConfiguration configuration) {
            BuilderUtils.assertNonNull((Object)configuration, (String)"AxonServerConfiguration may not be null");
            this.configuration = configuration;
            return this;
        }

        public Builder platformConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull((Object)axonServerConnectionManager, (String)"PlatformConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        public Builder snapshotSerializer(Serializer snapshotSerializer) {
            BuilderUtils.assertNonNull((Object)snapshotSerializer, (String)"The Snapshot Serializer may not be null");
            this.snapshotSerializer = snapshotSerializer;
            return this;
        }

        public Builder eventSerializer(Serializer eventSerializer) {
            BuilderUtils.assertNonNull((Object)eventSerializer, (String)"The Event Serializer may not be null");
            this.eventSerializer = eventSerializer;
            return this;
        }

        public Builder snapshotFilter(Predicate<? super DomainEventData<?>> snapshotFilter) {
            BuilderUtils.assertNonNull(snapshotFilter, (String)"The Snapshot filter may not be null");
            this.snapshotFilter = snapshotFilter;
            return this;
        }

        public Builder upcasterChain(EventUpcaster upcasterChain) {
            BuilderUtils.assertNonNull((Object)upcasterChain, (String)"EventUpcaster may not be null");
            this.upcasterChain = upcasterChain;
            return this;
        }

        public AxonServerEventStore build() {
            if (this.storageEngine == null) {
                this.buildStorageEngine();
            }
            return new AxonServerEventStore(this);
        }

        private void buildStorageEngine() {
            BuilderUtils.assertNonNull((Object)this.configuration, (String)"The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.axonServerConnectionManager, (String)"The PlatformConnectionManager is a hard requirement and should be provided");
            AxonServerEventStoreClient eventStoreClient = new AxonServerEventStoreClient(this.configuration, this.axonServerConnectionManager);
            super.storageEngine((EventStorageEngine)AxonIQEventStorageEngine.builder().snapshotSerializer(this.snapshotSerializer).upcasterChain(this.upcasterChain).snapshotFilter((Predicate)this.snapshotFilter).eventSerializer(this.eventSerializer).configuration(this.configuration).eventStoreClient(eventStoreClient).converter(new GrpcMetaDataConverter(this.eventSerializer)).build());
        }

        protected void validate() throws AxonConfigurationException {
            super.validate();
        }
    }
}

