/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventsourcing.eventstore.inmemory;

import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventUtils;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;

public class InMemoryEventStorageEngine
implements EventStorageEngine {
    private final NavigableMap<TrackingToken, TrackedEventMessage<?>> events = new ConcurrentSkipListMap();
    private final Map<String, List<DomainEventMessage<?>>> snapshots = new ConcurrentHashMap();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void appendEvents(List<? extends EventMessage<?>> events) {
        NavigableMap<TrackingToken, TrackedEventMessage<?>> navigableMap = this.events;
        synchronized (navigableMap) {
            GlobalSequenceTrackingToken trackingToken = this.nextTrackingToken();
            this.events.putAll(IntStream.range(0, events.size()).mapToObj(i -> EventUtils.asTrackedEventMessage((EventMessage)((EventMessage)events.get(i)), (TrackingToken)trackingToken.offsetBy(i))).collect(Collectors.toMap(TrackedEventMessage::trackingToken, Function.identity())));
        }
    }

    @Override
    public void storeSnapshot(DomainEventMessage<?> snapshot) {
        this.snapshots.compute(snapshot.getAggregateIdentifier(), (aggregateId, snapshotsSoFar) -> {
            if (snapshotsSoFar == null) {
                CopyOnWriteArrayList<DomainEventMessage> newSnapshots = new CopyOnWriteArrayList<DomainEventMessage>();
                newSnapshots.add(snapshot);
                return newSnapshots;
            }
            snapshotsSoFar.add(snapshot);
            return snapshotsSoFar;
        });
    }

    @Override
    public Stream<? extends TrackedEventMessage<?>> readEvents(TrackingToken trackingToken, boolean mayBlock) {
        if (trackingToken == null) {
            return this.events.values().stream();
        }
        return this.events.tailMap(trackingToken, false).values().stream();
    }

    @Override
    public DomainEventStream readEvents(String aggregateIdentifier, long firstSequenceNumber) {
        AtomicReference sequenceNumber = new AtomicReference();
        Stream<DomainEventMessage> stream = this.events.values().stream().filter(event -> event instanceof DomainEventMessage).map(event -> (DomainEventMessage)event).filter(event -> aggregateIdentifier.equals(event.getAggregateIdentifier()) && event.getSequenceNumber() >= firstSequenceNumber).peek(event -> sequenceNumber.set(event.getSequenceNumber()));
        return DomainEventStream.of(stream, sequenceNumber::get);
    }

    @Override
    public Optional<DomainEventMessage<?>> readSnapshot(String aggregateIdentifier) {
        return this.snapshots.getOrDefault(aggregateIdentifier, Collections.emptyList()).stream().max(Comparator.comparingLong(DomainEventMessage::getSequenceNumber));
    }

    @Override
    public TrackingToken createTailToken() {
        if (this.events.size() == 0) {
            return null;
        }
        GlobalSequenceTrackingToken firstToken = (GlobalSequenceTrackingToken)this.events.firstKey();
        return new GlobalSequenceTrackingToken(firstToken.getGlobalIndex() - 1L);
    }

    @Override
    public TrackingToken createHeadToken() {
        if (this.events.size() == 0) {
            return null;
        }
        return (TrackingToken)this.events.lastKey();
    }

    @Override
    public TrackingToken createTokenAt(Instant dateTime) {
        return this.events.values().stream().filter(event -> event.getTimestamp().equals(dateTime) || event.getTimestamp().isAfter(dateTime)).min(Comparator.comparingLong(e -> ((GlobalSequenceTrackingToken)e.trackingToken()).getGlobalIndex())).map(TrackedEventMessage::trackingToken).map(tt -> (GlobalSequenceTrackingToken)tt).map(tt -> new GlobalSequenceTrackingToken(tt.getGlobalIndex() - 1L)).orElse(null);
    }

    protected GlobalSequenceTrackingToken nextTrackingToken() {
        return this.events.isEmpty() ? new GlobalSequenceTrackingToken(0L) : ((GlobalSequenceTrackingToken)this.events.lastKey()).next();
    }
}

