/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.routing;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.exception.MessagingExceptionHandlerAware;
import org.mule.runtime.core.api.message.InternalMessage;
import org.mule.runtime.core.api.scheduler.Scheduler;
import org.mule.runtime.core.api.store.ObjectStoreException;
import org.mule.runtime.core.config.ExceptionHelper;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.message.DefaultExceptionPayload;
import org.mule.runtime.core.message.ErrorBuilder;
import org.mule.runtime.core.retry.RetryPolicyExhaustedException;
import org.mule.runtime.core.routing.AbstractUntilSuccessfulProcessingStrategy;
import org.mule.runtime.core.util.queue.objectstore.QueueKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsynchronousUntilSuccessfulProcessingStrategy
extends AbstractUntilSuccessfulProcessingStrategy
implements Initialisable,
Startable,
Stoppable,
MessagingExceptionHandlerAware {
    private static final String UNTIL_SUCCESSFUL_MSG_PREFIX = "until-successful retries exhausted. Last exception message was: %s";
    protected transient Logger logger = LoggerFactory.getLogger(this.getClass());
    private MessagingExceptionHandler messagingExceptionHandler;
    private Scheduler pool;

    public void initialise() throws InitialisationException {
        if (this.getUntilSuccessfulConfiguration().getObjectStore() == null) {
            throw new InitialisationException(I18nMessageFactory.createStaticMessage((String)"A ListableObjectStore must be configured on UntilSuccessful."), (Initialisable)this);
        }
    }

    public void start() {
        this.pool = this.muleContext.getSchedulerService().ioScheduler();
        this.scheduleAllPendingEventsForProcessing();
    }

    public void stop() {
        this.pool.shutdown();
        this.pool = null;
    }

    @Override
    protected Event doRoute(Event event, FlowConstruct flow) throws MuleException {
        try {
            Serializable eventStoreKey = this.storeEvent(event, flow);
            this.scheduleForProcessing(eventStoreKey, true);
            if (this.getUntilSuccessfulConfiguration().getAckExpression() == null) {
                return event;
            }
            return this.processResponseThroughAckResponseExpression(event);
        }
        catch (Exception e) {
            throw new MessagingException(I18nMessageFactory.createStaticMessage((String)"Failed to schedule the event for processing"), event, e, this.getUntilSuccessfulConfiguration().getRouter());
        }
    }

    private void scheduleAllPendingEventsForProcessing() {
        block5: {
            try {
                for (Serializable eventStoreKey : this.getUntilSuccessfulConfiguration().getObjectStore().allKeys()) {
                    try {
                        this.scheduleForProcessing(eventStoreKey, true);
                    }
                    catch (Exception e) {
                        this.logger.error(I18nMessageFactory.createStaticMessage((String)("Failed to schedule for processing event stored with key: " + eventStoreKey)).toString(), (Throwable)e);
                    }
                }
            }
            catch (Exception e) {
                this.logger.warn("Failure during scheduling of until successful previous jobs " + e.getMessage());
                if (!this.logger.isDebugEnabled()) break block5;
                this.logger.debug("Failure during scheduling of until successful previous jobs ", (Throwable)e);
            }
        }
    }

    private void scheduleForProcessing(Serializable eventStoreKey, boolean firstTime) {
        if (firstTime) {
            this.submitForProcessing(eventStoreKey);
        } else {
            this.pool.schedule(() -> {
                this.submitForProcessing(eventStoreKey);
                return null;
            }, this.getUntilSuccessfulConfiguration().getMillisBetweenRetries(), TimeUnit.MILLISECONDS);
        }
    }

    protected void submitForProcessing(Serializable eventStoreKey) {
        this.pool.execute(() -> {
            try {
                this.retrieveAndProcessEvent(eventStoreKey);
            }
            catch (Exception e) {
                this.incrementProcessAttemptCountAndRescheduleOrRemoveFromStore(eventStoreKey, e);
            }
        });
    }

    private void incrementProcessAttemptCountAndRescheduleOrRemoveFromStore(Serializable eventStoreKey, Exception lastException) {
        try {
            Event event = (Event)this.getUntilSuccessfulConfiguration().getObjectStore().remove(eventStoreKey);
            Integer configuredAttempts = (Integer)Event.getVariableValueOrNull("process.attempt.count", event);
            Integer deliveryAttemptCount = configuredAttempts != null ? configuredAttempts : 1;
            Event incrementedEvent = event;
            if (deliveryAttemptCount <= this.getUntilSuccessfulConfiguration().getMaxRetries()) {
                incrementedEvent = Event.builder(incrementedEvent).addVariable("process.attempt.count", deliveryAttemptCount + 1).build();
                this.getUntilSuccessfulConfiguration().getObjectStore().store(eventStoreKey, incrementedEvent);
                this.scheduleForProcessing(eventStoreKey, false);
            } else {
                this.abandonRetries(event, incrementedEvent, lastException);
            }
        }
        catch (ObjectStoreException ose) {
            this.logger.error("Failed to increment failure count for event stored with key: " + eventStoreKey, (Throwable)((Object)ose));
        }
    }

    private Serializable storeEvent(Event event, FlowConstruct flow) throws ObjectStoreException {
        Integer configuredAttempts = (Integer)Event.getVariableValueOrNull("process.attempt.count", event);
        Integer deliveryAttemptCount = configuredAttempts != null ? configuredAttempts : 1;
        return this.storeEvent(event, flow, deliveryAttemptCount);
    }

    private Serializable storeEvent(Event event, FlowConstruct flow, int deliveryAttemptCount) throws ObjectStoreException {
        event = Event.builder(event).addVariable("process.attempt.count", deliveryAttemptCount).build();
        Serializable eventStoreKey = AsynchronousUntilSuccessfulProcessingStrategy.buildQueueKey(event, flow, this.muleContext);
        this.getUntilSuccessfulConfiguration().getObjectStore().store(eventStoreKey, event);
        return eventStoreKey;
    }

    public static Serializable buildQueueKey(Event muleEvent, FlowConstruct flow, MuleContext muleContext) {
        StringBuilder keyBuilder = new StringBuilder();
        muleEvent.getGroupCorrelation().getSequence().ifPresent(v -> keyBuilder.append(v + "-"));
        keyBuilder.append(muleEvent.getContext().getId());
        keyBuilder.append("-");
        keyBuilder.append(muleContext.getClusterId());
        keyBuilder.append("-");
        keyBuilder.append(flow);
        return new QueueKey("queuestore", (Serializable)((Object)keyBuilder.toString()));
    }

    private void abandonRetries(Event event, Event mutableEvent, Exception lastException) {
        if (this.getUntilSuccessfulConfiguration().getDlqMP() == null) {
            this.logger.info("Retry attempts exhausted and no DLQ defined");
            this.messagingExceptionHandler.handleException(new MessagingException(mutableEvent, (Throwable)((Object)this.buildRetryPolicyExhaustedException(lastException))), mutableEvent);
            return;
        }
        Event eventCopy = event;
        this.logger.info("Retry attempts exhausted, routing message to DLQ: " + this.getUntilSuccessfulConfiguration().getDlqMP());
        try {
            RetryPolicyExhaustedException exception = this.buildRetryPolicyExhaustedException(lastException);
            Event mutatedEvent = Event.builder(mutableEvent).message(InternalMessage.builder(mutableEvent.getMessage()).exceptionPayload(new DefaultExceptionPayload((Throwable)((Object)exception))).build()).error(ErrorBuilder.builder((Throwable)((Object)exception)).errorType(this.muleContext.getErrorTypeLocator().lookupErrorType((Throwable)((Object)exception))).build()).build();
            this.getUntilSuccessfulConfiguration().getDlqMP().process(mutatedEvent);
        }
        catch (MessagingException e) {
            this.messagingExceptionHandler.handleException(e, eventCopy);
        }
        catch (Exception e) {
            this.messagingExceptionHandler.handleException(new MessagingException(event, e), eventCopy);
        }
    }

    protected RetryPolicyExhaustedException buildRetryPolicyExhaustedException(Exception e) {
        MuleException muleException = ExceptionHelper.getRootMuleException((Throwable)e);
        if (muleException == null) {
            return new RetryPolicyExhaustedException(CoreMessages.createStaticMessage((String)UNTIL_SUCCESSFUL_MSG_PREFIX, (Object[])new Object[]{e.getMessage()}), e, this);
        }
        if (muleException.getCause() != null) {
            RetryPolicyExhaustedException retryPolicyExhaustedException = new RetryPolicyExhaustedException(CoreMessages.createStaticMessage((String)UNTIL_SUCCESSFUL_MSG_PREFIX, (Object[])new Object[]{muleException.getMessage()}), (Object)muleException.getCause());
            retryPolicyExhaustedException.getInfo().putAll(muleException.getInfo());
            return retryPolicyExhaustedException;
        }
        RetryPolicyExhaustedException retryPolicyExhaustedException = new RetryPolicyExhaustedException(CoreMessages.createStaticMessage((String)UNTIL_SUCCESSFUL_MSG_PREFIX, (Object[])new Object[]{muleException.getMessage()}), (Object)muleException);
        retryPolicyExhaustedException.getInfo().putAll(muleException.getInfo());
        return retryPolicyExhaustedException;
    }

    private void removeFromStore(Serializable eventStoreKey) {
        try {
            this.getUntilSuccessfulConfiguration().getObjectStore().remove(eventStoreKey);
        }
        catch (ObjectStoreException ose) {
            this.logger.warn("Failed to remove following event from store with key: " + eventStoreKey);
        }
    }

    private void retrieveAndProcessEvent(Serializable eventStoreKey) throws ObjectStoreException {
        Event persistedEvent = (Event)this.getUntilSuccessfulConfiguration().getObjectStore().retrieve(eventStoreKey);
        this.processEvent(persistedEvent);
        this.removeFromStore(eventStoreKey);
    }

    @Override
    public void setMessagingExceptionHandler(MessagingExceptionHandler messagingExceptionHandler) {
        this.messagingExceptionHandler = messagingExceptionHandler;
    }
}

