/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.retrytopic;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jspecify.annotations.Nullable;
import org.springframework.core.NestedRuntimeException;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.kafka.listener.TimestampedException;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.util.Assert;

public class DeadLetterPublishingRecovererFactory {
    /** Logger for this factory, wrapping the commons-logging log of this class. */
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class));
    /** Resolver consulted for the next destination topic and for the template used to publish to it. */
    private final DestinationTopicResolver destinationTopicResolver;
    /** Exception types passed to {@code addNotRetryableExceptions} on every recoverer built by {@link #create(String)}. */
    private final Set<Class<? extends Exception>> fatalExceptions = new LinkedHashSet<Class<? extends Exception>>();
    /** Exception types passed to {@code removeClassification} on every recoverer built by {@link #create(String)}. */
    private final Set<Class<? extends Exception>> nonFatalExceptions = new HashSet<Class<? extends Exception>>();
    /** Callback applied to each newly built recoverer; does nothing by default. */
    private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = recoverer -> {};
    /** Optional extra headers function; stays {@code null} unless set, and is only registered when non-null. */
    private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction;
    /** Controls the level at which listener failures are logged; see {@code maybeLogListenerException}. */
    private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED;
    /** Chooses the partition in the next topic; by default the partition of the consumed record is reused. */
    private BiFunction<ConsumerRecord<?, ?>, String, Integer> partitionResolver = (cr, nextTopic) -> cr.partition();
    /** When {@code false}, retry headers are added as {@code SingleRecordHeader}s instead of plain headers. */
    private boolean retainAllRetryHeaderValues = true;
    /** Strategy that instantiates the recoverer; defaults to this class's private {@code create} overload. */
    private DeadLetterPublisherCreator dlpCreator = this::create;

    /**
     * Create a factory whose recoverers resolve destinations and templates through the given resolver.
     *
     * @param destinationTopicResolver the resolver to use; stored as-is, no null check is performed here
     */
    public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
        this.destinationTopicResolver = destinationTopicResolver;
    }

    /**
     * Set a function contributing additional headers to published records.
     * When non-null, {@link #create(String)} registers it on the recoverer via {@code addHeadersFunction},
     * after the factory's own retry-headers function has been set.
     *
     * @param headersFunction the function receiving the failed record and the exception; not validated here
     */
    public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction) {
        this.headersFunction = headersFunction;
    }

    /**
     * Replace the function that picks the partition of the next topic.
     * The function receives the failed record and the next topic name; a {@code null}
     * result is translated to partition {@code -1} by {@code resolveTopicPartition}.
     *
     * @param resolver the partition resolver; must not be {@code null}
     * @throws IllegalArgumentException if {@code resolver} is {@code null}
     */
    public void setPartitionResolver(BiFunction<ConsumerRecord<?, ?>, String, Integer> resolver) {
        Assert.notNull(resolver, "'resolver' cannot be null");
        this.partitionResolver = resolver;
    }

    /**
     * Choose how the retry headers (original timestamp, attempts, back-off timestamp) are added.
     * With {@code true} (the default) they are appended as plain headers; with {@code false}
     * each is wrapped in a {@code DeadLetterPublishingRecoverer.SingleRecordHeader}.
     * NOTE(review): presumably the single-record form keeps only the latest value per header;
     * confirm against {@code DeadLetterPublishingRecoverer}.
     *
     * @param retainAllRetryHeaderValues {@code false} to use single-record headers
     */
    public void setRetainAllRetryHeaderValues(boolean retainAllRetryHeaderValues) {
        this.retainAllRetryHeaderValues = retainAllRetryHeaderValues;
    }

    /**
     * Replace the strategy used by {@link #create(String)} to instantiate the recoverer.
     *
     * @param creator the creator; must not be {@code null}
     * @throws IllegalArgumentException if {@code creator} is {@code null}
     */
    public void setDeadLetterPublisherCreator(DeadLetterPublisherCreator creator) {
        Assert.notNull(creator, "'creator' cannot be null");
        this.dlpCreator = creator;
    }

    /**
     * Set a callback invoked on each recoverer built by {@link #create(String)}.
     * It runs after the factory has applied its own settings and before the
     * registered exception classifications are applied.
     *
     * @param customizer the callback; must not be {@code null}
     * @throws IllegalArgumentException if {@code customizer} is {@code null}
     */
    public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
        Assert.notNull(customizer, "'customizer' cannot be null");
        this.recovererCustomizer = customizer;
    }

    /**
     * Register an exception type that recoverers built by this factory should treat as not retryable.
     * The type is recorded here and handed to {@code addNotRetryableExceptions} in {@link #create(String)}.
     *
     * @param exceptionType the exception type; must not be {@code null}
     * @throws IllegalArgumentException if {@code exceptionType} is {@code null}
     */
    public final void addNotRetryableException(Class<? extends Exception> exceptionType) {
        Assert.notNull(exceptionType, "'exceptionType' cannot be null");
        this.fatalExceptions.add(exceptionType);
    }

    /**
     * Register an exception type whose classification should be removed from recoverers
     * built by this factory. The type is only recorded here; the actual removal happens
     * when {@link #create(String)} passes it to {@code removeClassification}.
     *
     * @param exceptionType the exception type; must not be {@code null}
     * @return {@code true} if the type had not already been registered for removal
     * @throws IllegalArgumentException if {@code exceptionType} is {@code null}
     */
    public boolean removeNotRetryableException(Class<? extends Exception> exceptionType) {
        // Fail fast, like addNotRetryableException, instead of storing null and failing later in create().
        Assert.notNull(exceptionType, "'exceptionType' cannot be null");
        return this.nonFatalExceptions.add(exceptionType);
    }

    /**
     * Switch the logging strategy to {@code NEVER}.
     * With this strategy {@code maybeLogListenerException} takes none of its error-level
     * branches; the failure is still logged at debug level.
     */
    public void neverLogListenerException() {
        this.loggingStrategy = ListenerExceptionLoggingStrategy.NEVER;
    }

    /**
     * Switch the logging strategy to {@code EACH_ATTEMPT}.
     * With this strategy {@code maybeLogListenerException} logs at error level for every
     * destination, including sends to retry topics.
     */
    public void alwaysLogListenerException() {
        this.loggingStrategy = ListenerExceptionLoggingStrategy.EACH_ATTEMPT;
    }

    /**
     * Build a {@link DeadLetterPublishingRecoverer} for the given main listener.
     * The recoverer is instantiated through the configured creator and then set up, in order, with:
     * the header names, the retry headers function, the optional extra headers function,
     * fixed behavioural flags, the customizer callback, and the registered exception classifications.
     *
     * @param mainListenerId the id of the main listener; must not be {@code null}
     * @return the configured recoverer
     * @throws IllegalArgumentException if {@code mainListenerId} is {@code null}
     */
    public DeadLetterPublishingRecoverer create(String mainListenerId) {
        Assert.notNull(mainListenerId, "'listenerId' cannot be null");

        Supplier<DeadLetterPublishingRecoverer.HeaderNames> headerNames = () ->
                DeadLetterPublishingRecoverer.HeaderNames.Builder
                        .original()
                            .offsetHeader("kafka_original-offset")
                            .timestampHeader("kafka_original-timestamp")
                            .timestampTypeHeader("kafka_original-timestamp-type")
                            .topicHeader("kafka_original-topic")
                            .partitionHeader("kafka_original-partition")
                            .consumerGroupHeader("kafka_dlt-original-consumer-group")
                        .exception()
                            .keyExceptionFqcn("kafka_key-exception-fqcn")
                            .exceptionFqcn("kafka_exception-fqcn")
                            .exceptionCauseFqcn("kafka_exception-cause-fqcn")
                            .keyExceptionMessage("kafka_key-exception-message")
                            .exceptionMessage("kafka_exception-message")
                            .keyExceptionStacktrace("kafka_key-exception-stacktrace")
                            .exceptionStacktrace("kafka_exception-stacktrace")
                        .build();

        DeadLetterPublishingRecoverer dlpr = this.dlpCreator.create(
                this.templateResolver(mainListenerId),
                this.destinationResolver(mainListenerId));

        dlpr.setHeaderNamesSupplier(headerNames);
        dlpr.setHeadersFunction((failedRecord, cause) ->
                this.addHeaders(mainListenerId, failedRecord, cause, this.getAttempts(failedRecord)));
        if (this.headersFunction != null) {
            dlpr.addHeadersFunction(this.headersFunction);
        }

        dlpr.setFailIfSendResultIsError(true);
        dlpr.setAppendOriginalHeaders(false);
        dlpr.setThrowIfNoDestinationReturned(false);
        dlpr.setSkipSameTopicFatalExceptions(false);
        dlpr.setLogRecoveryRecord(false);

        // The customizer runs before the classifications so that the registered types are applied last.
        this.recovererCustomizer.accept(dlpr);
        for (Class<? extends Exception> fatalType : this.fatalExceptions) {
            dlpr.addNotRetryableExceptions(fatalType);
        }
        for (Class<? extends Exception> nonFatalType : this.nonFatalExceptions) {
            dlpr.removeClassification(nonFatalType);
        }
        return dlpr;
    }

    /**
     * Log a listener failure at a level that depends on the next destination and the logging strategy.
     * <ul>
     * <li>DLT destination, strategy not {@code NEVER}: error, mentioning the DLT name.</li>
     * <li>No-ops destination, strategy not {@code NEVER}: error, stating no further action is taken.</li>
     * <li>Otherwise: error when the strategy is {@code EACH_ATTEMPT}, debug in all remaining cases.</li>
     * </ul>
     * Messages are supplied lazily, so they are only built if the logger asks for them.
     *
     * @param exception the exception thrown by the listener
     * @param consumerRecord the record being processed when the failure occurred
     * @param nextDestination the destination resolved for the record
     */
    protected void maybeLogListenerException(Exception exception, ConsumerRecord<?, ?> consumerRecord, DestinationTopic nextDestination) {
        boolean errorLoggingAllowed = this.loggingStrategy != ListenerExceptionLoggingStrategy.NEVER;

        if (nextDestination.isDltTopic() && errorLoggingAllowed) {
            LOGGER.error(exception, () -> getErrorMessage(consumerRecord) + " and won't be retried. Sending to DLT with name " + nextDestination.getDestinationName() + ".");
            return;
        }
        if (nextDestination.isNoOpsTopic() && errorLoggingAllowed) {
            LOGGER.error(exception, () -> getErrorMessage(consumerRecord) + " and won't be retried. No further action will be taken with this record.");
            return;
        }

        Supplier<CharSequence> retryMessage = () -> getErrorMessage(consumerRecord) + ". Sending to retry topic " + nextDestination.getDestinationName() + ".";
        if (this.loggingStrategy == ListenerExceptionLoggingStrategy.EACH_ATTEMPT) {
            LOGGER.error(exception, retryMessage);
        } else {
            LOGGER.debug(exception, retryMessage);
        }
    }

    /**
     * Default recoverer creator, used as the initial value of {@code dlpCreator}.
     *
     * @param templateResolver function selecting the {@code KafkaOperations} for an outgoing record
     * @param destinationResolver function selecting the target topic/partition for a failed record
     * @return a new {@link DeadLetterPublishingRecoverer} built from the two resolvers
     */
    private DeadLetterPublishingRecoverer create(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        return new DeadLetterPublishingRecoverer(templateResolver, destinationResolver);
    }

    /**
     * Build the function that picks the {@code KafkaOperations} for an outgoing record by
     * looking up the destination topic registered under the record's topic name.
     *
     * @param mainListenerId the id of the main listener the lookup is scoped to
     * @return the template-resolving function
     */
    private Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver(String mainListenerId) {
        return outRecord -> {
            DestinationTopic destination =
                    this.destinationTopicResolver.getDestinationTopicByName(mainListenerId, outRecord.topic());
            return destination.getKafkaOperations();
        };
    }

    /**
     * Build the function that decides where a failed record is published next.
     * Back-off exceptions are rethrown untouched. Otherwise the next destination is resolved
     * from the record's topic, attempt count and original timestamp, the failure is logged,
     * and the target topic/partition is returned, or {@code null} for a no-ops destination.
     *
     * @param mainListenerId the id of the main listener the resolution is scoped to
     * @return the destination-resolving function
     */
    private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver(String mainListenerId) {
        return (failedRecord, failure) -> {
            if (SeekUtils.isBackoffException(failure)) {
                throw (NestedRuntimeException) failure;
            }

            int attempts = this.getAttempts(failedRecord);
            long originalTimestamp = this.getOriginalTimestampHeaderLong(failedRecord);
            DestinationTopic target = this.destinationTopicResolver.resolveDestinationTopic(
                    mainListenerId, failedRecord.topic(), attempts, failure, originalTimestamp);

            LOGGER.debug(() -> "Resolved topic: " + (target.isNoOpsTopic() ? "none" : target.getDestinationName()));
            this.maybeLogListenerException(failure, failedRecord, target);

            if (target.isNoOpsTopic()) {
                return null;
            }
            return this.resolveTopicPartition(failedRecord, target);
        };
    }

    /**
     * Build the common prefix of the listener-failure log messages.
     *
     * @param cr the record that failed
     * @return {@code "Record: <record info> threw an error at topic <topic>"}
     */
    private static String getErrorMessage(ConsumerRecord<?, ?> cr) {
        String recordInfo = getRecordInfo(cr);
        String failedTopic = cr.topic();
        return "Record: " + recordInfo + " threw an error at topic " + failedTopic;
    }

    /**
     * Describe a record for log messages: its topic, partition, offset and main topic.
     * The main topic is read from the last {@code kafka_original-topic} header; when that
     * header is absent, or carries no value, the record's own topic is used instead.
     *
     * @param cr the record to describe
     * @return the formatted description
     */
    private static String getRecordInfo(ConsumerRecord<?, ?> cr) {
        Header originalTopicHeader = cr.headers().lastHeader("kafka_original-topic");
        // A header may exist with a null value; treat it like a missing header rather than
        // failing with a NullPointerException while merely building a log message.
        byte[] originalTopic = originalTopicHeader != null ? originalTopicHeader.value() : null;
        // Decode explicitly as UTF-8 so the result does not depend on the platform default charset.
        // NOTE(review): assumes the header was written as UTF-8 by the publishing side - confirm.
        String mainTopic = originalTopic != null ? new String(originalTopic, StandardCharsets.UTF_8) : cr.topic();
        return String.format("topic = %s, partition = %s, offset = %s, main topic = %s", cr.topic(), cr.partition(), cr.offset(), mainTopic);
    }

    /**
     * Resolve the topic/partition a record is sent to for the given destination.
     * The partition comes from the configured partition resolver; a {@code null}
     * result is mapped to {@code -1}.
     *
     * @param cr the failed record
     * @param nextDestination the destination whose name becomes the target topic
     * @return the target topic and partition
     */
    protected TopicPartition resolveTopicPartition(ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
        String targetTopic = nextDestination.getDestinationName();
        Integer resolved = this.partitionResolver.apply(cr, targetTopic);
        int targetPartition = -1;
        if (resolved != null) {
            targetPartition = resolved;
        }
        return new TopicPartition(targetTopic, targetPartition);
    }

    /**
     * Read the attempt count from the last {@code retry_topic-attempts} header.
     * A one-byte value is returned as that byte, a four-byte value is read as an int.
     * Any other length is logged at debug level and, like a missing header, yields {@code 1}.
     *
     * @param consumerRecord the record to inspect
     * @return the attempt count, or {@code 1} when it cannot be determined
     */
    private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
        Header attemptsHeader = consumerRecord.headers().lastHeader("retry_topic-attempts");
        if (attemptsHeader == null) {
            return 1;
        }
        byte[] raw = attemptsHeader.value();
        switch (raw.length) {
            case 1:
                return raw[0];
            case 4:
                return ByteBuffer.wrap(raw).getInt();
            default:
                LOGGER.debug(() -> "Unexpected size for retry_topic-attempts header: " + raw.length);
                return 1;
        }
    }

    /**
     * Create the retry headers for a record about to be forwarded: the original timestamp,
     * the incremented attempt count (4-byte int) and the back-off timestamp.
     * Depending on {@code retainAllRetryHeaderValues} they are added as plain headers or
     * wrapped in {@code SingleRecordHeader}s.
     *
     * @param mainListenerId the id of the main listener
     * @param consumerRecord the failed record
     * @param e the exception thrown by the listener
     * @param attempts the attempt count of the failed delivery
     * @return a new header collection holding the three retry headers
     */
    private Headers addHeaders(String mainListenerId, ConsumerRecord<?, ?> consumerRecord, Exception e, int attempts) {
        byte[] originalTimestamp = this.getOriginalTimestampHeaderBytes(consumerRecord);
        boolean retainAll = this.retainAllRetryHeaderValues;
        byte[] nextAttempt = ByteBuffer.allocate(Integer.BYTES).putInt(attempts + 1).array();
        long nextExecution = this.getNextExecutionTimestamp(mainListenerId, consumerRecord, e, originalTimestamp);
        byte[] backoffTimestamp = BigInteger.valueOf(nextExecution).toByteArray();

        RecordHeaders retryHeaders = new RecordHeaders();
        if (retainAll) {
            retryHeaders.add("retry_topic-original-timestamp", originalTimestamp);
            retryHeaders.add("retry_topic-attempts", nextAttempt);
            retryHeaders.add("retry_topic-backoff-timestamp", backoffTimestamp);
        } else {
            retryHeaders.add(new DeadLetterPublishingRecoverer.SingleRecordHeader("retry_topic-original-timestamp", originalTimestamp));
            retryHeaders.add(new DeadLetterPublishingRecoverer.SingleRecordHeader("retry_topic-attempts", nextAttempt));
            retryHeaders.add(new DeadLetterPublishingRecoverer.SingleRecordHeader("retry_topic-backoff-timestamp", backoffTimestamp));
        }
        return retryHeaders;
    }

    /**
     * Compute when the record should next be processed: the failure timestamp plus the
     * delay of the destination resolved for this record.
     *
     * @param mainListenerId the id of the main listener
     * @param consumerRecord the failed record
     * @param e the exception thrown by the listener
     * @param originalTimestampHeader the original timestamp as {@code BigInteger} bytes
     * @return the next execution timestamp
     */
    private long getNextExecutionTimestamp(String mainListenerId, ConsumerRecord<?, ?> consumerRecord, Exception e, byte[] originalTimestampHeader) {
        long originalTimestamp = new BigInteger(originalTimestampHeader).longValue();
        long failureTimestamp = this.getFailureTimestamp(e);
        DestinationTopic nextDestination = this.destinationTopicResolver.resolveDestinationTopic(
                mainListenerId, consumerRecord.topic(), this.getAttempts(consumerRecord), e, originalTimestamp);
        long nextExecutionTimestamp = failureTimestamp + nextDestination.getDestinationDelay();
        LOGGER.debug(() -> String.format("FailureTimestamp: %s, Original timestamp: %s, nextExecutionTimestamp: %s", failureTimestamp, originalTimestamp, nextExecutionTimestamp));
        return nextExecutionTimestamp;
    }

    /**
     * Determine when the failure happened.
     * If the exception is a {@code NestedRuntimeException} that contains a
     * {@code TimestampedException}, that exception's timestamp is used; otherwise the current time.
     *
     * @param e the exception thrown by the listener
     * @return the failure time in epoch milliseconds
     */
    private long getFailureTimestamp(Exception e) {
        if (e instanceof NestedRuntimeException && ((NestedRuntimeException) e).contains(TimestampedException.class)) {
            return this.getTimestampedException(e).getTimestamp();
        }
        return Instant.now().toEpochMilli();
    }

    /**
     * Find the first {@link TimestampedException} in the cause chain, starting with the
     * given throwable itself.
     *
     * @param e the throwable whose cause chain is searched; may be {@code null}
     * @return the first {@code TimestampedException} (or subclass instance) found
     * @throws IllegalArgumentException if the chain contains no {@code TimestampedException}
     */
    private TimestampedException getTimestampedException(@Nullable Throwable e) {
        Throwable current = e;
        while (current != null) {
            // instanceof, not current.getClass().isAssignableFrom(TimestampedException.class):
            // the latter matches SUPERclasses of TimestampedException, so a wrapping superclass
            // instance caused a ClassCastException, and subclasses were never matched.
            if (current instanceof TimestampedException) {
                return (TimestampedException) current;
            }
            current = current.getCause();
        }
        throw new IllegalArgumentException("Provided exception does not contain a " + TimestampedException.class.getSimpleName() + " cause.");
    }

    /**
     * Get the original timestamp as bytes: the value of the existing original-timestamp header
     * if present, otherwise the record's own timestamp encoded as {@code BigInteger} bytes.
     *
     * @param consumerRecord the record to inspect
     * @return the original timestamp bytes
     */
    private byte[] getOriginalTimestampHeaderBytes(ConsumerRecord<?, ?> consumerRecord) {
        Header existing = this.getOriginaTimeStampHeader(consumerRecord);
        if (existing != null) {
            return existing.value();
        }
        return BigInteger.valueOf(consumerRecord.timestamp()).toByteArray();
    }

    /**
     * Get the original timestamp as a long: decoded from the existing original-timestamp header
     * ({@code BigInteger} bytes) if present, otherwise the record's own timestamp.
     *
     * @param consumerRecord the record to inspect
     * @return the original timestamp
     */
    private long getOriginalTimestampHeaderLong(ConsumerRecord<?, ?> consumerRecord) {
        Header existing = this.getOriginaTimeStampHeader(consumerRecord);
        if (existing == null) {
            return consumerRecord.timestamp();
        }
        return new BigInteger(existing.value()).longValue();
    }

    /**
     * Look up the last {@code retry_topic-original-timestamp} header of the record.
     *
     * @param consumerRecord the record to inspect
     * @return the header, or {@code null} (callers treat {@code null} as "header absent")
     */
    private @Nullable Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) {
        return consumerRecord.headers().lastHeader("retry_topic-original-timestamp");
    }

    /**
     * Strategies controlling the level at which {@code maybeLogListenerException} logs listener failures.
     */
    private static enum ListenerExceptionLoggingStrategy {
        /** No error-level logging; failures are logged at debug level only. */
        NEVER,
        /** Error-level logging for every destination, including sends to retry topics. */
        EACH_ATTEMPT,
        /** Default: error-level logging only for DLT and no-ops destinations; debug for retry topics. */
        AFTER_RETRIES_EXHAUSTED;

    }

    /**
     * Strategy for instantiating the {@link DeadLetterPublishingRecoverer} used by the factory.
     */
    @FunctionalInterface
    public interface DeadLetterPublisherCreator {

        /**
         * Create a recoverer from the two resolvers supplied by the factory.
         *
         * @param templateResolver function selecting the {@code KafkaOperations} for an outgoing record
         * @param destinationResolver function selecting the target topic/partition for a failed record
         * @return the recoverer to configure and use
         */
        DeadLetterPublishingRecoverer create(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver);
    }
}

