/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.core.retry;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.common.infra.DescribableComponent;
import org.axonframework.messaging.core.DelayedMessageStream;
import org.axonframework.messaging.core.Message;
import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.core.retry.RetryPolicy;
import org.axonframework.messaging.core.retry.RetryScheduler;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;

public class AsyncRetryScheduler
implements RetryScheduler,
DescribableComponent {
    private final RetryPolicy retryPolicy;
    private final ScheduledExecutorService executor;

    public AsyncRetryScheduler(RetryPolicy retryPolicy, ScheduledExecutorService executor) {
        this.retryPolicy = retryPolicy;
        this.executor = executor;
    }

    @Override
    public <M extends Message, R extends Message> MessageStream<R> scheduleRetry(@Nonnull M message, @Nullable ProcessingContext processingContext, @Nonnull Throwable cause, @Nonnull RetryScheduler.Dispatcher<M, R> dispatcher) {
        RetryPolicy.Outcome outcome = this.retryPolicy.defineFor(message, cause, Collections.emptyList());
        if (!outcome.shouldReschedule()) {
            return MessageStream.failed(cause);
        }
        RetryTask retryTask = new RetryTask(message, cause, () -> dispatcher.dispatch(message, processingContext));
        this.executor.schedule(retryTask, outcome.rescheduleInterval(), outcome.rescheduleIntervalTimeUnit());
        return DelayedMessageStream.create(retryTask.finalResult);
    }

    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("retryPolicy", this.retryPolicy);
        descriptor.describeProperty("executor", this.executor);
    }

    private class RetryTask<T extends Message>
    implements Runnable {
        private final CompletableFuture<MessageStream<T>> finalResult = new CompletableFuture();
        private final List<Class<? extends Throwable>[]> history;
        private final Message message;
        private final Supplier<MessageStream<T>> dispatcher;

        public RetryTask(Message message, Throwable initialFailure, Supplier<MessageStream<T>> dispatcher) {
            this.message = message;
            this.dispatcher = dispatcher;
            this.history = List.of(this.simplify(initialFailure));
        }

        private RetryTask(RetryTask<T> previous, Throwable newFailure) {
            this.message = previous.message;
            this.dispatcher = previous.dispatcher;
            this.history = new ArrayList<Class<? extends Throwable>[]>(previous.history.size());
            this.history.addAll(previous.history);
            this.history.add(this.simplify(newFailure));
        }

        private Class<? extends Throwable>[] simplify(Throwable failure) {
            ArrayList causes = new ArrayList();
            Throwable cause = failure;
            do {
                causes.add(cause.getClass());
            } while ((cause = cause.getCause()) != null);
            return causes.toArray(new Class[0]);
        }

        @Override
        public void run() {
            AtomicBoolean entrySeen = new AtomicBoolean(false);
            AtomicReference retryResult = new AtomicReference();
            this.finalResult.complete(this.dispatcher.get().onNext(entry -> entrySeen.set(true)).onErrorContinue(failure -> {
                Throwable unwrapped = FutureUtils.unwrap(failure);
                if (entrySeen.get()) {
                    return MessageStream.failed(unwrapped);
                }
                return (MessageStream)retryResult.updateAndGet(current -> {
                    if (current != null) {
                        return current;
                    }
                    RetryPolicy.Outcome decision = AsyncRetryScheduler.this.retryPolicy.defineFor(this.message, unwrapped, this.history);
                    if (decision.shouldReschedule()) {
                        RetryTask<T> newTask = new RetryTask<T>(this, unwrapped);
                        AsyncRetryScheduler.this.executor.schedule(newTask, decision.rescheduleInterval(), decision.rescheduleIntervalTimeUnit());
                        return DelayedMessageStream.create(newTask.finalResult);
                    }
                    return MessageStream.failed(unwrapped);
                });
            }));
        }
    }
}

