/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.deadletter;

import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.deadletter.DeadLetteredEventProcessingTask;
import org.axonframework.eventhandling.deadletter.SequenceIdentifierCache;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptorSupport;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.EnqueueDecision;
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.GenericDeadLetter;
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.deadletter.ThrowableCause;
import org.axonframework.messaging.unitofwork.LegacyDefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SimpleEventHandlerInvoker} that routes events through a {@link SequencedDeadLetterQueue}.
 * <p>
 * For every event assigned to the given segment, the invoker first asks the queue whether the event's sequence
 * identifier is already present. If so, the event is appended to that sequence in the queue instead of being handled.
 * Otherwise the event handlers are invoked, and when they throw, the configured {@link EnqueuePolicy} decides whether
 * the event is enqueued as a dead letter.
 * <p>
 * An optional per-{@link Segment} {@link SequenceIdentifierCache} can be enabled through the {@link Builder} to skip
 * the queue look-up for sequence identifiers the cache reports as not present.
 */
public class DeadLetteringEventHandlerInvoker
extends SimpleEventHandlerInvoker
implements SequencedDeadLetterProcessor<EventMessage<?>>,
MessageHandlerInterceptorSupport<EventMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final SequencedDeadLetterQueue<EventMessage<?>> queue;
    private final EnqueuePolicy<EventMessage<?>> enqueuePolicy;
    private final TransactionManager transactionManager;
    private final boolean allowReset;
    private final boolean sequenceIdentifierCacheEnabled;
    private final int sequenceIdentifierCacheSize;
    // Only instantiated when the cache is enabled; null otherwise. Every access is guarded by the enabled flag.
    private final Map<Segment, SequenceIdentifierCache> sequenceIdentifierCache;
    private final List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new CopyOnWriteArrayList<>();

    /**
     * Instantiates the invoker from the given {@code builder}, copying its queue, enqueue policy, transaction manager,
     * reset flag and cache settings.
     *
     * @param builder the {@link Builder} holding the configuration of this invoker
     */
    protected DeadLetteringEventHandlerInvoker(Builder builder) {
        super(builder);
        this.queue = builder.queue;
        this.enqueuePolicy = builder.enqueuePolicy;
        this.transactionManager = builder.transactionManager;
        this.allowReset = builder.allowReset;
        this.sequenceIdentifierCacheEnabled = builder.sequenceIdentifierCacheEnabled;
        this.sequenceIdentifierCacheSize = builder.sequenceIdentifierCacheSize;
        this.sequenceIdentifierCache = this.sequenceIdentifierCacheEnabled ? new ConcurrentHashMap<Segment, SequenceIdentifierCache>() : null;
    }

    /**
     * Creates a new {@link Builder} to configure and construct a {@link DeadLetteringEventHandlerInvoker}.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Handles the given {@code message} for the given {@code segment}.
     * <p>
     * Events that do not match the segment are ignored. If the event's sequence identifier is already present in the
     * dead-letter queue, the event is enqueued behind it; otherwise the event handlers are invoked.
     *
     * @param message the event to handle
     * @param context the processing context in which the event is handled
     * @param segment the segment for which the event is handled
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handle(@Nonnull EventMessage<?> message, @Nonnull ProcessingContext context, @Nonnull Segment segment) throws Exception {
        if (!super.sequencingPolicyMatchesSegment(message, segment)) {
            logger.trace("Ignoring event with id [{}] as it is not assigned to segment [{}].", (Object)message.getIdentifier(), (Object)segment);
            return;
        }
        Object sequenceIdentifier = super.sequenceIdentifier(message);
        boolean mightBePresent = this.mightBePresent(sequenceIdentifier, segment);
        if (this.isPresent(mightBePresent, sequenceIdentifier, message)) {
            if (logger.isInfoEnabled()) {
                logger.info("Event with id [{}] is added to the dead-letter queue since its queue id [{}] is already present.", (Object)message.getIdentifier(), sequenceIdentifier);
            }
            this.markEnqueued(sequenceIdentifier, segment);
            return;
        }
        // The cache could not rule the identifier out, but the queue did: record that it is not enqueued.
        if (mightBePresent) {
            this.markNotEnqueued(sequenceIdentifier, segment);
        }
        this.invokeHandlers(message, context, segment, sequenceIdentifier);
    }

    /**
     * Checks with the queue whether the {@code sequenceIdentifier} is present, enqueueing the {@code message} as a
     * dead letter if it is. The queue is only consulted when {@code mightBePresent} is {@code true}.
     *
     * @param mightBePresent     whether the identifier could be present in the queue
     * @param sequenceIdentifier the sequence identifier of the {@code message}
     * @param message            the event to enqueue when the sequence is present
     * @return {@code true} if the queue enqueued the message, {@code false} otherwise
     */
    private boolean isPresent(boolean mightBePresent, Object sequenceIdentifier, EventMessage<?> message) {
        return mightBePresent && this.queue.enqueueIfPresent(sequenceIdentifier, () -> new GenericDeadLetter<EventMessage<?>>(sequenceIdentifier, message));
    }

    /**
     * Invokes the event handlers for the given {@code message}. When handling throws, the {@link EnqueuePolicy} is
     * asked for a decision: the event is either enqueued as a dead letter or the failure is only logged.
     *
     * @param message            the event to handle
     * @param context            the processing context in which the event is handled
     * @param segment            the segment for which the event is handled
     * @param sequenceIdentifier the sequence identifier of the {@code message}
     */
    private void invokeHandlers(@Nonnull EventMessage<?> message, @Nonnull ProcessingContext context, @Nonnull Segment segment, Object sequenceIdentifier) {
        if (logger.isTraceEnabled()) {
            logger.trace("Event [{}] with queue id [{}] is not present in the dead-letter queue.Handle operation is delegated to the wrapped EventHandlerInvoker.", message, sequenceIdentifier);
        }
        try {
            super.invokeHandlers(message, context);
        }
        catch (Exception e) {
            DeadLetter<EventMessage<?>> letter = new GenericDeadLetter<>(sequenceIdentifier, message, e);
            EnqueueDecision<EventMessage<?>> decision = this.enqueuePolicy.decide(letter, e);
            if (decision.shouldEnqueue()) {
                Throwable cause = decision.enqueueCause().orElse(null);
                this.markEnqueued(sequenceIdentifier, segment);
                this.queue.enqueue(sequenceIdentifier, decision.withDiagnostics(letter.withCause(cause)));
            } else if (logger.isInfoEnabled()) {
                // Only report "not dead lettered" when the policy actually rejected the enqueue.
                logger.info("The enqueue policy decided not to dead letter event [{}].", (Object)message.getIdentifier());
            }
        }
    }

    /**
     * Tells whether the {@code sequenceIdentifier} might be present in the queue. With the cache disabled this is
     * always {@code true}, so the queue itself is consulted. With the cache enabled, the cache of the given
     * {@code segment} is created on first use and asked.
     *
     * @param sequenceIdentifier the sequence identifier to check
     * @param segment            the segment whose cache is consulted
     * @return {@code true} if the identifier might be present, {@code false} if the cache rules it out
     */
    private boolean mightBePresent(@Nonnull Object sequenceIdentifier, @Nonnull Segment segment) {
        if (!this.sequenceIdentifierCacheEnabled) {
            return true;
        }
        return this.sequenceIdentifierCache.computeIfAbsent(segment, k -> new SequenceIdentifierCache(segment.getSegmentId(), this.sequenceIdentifierCacheSize, this.queue)).mightBePresent(sequenceIdentifier);
    }

    /**
     * Marks the {@code sequenceIdentifier} as enqueued in the cache of the given {@code segment}, if the cache is
     * enabled and a cache for that segment exists.
     *
     * @param sequenceIdentifier the sequence identifier to mark
     * @param segment            the segment whose cache is updated
     */
    private void markEnqueued(@Nonnull Object sequenceIdentifier, @Nonnull Segment segment) {
        if (this.sequenceIdentifierCacheEnabled) {
            this.sequenceIdentifierCache.computeIfPresent(segment, (k, v) -> v.markEnqueued(sequenceIdentifier));
        }
    }

    /**
     * Marks the {@code sequenceIdentifier} as not enqueued in the cache of the given {@code segment}, if the cache is
     * enabled and a cache for that segment exists.
     *
     * @param sequenceIdentifier the sequence identifier to mark
     * @param segment            the segment whose cache is updated
     */
    private void markNotEnqueued(@Nonnull Object sequenceIdentifier, @Nonnull Segment segment) {
        if (this.sequenceIdentifierCacheEnabled) {
            this.sequenceIdentifierCache.computeIfPresent(segment, (k, v) -> v.markNotEnqueued(sequenceIdentifier));
        }
    }

    /**
     * Performs a reset without a reset context. When resets are allowed, the queue is cleared inside a transaction
     * before the reset is delegated to the superclass.
     *
     * @param context the processing context in which the reset is performed
     */
    @Override
    public void performReset(ProcessingContext context) {
        if (this.allowReset) {
            this.transactionManager.executeInTransaction(this.queue::clear);
        }
        super.performReset(null, context);
    }

    /**
     * Performs a reset with the given {@code resetContext}. When resets are allowed, the queue is cleared inside a
     * transaction before the reset is delegated to the superclass.
     *
     * @param resetContext the context passed along to the superclass reset
     * @param context      the processing context in which the reset is performed
     * @param <R>          the type of the {@code resetContext}
     */
    @Override
    public <R> void performReset(R resetContext, ProcessingContext context) {
        if (this.allowReset) {
            this.transactionManager.executeInTransaction(this.queue::clear);
        }
        super.performReset(resetContext, context);
    }

    /**
     * Lets the queue process dead letters matching the given {@code sequenceFilter}, using a
     * {@link DeadLetteredEventProcessingTask} built from this invoker's event handlers, interceptors, enqueue policy
     * and transaction manager. The processing runs inside a unit of work with the transaction manager attached.
     *
     * @param sequenceFilter the filter selecting the dead letters to process
     * @return the result reported by the queue's {@code process} call
     */
    @Override
    public boolean process(Predicate<DeadLetter<? extends EventMessage<?>>> sequenceFilter) {
        DeadLetteredEventProcessingTask processingTask = new DeadLetteredEventProcessingTask(super.eventHandlers(), this.interceptors, this.enqueuePolicy, this.transactionManager);
        LegacyDefaultUnitOfWork<Object> uow = new LegacyDefaultUnitOfWork<Object>(null);
        uow.attachTransaction(this.transactionManager);
        return (Boolean)uow.executeWithResult(ctx -> this.queue.process(sequenceFilter, processingTask::process)).getPayload();
    }

    /**
     * Registers the given {@code interceptor}; registered interceptors are handed to the task created in
     * {@link #process(Predicate)}.
     *
     * @param interceptor the interceptor to register
     * @return a {@link Registration} that removes the {@code interceptor} again
     */
    @Override
    public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super EventMessage<?>> interceptor) {
        this.interceptors.add(interceptor);
        return () -> this.interceptors.remove(interceptor);
    }

    /**
     * Drops the sequence identifier cache of the released {@code segment} (when the cache is enabled) and then
     * delegates to the superclass.
     *
     * @param segment the segment that was released
     */
    @Override
    public void segmentReleased(Segment segment) {
        if (this.sequenceIdentifierCacheEnabled) {
            if (logger.isTraceEnabled()) {
                logger.trace("Clearing the cache for segment [{}].", (Object)segment.getSegmentId());
            }
            this.sequenceIdentifierCache.remove(segment);
        }
        super.segmentReleased(segment);
    }

    /**
     * Builder for a {@link DeadLetteringEventHandlerInvoker}.
     * <p>
     * Defaults: the enqueue policy enqueues every failure with a truncated cause, resets are not allowed, the sequence
     * identifier cache is disabled and its size is {@code 1024}. The listener invocation error handler is set to
     * {@link PropagatingErrorHandler#instance()} on construction. The queue and the transaction manager are hard
     * requirements.
     */
    public static class Builder
    extends SimpleEventHandlerInvoker.Builder<Builder> {
        private SequencedDeadLetterQueue<EventMessage<?>> queue;
        private EnqueuePolicy<EventMessage<?>> enqueuePolicy = (letter, cause) -> Decisions.enqueue(ThrowableCause.truncated(cause));
        private TransactionManager transactionManager;
        private boolean allowReset = false;
        private boolean sequenceIdentifierCacheEnabled = false;
        private int sequenceIdentifierCacheSize = 1024;

        private Builder() {
            // Exceptions must propagate out of the handlers so the invoker can catch them and consult the policy.
            super.listenerInvocationErrorHandler(PropagatingErrorHandler.instance());
        }

        /**
         * Sets the dead-letter queue used by the invoker.
         *
         * @param queue the queue to use; may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder queue(@Nonnull SequencedDeadLetterQueue<EventMessage<?>> queue) {
            BuilderUtils.assertNonNull(queue, "The DeadLetterQueue may not be null");
            this.queue = queue;
            return this;
        }

        /**
         * Sets the policy deciding whether a failed event is enqueued as a dead letter.
         *
         * @param enqueuePolicy the policy to use; may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder enqueuePolicy(EnqueuePolicy<EventMessage<?>> enqueuePolicy) {
            BuilderUtils.assertNonNull(enqueuePolicy, "The EnqueuePolicy should be non null");
            this.enqueuePolicy = enqueuePolicy;
            return this;
        }

        /**
         * Sets the transaction manager used when clearing the queue on reset and when processing dead letters.
         *
         * @param transactionManager the transaction manager to use; may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "The TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Sets whether a reset clears the dead-letter queue.
         *
         * @param allowReset {@code true} to clear the queue on reset, {@code false} to leave it untouched
         * @return this builder, for fluent interfacing
         */
        public Builder allowReset(boolean allowReset) {
            this.allowReset = allowReset;
            return this;
        }

        /**
         * Enables the per-segment sequence identifier cache.
         *
         * @return this builder, for fluent interfacing
         */
        public Builder enableSequenceIdentifierCache() {
            this.sequenceIdentifierCacheEnabled = true;
            return this;
        }

        /**
         * Sets the size passed to each {@link SequenceIdentifierCache} the invoker creates.
         *
         * @param sequenceIdentifierCacheSize the cache size to use
         * @return this builder, for fluent interfacing
         */
        public Builder sequenceIdentifierCacheSize(int sequenceIdentifierCacheSize) {
            this.sequenceIdentifierCacheSize = sequenceIdentifierCacheSize;
            return this;
        }

        /**
         * Builds a {@link DeadLetteringEventHandlerInvoker} from this builder's configuration.
         *
         * @return the configured invoker
         */
        @Override
        public DeadLetteringEventHandlerInvoker build() {
            return new DeadLetteringEventHandlerInvoker(this);
        }

        /**
         * Asserts that the queue and the transaction manager have been provided. This override checks only those two
         * fields.
         */
        @Override
        protected void validate() {
            BuilderUtils.assertNonNull(this.queue, "The DeadLetterQueue is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.transactionManager, "The TransactionManager is a hard requirement and should be provided");
        }
    }
}

