/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.listener;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.jspecify.annotations.Nullable;
import org.springframework.pulsar.listener.PulsarConsumerErrorHandler;
import org.springframework.pulsar.listener.PulsarMessageRecovererFactory;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

/**
 * {@link PulsarConsumerErrorHandler} implementation that decides whether a failed
 * message should be retried by consulting a {@link BackOff}, and that hands the
 * message to a recoverer obtained from a {@link PulsarMessageRecovererFactory}
 * when recovery is requested.
 *
 * <p>The back-off progress for the message currently being retried is kept in a
 * {@link ThreadLocal}, so each calling thread tracks its own message and its own
 * {@link BackOffExecution}.
 *
 * @param <T> the type parameter of the Pulsar {@link Message} and {@link Consumer}
 * this handler works with
 */
public class DefaultPulsarConsumerErrorHandler<T> implements PulsarConsumerErrorHandler<T> {
    private final PulsarMessageRecovererFactory<T> pulsarMessageRecovererFactory;
    private final BackOff backOff;
    // Per-thread record of the message being retried and its running back-off execution.
    private final ThreadLocal<Pair> backOffExecutionThreadLocal = new ThreadLocal<>();

    /**
     * Create a handler backed by the given recoverer factory and back-off.
     *
     * @param pulsarMessageRecovererFactory factory used by
     * {@link #recoverMessage(Consumer, Message, Exception)} to obtain a recoverer
     * @param backOff source of a new {@link BackOffExecution} each time a message that
     * is not the currently tracked one fails
     */
    public DefaultPulsarConsumerErrorHandler(PulsarMessageRecovererFactory<T> pulsarMessageRecovererFactory, BackOff backOff) {
        this.pulsarMessageRecovererFactory = pulsarMessageRecovererFactory;
        this.backOff = backOff;
    }

    /**
     * Decide whether the given message should be retried.
     *
     * <p>If the calling thread is already tracking a message equal to {@code message},
     * its existing back-off execution is advanced; otherwise a new execution is started
     * and stored for this thread. When the execution yields an interval other than
     * {@link BackOffExecution#STOP}, the calling thread sleeps for that interval before
     * this method returns.
     *
     * @param exception the failure that triggered the check (not consulted by this
     * implementation)
     * @param message the message that failed
     * @return {@code true} if the back-off execution yielded a further interval,
     * {@code false} if it returned {@link BackOffExecution#STOP}
     */
    @Override
    public boolean shouldRetryMessage(Exception exception, Message<T> message) {
        BackOffExecution backOffExecution;
        Pair pair = this.backOffExecutionThreadLocal.get();
        if (pair != null && pair.message().equals(message)) {
            // Same message as last time on this thread: continue its back-off sequence.
            backOffExecution = pair.backOffExecution();
        } else {
            // First failure on this thread, or a different message: start a fresh sequence.
            backOffExecution = this.backOff.start();
            this.backOffExecutionThreadLocal.set(new Pair(message, backOffExecution));
        }
        long nextBackOff = backOffExecution.nextBackOff();
        this.onNextBackoff(nextBackOff);
        return nextBackOff != BackOffExecution.STOP;
    }

    /**
     * Sleep for the given interval unless it is the {@link BackOffExecution#STOP}
     * sentinel (or any smaller value). If the sleep is interrupted, the thread's
     * interrupt flag is restored and the method returns without throwing.
     *
     * @param nextBackOff interval in milliseconds as returned by the back-off execution
     */
    private void onNextBackoff(long nextBackOff) {
        if (nextBackOff > BackOffExecution.STOP) {
            try {
                Thread.sleep(nextBackOff);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Recover the given message by delegating to the recoverer that the configured
     * factory supplies for {@code consumer}.
     *
     * @param consumer the consumer passed to the factory to obtain the recoverer
     * @param message the message to recover
     * @param exception the failure handed to the recoverer along with the message
     */
    @Override
    public void recoverMessage(Consumer<T> consumer, Message<T> message, Exception exception) {
        this.pulsarMessageRecovererFactory.recovererForConsumer(consumer).recoverMessage(message, exception);
    }

    /**
     * Return the message the calling thread is currently tracking.
     *
     * @return the tracked message, or {@code null} if this thread has none stored
     */
    @Override
    public @Nullable Message<T> currentMessage() {
        Pair pair = this.backOffExecutionThreadLocal.get();
        if (pair == null) {
            return null;
        }
        return pair.message();
    }

    /**
     * Discard the message and back-off execution stored for the calling thread.
     */
    @Override
    public void clearMessage() {
        this.backOffExecutionThreadLocal.remove();
    }

    /**
     * Couples a message with the back-off execution started for it.
     *
     * @param message the message being retried
     * @param backOffExecution the back-off execution associated with that message
     */
    private record Pair(Message<?> message, BackOffExecution backOffExecution) {
    }
}

