/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.eventhandling.processing.streaming.token.store.inmemory;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.Segment;
import org.axonframework.messaging.eventhandling.processing.streaming.token.GlobalSequenceTrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToClaimTokenException;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToInitializeTokenException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TokenStore} implementation that keeps all tokens in a {@link ConcurrentHashMap} owned by this instance.
 * <p>
 * Entries are keyed by processor name and segment id. Nothing is written outside of this object, so the stored
 * tokens do not outlive it; the constructor logs a warning to that effect. This store keeps no claim information:
 * {@link #releaseClaim(String, int, ProcessingContext)} does nothing.
 * <p>
 * Each instance generates a random {@link UUID} at construction, which is returned as its storage identifier.
 */
public class InMemoryTokenStore implements TokenStore {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Sentinel stored in place of a {@code null} initial token. {@link #fetchToken(String, int, ProcessingContext)}
     * recognises this exact instance and reports it to the caller as {@code null}.
     */
    private static final GlobalSequenceTrackingToken NULL_TOKEN = new GlobalSequenceTrackingToken(-1L);

    private final Map<ProcessAndSegmentId, SegmentAndToken> tokens = new ConcurrentHashMap<>();
    private final String identifier = UUID.randomUUID().toString();

    /**
     * Creates an empty store and logs a warning that tokens kept here are not persisted.
     */
    public InMemoryTokenStore() {
        logger.warn("An in memory token store is being created.\n"
                            + "This means the event processor using this token store might process the same events again when the application is restarted.\n"
                            + "If the use of an in memory token store is intentional, this warning can be ignored.\n"
                            + "If the tokens should be persisted, use the JPA, JDBC or MongoDB token store instead.\n");
    }

    /**
     * Creates the segments for {@code processorName} by splitting {@link Segment#ROOT_SEGMENT}
     * {@code segmentCount - 1} times, and stores {@code initialToken} (or the internal null-sentinel when it is
     * {@code null}) for each of them.
     * <p>
     * Note: the "already present" check and the subsequent inserts are separate steps and are not performed
     * atomically.
     *
     * @param processorName the processor to create segments for
     * @param segmentCount  the number used to derive how often the root segment is split
     * @param initialToken  the token to store for every new segment, may be {@code null}
     * @param context       passed through to {@link #fetchSegments(String, ProcessingContext)}
     * @return a future holding the newly created segments; it completes exceptionally with an
     * {@link UnableToClaimTokenException} when segments already exist for {@code processorName}
     */
    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        TrackingToken tokenToStore = orNullToken(initialToken);
        return fetchSegments(processorName, context).thenApply(existing -> {
            if (!existing.isEmpty()) {
                throw new UnableToClaimTokenException("Could not initialize segments. Some segments were already present.");
            }
            List<Segment> created = Segment.splitBalanced(Segment.ROOT_SEGMENT, segmentCount - 1);
            for (Segment segment : created) {
                tokens.put(new ProcessAndSegmentId(processorName, segment.getSegmentId()),
                           new SegmentAndToken(segment, tokenToStore));
            }
            return created;
        });
    }

    /**
     * Replaces the token stored for the given processor and segment.
     * <p>
     * Although {@code context} is annotated as nullable, this implementation requires it. When the context reports
     * {@code isStarted()}, the update is registered through {@code runOnAfterCommit}; otherwise it is applied
     * immediately. In both cases the returned future is already completed.
     *
     * @param token         the token to store, may be {@code null}
     * @param processorName the processor owning the segment
     * @param segmentId     the id of the segment to update
     * @param context       the processing context; must not be {@code null} for this store
     * @return an already completed future
     * @throws NullPointerException        if {@code context} is {@code null}
     * @throws UnableToClaimTokenException if the update is applied immediately and no entry exists for the given
     *                                     processor and segment
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) {
        Objects.requireNonNull(context, "processingContext may not be null for an InMemoryTokenStore");
        if (context.isStarted()) {
            context.runOnAfterCommit(c -> updateToken(token, processorName, segmentId));
        } else {
            updateToken(token, processorName, segmentId);
        }
        return FutureUtils.emptyCompletedFuture();
    }

    /**
     * Swaps the token of an existing entry while keeping its segment.
     *
     * @throws UnableToClaimTokenException if there is no entry for the given processor and segment
     */
    private void updateToken(@Nullable TrackingToken token, @Nonnull String processorName, int segmentId) {
        ProcessAndSegmentId key = new ProcessAndSegmentId(processorName, segmentId);
        // computeIfPresent returns the NEW mapping, or null when the key was absent.
        SegmentAndToken updated = tokens.computeIfPresent(
                key, (id, current) -> new SegmentAndToken(current.segment(), token)
        );
        if (updated == null) {
            throw new UnableToClaimTokenException("No such token for processor '%s' and segment %d".formatted(processorName, segmentId));
        }
    }

    /**
     * Looks up the token stored for the given processor and segment.
     *
     * @param processorName the processor owning the segment
     * @param segmentId     the id of the segment
     * @param context       not used by this implementation
     * @return a completed future holding the stored token, or {@code null} when the entry still holds the internal
     * null-sentinel
     * @throws UnableToClaimTokenException thrown directly (not via the future) when no entry exists
     */
    @Override
    @Nonnull
    public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) {
        SegmentAndToken entry = tokens.get(new ProcessAndSegmentId(processorName, segmentId));
        if (entry == null) {
            throw new UnableToClaimTokenException("No token was initialized for segment " + segmentId + " for processor " + processorName);
        }
        TrackingToken stored = entry.trackingToken();
        // Identity comparison on purpose: only the sentinel instance itself stands for "no token".
        if (stored == NULL_TOKEN) {
            return FutureUtils.emptyCompletedFuture();
        }
        return CompletableFuture.completedFuture(stored);
    }

    /**
     * Does nothing; this store keeps no claim state.
     *
     * @return an already completed future
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        return FutureUtils.emptyCompletedFuture();
    }

    /**
     * Removes the entry for the given processor and segment, if there is one.
     *
     * @return an already completed future
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        tokens.remove(new ProcessAndSegmentId(processorName, segment));
        return FutureUtils.emptyCompletedFuture();
    }

    /**
     * Adds an entry for {@code segment} of {@code processorName}, storing {@code token} or the internal null-sentinel
     * when {@code token} is {@code null}.
     *
     * @return an already completed future
     * @throws UnableToInitializeTokenException thrown directly (not via the future) when an entry for that processor
     *                                          and segment id already exists
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context) throws UnableToInitializeTokenException {
        ProcessAndSegmentId key = new ProcessAndSegmentId(processorName, segment.getSegmentId());
        SegmentAndToken existing = tokens.putIfAbsent(key, new SegmentAndToken(segment, orNullToken(token)));
        if (existing != null) {
            throw new UnableToInitializeTokenException("Token was already present");
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Looks up the segment stored under the given processor and segment id.
     *
     * @return a completed future holding the segment, or {@code null} when no entry exists
     */
    @Override
    @Nonnull
    public CompletableFuture<Segment> fetchSegment(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) {
        SegmentAndToken entry = tokens.get(new ProcessAndSegmentId(processorName, segmentId));
        return CompletableFuture.completedFuture(entry == null ? null : entry.segment());
    }

    /**
     * Collects the distinct segments of all entries whose processor name equals {@code processorName}.
     *
     * @return a completed future holding the matching segments; empty when there are none
     */
    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        List<Segment> segments = tokens.entrySet()
                                       .stream()
                                       .filter(entry -> entry.getKey().processorName().equals(processorName))
                                       .map(entry -> entry.getValue().segment())
                                       .distinct()
                                       .toList();
        return CompletableFuture.completedFuture(segments);
    }

    /**
     * Delegates to {@link #fetchSegments(String, ProcessingContext)}; this store does not distinguish available
     * segments from other segments.
     */
    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> fetchAvailableSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        return fetchSegments(processorName, context);
    }

    /**
     * Returns the random identifier generated when this instance was constructed.
     */
    @Override
    @Nonnull
    public CompletableFuture<String> retrieveStorageIdentifier(@Nullable ProcessingContext context) {
        return CompletableFuture.completedFuture(identifier);
    }

    /**
     * Maps a {@code null} token to the {@link #NULL_TOKEN} sentinel so both initialize methods store the same
     * placeholder.
     */
    private static TrackingToken orNullToken(@Nullable TrackingToken token) {
        return token == null ? NULL_TOKEN : token;
    }

    /** Map key: a processor name combined with a segment id. */
    private record ProcessAndSegmentId(String processorName, int segmentId) {
    }

    /** Map value: a segment together with the token currently stored for it (may be {@code null}). */
    private record SegmentAndToken(Segment segment, TrackingToken trackingToken) {
    }
}

