/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.eventhandling.processing.streaming.token.store.jpa;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.annotation.Internal;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.conversion.Converter;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.Segment;
import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.ConfigToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToClaimTokenException;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToInitializeTokenException;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToRetrieveIdentifierException;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.jpa.JpaTokenStoreConfiguration;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.jpa.TokenEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JpaTokenStore
implements TokenStore {
    private static final Logger logger = LoggerFactory.getLogger(JpaTokenStore.class);
    private static final String CONFIG_TOKEN_ID = "__config";
    private static final Segment CONFIG_SEGMENT = new Segment(0, 0);
    private static final String OWNER_PARAM = "owner";
    private static final String PROCESSOR_NAME_PARAM = "processorName";
    private static final String SEGMENT_PARAM = "segment";
    private final EntityManagerProvider entityManagerProvider;
    private final Converter converter;
    private final TemporalAmount claimTimeout;
    private final String nodeId;
    private final LockModeType loadingLockMode;

    public JpaTokenStore(@Nonnull EntityManagerProvider entityManagerProvider, @Nonnull Converter converter, @Nonnull JpaTokenStoreConfiguration configuration) {
        BuilderUtils.assertNonNull((Object)entityManagerProvider, (String)"EntityManagerProvider is a hard requirement and should be provided");
        BuilderUtils.assertNonNull((Object)converter, (String)"The Converter is a hard requirement and should be provided");
        BuilderUtils.assertNonNull((Object)configuration, (String)"The JpaTokenStoreConfiguration should be provided");
        this.entityManagerProvider = entityManagerProvider;
        this.converter = converter;
        this.claimTimeout = configuration.claimTimeout();
        this.nodeId = configuration.nodeId();
        this.loadingLockMode = configuration.loadingLockMode();
    }

    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            if (((List)FutureUtils.joinAndUnwrap(this.fetchSegments(processorName, context))).size() > 0) {
                throw new UnableToClaimTokenException("Could not initialize segments. Some segments were already present.");
            }
            List<Segment> segments = Segment.splitBalanced(Segment.ROOT_SEGMENT, segmentCount - 1);
            for (Segment segment : segments) {
                entityManager.persist((Object)new TokenEntry(processorName, segment, initialToken, this.converter));
            }
            entityManager.flush();
            return CompletableFuture.completedFuture(segments);
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try {
            String tokenTypeToStore;
            byte[] tokenDataToStore;
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            if (token != null) {
                tokenDataToStore = (byte[])this.converter.convert((Object)token, byte[].class);
                tokenTypeToStore = token.getClass().getName();
            } else {
                tokenDataToStore = null;
                tokenTypeToStore = TrackingToken.class.getName();
            }
            int updatedTokens = entityManager.createQuery("UPDATE TokenEntry te SET te.token = :token, te.tokenType = :tokenType, te.timestamp = :timestamp WHERE te.owner = :owner AND te.processorName = :processorName AND te.segment = :segment").setParameter("token", (Object)tokenDataToStore).setParameter("tokenType", (Object)tokenTypeToStore).setParameter("timestamp", (Object)TokenEntry.computeTokenTimestamp()).setParameter(OWNER_PARAM, (Object)this.nodeId).setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).executeUpdate();
            if (updatedTokens == 0) {
                logger.debug("Could not update token [{}] for processor [{}] and segment [{}]. Trying load-then-save approach instead.", new Object[]{token, processorName, segment});
                TokenEntry tokenEntry = this.loadToken(processorName, segment, entityManager);
                tokenEntry.updateToken(token, this.converter);
            }
            return FutureUtils.emptyCompletedFuture();
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            entityManager.createQuery("UPDATE TokenEntry te SET te.owner = null WHERE te.owner = :owner AND te.processorName = :processorName AND te.segment = :segment").setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).setParameter(OWNER_PARAM, (Object)this.nodeId).executeUpdate();
            return FutureUtils.emptyCompletedFuture();
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            TokenEntry entry = new TokenEntry(processorName, segment, token, this.converter);
            entityManager.persist((Object)entry);
            entityManager.flush();
            return FutureUtils.emptyCompletedFuture();
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture((Throwable)((Object)new UnableToInitializeTokenException("Could not initialize processor %d segment %s".formatted(processorName, segment), e)));
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            int updates = entityManager.createQuery("DELETE FROM TokenEntry te WHERE te.owner = :owner AND te.processorName = :processorName AND te.segment = :segment").setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).setParameter(OWNER_PARAM, (Object)this.nodeId).executeUpdate();
            if (updates == 0) {
                throw new UnableToClaimTokenException("Unable to remove token. It is not owned by " + this.nodeId);
            }
            return FutureUtils.emptyCompletedFuture();
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            return CompletableFuture.completedFuture(this.loadToken(processorName, segment, entityManager).getToken(this.converter));
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            return CompletableFuture.completedFuture(this.loadToken(processorName, segment, entityManager).getToken(this.converter));
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> extendClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            int updates = entityManager.createQuery("UPDATE TokenEntry te SET te.timestamp = :timestamp WHERE te.processorName = :processorName AND te.segment = :segment AND te.owner = :owner").setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).setParameter(OWNER_PARAM, (Object)this.nodeId).setParameter("timestamp", (Object)DateTimeUtils.formatInstant((TemporalAccessor)TokenEntry.clock.instant())).executeUpdate();
            if (updates == 0) {
                throw new UnableToClaimTokenException("Unable to extend the claim on token for processor '" + processorName + "[" + segment + "]'. It is either claimed by another process, or there is no such token.");
            }
            return FutureUtils.emptyCompletedFuture();
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<Segment> fetchSegment(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            TokenEntry te = (TokenEntry)entityManager.createQuery("SELECT te FROM TokenEntry te WHERE te.processorName = :processorName AND te.segment = :segment", TokenEntry.class).setParameter(SEGMENT_PARAM, (Object)segmentId).setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).getSingleResultOrNull();
            return CompletableFuture.completedFuture(te == null ? null : te.getSegment());
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            List resultList = entityManager.createQuery("SELECT te FROM TokenEntry te WHERE te.processorName = :processorName ORDER BY te.segment ASC", TokenEntry.class).setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).getResultList();
            return CompletableFuture.completedFuture(resultList.stream().map(TokenEntry::getSegment).toList());
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> fetchAvailableSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            List resultList = entityManager.createQuery("SELECT te FROM TokenEntry te WHERE te.processorName = :processorName ORDER BY te.segment ASC", TokenEntry.class).setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).getResultList();
            return CompletableFuture.completedFuture(resultList.stream().filter(tokenEntry -> tokenEntry.mayClaim(this.nodeId, this.claimTimeout)).map(TokenEntry::getSegment).collect(Collectors.toList()));
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    protected TokenEntry loadToken(String processorName, int segment, EntityManager entityManager) {
        TokenEntry token = (TokenEntry)entityManager.find(TokenEntry.class, (Object)new TokenEntry.PK(processorName, segment), this.loadingLockMode);
        if (token == null) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has not been initialized yet", processorName, segment));
        }
        if (!token.claim(this.nodeId, this.claimTimeout)) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", processorName, segment, token.getOwner()));
        }
        return token;
    }

    protected TokenEntry loadToken(String processorName, Segment segment, EntityManager entityManager) {
        TokenEntry token = this.loadToken(processorName, segment.getSegmentId(), entityManager);
        try {
            this.validateSegment(processorName, segment, entityManager);
        }
        catch (UnableToClaimTokenException e) {
            token.releaseClaim(this.nodeId);
            throw e;
        }
        return token;
    }

    private void validateSegment(String processorName, Segment segment, EntityManager entityManager) {
        TokenEntry mergeableSegment = (TokenEntry)entityManager.find(TokenEntry.class, (Object)new TokenEntry.PK(processorName, segment.mergeableSegmentId()), this.loadingLockMode);
        if (mergeableSegment == null) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been merged with another segment", processorName, segment.getSegmentId()));
        }
        TokenEntry splitSegment = (TokenEntry)entityManager.find(TokenEntry.class, (Object)new TokenEntry.PK(processorName, segment.splitSegmentId()), this.loadingLockMode);
        if (splitSegment != null) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been split into two segments", processorName, segment.getSegmentId()));
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<String> retrieveStorageIdentifier(@Nullable ProcessingContext context) {
        try {
            return CompletableFuture.completedFuture(this.getConfig().get("id"));
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture((Throwable)((Object)new UnableToRetrieveIdentifierException("Exception occurred while trying to retrieve storage identifier", e)));
        }
    }

    private ConfigToken getConfig() {
        EntityManager em = this.entityManagerProvider.getEntityManager();
        TokenEntry token = (TokenEntry)em.find(TokenEntry.class, (Object)new TokenEntry.PK(CONFIG_TOKEN_ID, CONFIG_SEGMENT.getSegmentId()), LockModeType.NONE);
        if (token == null) {
            token = new TokenEntry(CONFIG_TOKEN_ID, CONFIG_SEGMENT, new ConfigToken(Collections.singletonMap("id", UUID.randomUUID().toString())), this.converter);
            em.persist((Object)token);
            em.flush();
        }
        return (ConfigToken)token.getToken(this.converter);
    }

    @Internal
    public Converter converter() {
        return this.converter;
    }
}

