package io.smallrye.reactive.messaging.kafka.fault;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.smallrye.reactive.messaging.kafka.impl.RuntimeKafkaSourceConfiguration;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.class */
public class KafkaDelayedRetryTopic extends ContextHolder implements KafkaFailureHandler {
    public static final String DELAYED_RETRY_TOPIC_STRATEGY = "delayed-retry-topic";
    public static final String DELAYED_RETRY_COUNT = "delayed-retry-count";
    public static final String DELAYED_RETRY_ORIGINAL_TIMESTAMP = "delayed-retry-original-timestamp";
    public static final String DELAYED_RETRY_FIRST_PROCESSING_TIMESTAMP = "delayed-retry-first-processing-timestamp";
    public static final String DELAYED_RETRY_EXCEPTION_CLASS_NAME = "delayed-retry-exception-class-name";
    public static final String DELAYED_RETRY_CAUSE_CLASS_NAME = "delayed-retry-cause-class-name";
    public static final String DELAYED_RETRY_REASON = "delayed-retry-reason";
    public static final String DELAYED_RETRY_CAUSE = "delayed-retry-cause";
    public static final String DELAYED_RETRY_TOPIC = "delayed-retry-topic";
    public static final String DELAYED_RETRY_OFFSET = "delayed-retry-offset";
    public static final String DELAYED_RETRY_PARTITION = "delayed-retry-partition";
    private final String channel;
    private final Vertx vertx;
    private final KafkaConnectorIncomingConfiguration configuration;
    private final String deadQueueTopic;
    private final KafkaProducer producer;
    private final ReactiveKafkaConsumer consumer;
    private final List<String> retryTopics;
    private final int maxRetries;
    private final long retryTimeout;
    private final BiConsumer<Throwable, Boolean> reportFailure;

    @ApplicationScoped
    @Identifier("delayed-retry-topic")
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic$Factory.class */
    public static class Factory implements KafkaFailureHandler.Factory {

        @Inject
        KafkaCDIEvents kafkaCDIEvents;

        @Inject
        Instance<DeserializationFailureHandler<?>> failureHandlers;

        @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler.Factory
        public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer) {
            HashMap hashMap = new HashMap(kafkaConsumer.configuration());
            String str = (String) hashMap.remove("key.deserializer");
            String str2 = (String) hashMap.remove("value.deserializer");
            hashMap.remove("interceptor.classes");
            hashMap.put("key.serializer", kafkaConnectorIncomingConfiguration.getDeadLetterQueueKeySerializer().orElse(KafkaDelayedRetryTopic.getMirrorSerializer(str)));
            hashMap.put("value.serializer", kafkaConnectorIncomingConfiguration.getDeadLetterQueueValueSerializer().orElse(KafkaDelayedRetryTopic.getMirrorSerializer(str2)));
            hashMap.put("client.id", kafkaConnectorIncomingConfiguration.getDeadLetterQueueProducerClientId().orElse("kafka-delayed-retry-topic-producer-" + hashMap.get("client.id")));
            ConfigurationCleaner.cleanupProducerConfiguration(hashMap);
            List list = (List) kafkaConnectorIncomingConfiguration.getDelayedRetryTopicTopics().map(str3 -> {
                return (List) Arrays.stream(str3.split(",")).collect(Collectors.toList());
            }).orElseGet(() -> {
                return (List) Stream.of((Object[]) new String[]{KafkaDelayedRetryTopic.getRetryTopic(kafkaConnectorIncomingConfiguration.getChannel(), 10000), KafkaDelayedRetryTopic.getRetryTopic(kafkaConnectorIncomingConfiguration.getChannel(), 20000), KafkaDelayedRetryTopic.getRetryTopic(kafkaConnectorIncomingConfiguration.getChannel(), 50000)}).collect(Collectors.toList());
            });
            int intValue = kafkaConnectorIncomingConfiguration.getDelayedRetryTopicMaxRetries().orElse(Integer.valueOf(list.size())).intValue();
            long intValue2 = kafkaConnectorIncomingConfiguration.getDelayedRetryTopicTimeout().intValue();
            String orElse = kafkaConnectorIncomingConfiguration.getDeadLetterQueueTopic().orElse(null);
            KafkaLogging.log.delayedRetryTopic(kafkaConnectorIncomingConfiguration.getChannel(), list, intValue, intValue2, orElse);
            KafkaDeadLetterSerializationHandler kafkaDeadLetterSerializationHandler = new KafkaDeadLetterSerializationHandler();
            ReactiveKafkaProducer reactiveKafkaProducer = new ReactiveKafkaProducer(hashMap, (String) list.get(0), 10000, false, null, kafkaDeadLetterSerializationHandler, kafkaDeadLetterSerializationHandler, (producer, map) -> {
                this.kafkaCDIEvents.producer().fire(producer);
            });
            HashMap hashMap2 = new HashMap(kafkaConsumer.configuration());
            hashMap2.put("client.id", "kafka-delayed-retry-topic-" + hashMap2.get("client.id"));
            hashMap2.put("group.id", "kafka-delayed-retry-topic-" + hashMap2.get("group.id"));
            return new KafkaDelayedRetryTopic(kafkaConnectorIncomingConfiguration.getChannel(), vertx, kafkaConnectorIncomingConfiguration, list, intValue, intValue2, orElse, reactiveKafkaProducer, new ReactiveKafkaConsumer(hashMap2, ReactiveKafkaConsumer.createDeserializationFailureHandler(true, this.failureHandlers, kafkaConnectorIncomingConfiguration), ReactiveKafkaConsumer.createDeserializationFailureHandler(false, this.failureHandlers, kafkaConnectorIncomingConfiguration), RuntimeKafkaSourceConfiguration.buildFromConfiguration(kafkaConnectorIncomingConfiguration), true, kafkaConnectorIncomingConfiguration.getPollTimeout().intValue(), kafkaConnectorIncomingConfiguration.getFailOnDeserializationFailure().booleanValue(), consumer -> {
                this.kafkaCDIEvents.consumer().fire(consumer);
            }, biConsumer, vertx.getDelegate().createEventLoopContext()), biConsumer);
        }
    }

    public KafkaDelayedRetryTopic(String str, Vertx vertx, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, List<String> list, int i, long j, String str2, KafkaProducer kafkaProducer, ReactiveKafkaConsumer reactiveKafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer) {
        super(vertx, ((Integer) kafkaConnectorIncomingConfiguration.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(60000)).intValue());
        this.channel = str;
        this.vertx = vertx;
        this.configuration = kafkaConnectorIncomingConfiguration;
        this.retryTopics = list;
        this.maxRetries = i;
        this.retryTimeout = j;
        this.deadQueueTopic = str2;
        this.producer = kafkaProducer;
        this.consumer = reactiveKafkaConsumer;
        this.reportFailure = biConsumer;
    }

    public static String getRetryTopic(String str, int i) {
        return String.format("%s_retry_%d", str, Integer.valueOf(i));
    }

    private static String getMirrorSerializer(String str) {
        return str == null ? StringSerializer.class.getName() : str.replace("Deserializer", "Serializer");
    }

    private String getThrowableMessage(Throwable th) {
        String message = th.getMessage();
        if (message == null) {
            message = th.toString();
        }
        return message;
    }

    public Multi<? extends IncomingKafkaRecord<?, ?>> retryStream() {
        KafkaLatestCommit kafkaLatestCommit = new KafkaLatestCommit(this.vertx, this.configuration, this.consumer);
        this.consumer.setRebalanceListener(null, kafkaLatestCommit);
        Multi subscribe = this.consumer.subscribe(new HashSet(this.retryTopics));
        kafkaLatestCommit.capture(getContext());
        return subscribe.onItem().transform(consumerRecord -> {
            return new IncomingKafkaRecord(consumerRecord, this.channel, -1, kafkaLatestCommit, this, this.configuration.getCloudEvents().booleanValue(), this.configuration.getTracingEnabled().booleanValue());
        }).onItem().transformToUni(incomingKafkaRecord -> {
            incrementRetryHeader(incomingKafkaRecord.getHeaders());
            Duration delay = getDelay(incomingKafkaRecord);
            return delay.isNegative() ? Uni.createFrom().item(incomingKafkaRecord) : Uni.createFrom().item(incomingKafkaRecord).onItem().delayIt().by(delay);
        }).concatenate(false);
    }

    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord, Throwable th, Metadata metadata) {
        OutgoingKafkaRecordMetadata outgoingKafkaRecordMetadata = metadata != null ? (OutgoingKafkaRecordMetadata) metadata.get(OutgoingKafkaRecordMetadata.class).orElse(null) : null;
        setOriginalTimestampHeader(incomingKafkaRecord);
        setFirstProcessingTimestampHeader(incomingKafkaRecord);
        String nextTopic = getNextTopic(this.retryTopics, this.deadQueueTopic, this.maxRetries, setAndGetRetryHeader(incomingKafkaRecord.getHeaders()));
        if (!Objects.equals(nextTopic, this.deadQueueTopic)) {
            if (retryWillTimeout(incomingKafkaRecord, this.retryTimeout, getDelayFromTopic(nextTopic))) {
                KafkaLogging.log.delayedRetryTimeout(this.channel, this.retryTimeout, recordToString(incomingKafkaRecord));
                nextTopic = this.deadQueueTopic;
            }
        }
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getTopic() != null) {
            nextTopic = outgoingKafkaRecordMetadata.getTopic();
        }
        Object key = incomingKafkaRecord.getKey();
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getKey() != null) {
            key = outgoingKafkaRecordMetadata.getKey();
        }
        int partition = incomingKafkaRecord.getPartition();
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getPartition() >= 0) {
            partition = outgoingKafkaRecordMetadata.getPartition();
        }
        if (nextTopic == null) {
            KafkaLogging.log.delayedRetryNoDlq(this.channel);
            Uni completionStage = Uni.createFrom().completionStage(incomingKafkaRecord.ack());
            Objects.requireNonNull(incomingKafkaRecord);
            return completionStage.emitOn(incomingKafkaRecord::runOnMessageContext);
        }
        ProducerRecord<?, ?> producerRecord = new ProducerRecord<>(nextTopic, Integer.valueOf(partition), key, incomingKafkaRecord.getPayload());
        addHeader(producerRecord, DELAYED_RETRY_EXCEPTION_CLASS_NAME, th.getClass().getName());
        addHeader(producerRecord, DELAYED_RETRY_REASON, getThrowableMessage(th));
        if (th.getCause() != null) {
            addHeader(producerRecord, DELAYED_RETRY_CAUSE_CLASS_NAME, th.getCause().getClass().getName());
            addHeader(producerRecord, DELAYED_RETRY_CAUSE, getThrowableMessage(th.getCause()));
        }
        addHeader(producerRecord, "delayed-retry-topic", incomingKafkaRecord.getTopic());
        addHeader(producerRecord, DELAYED_RETRY_PARTITION, Integer.toString(incomingKafkaRecord.getPartition()));
        addHeader(producerRecord, DELAYED_RETRY_OFFSET, Long.toString(incomingKafkaRecord.getOffset()));
        incomingKafkaRecord.getHeaders().forEach(header -> {
            producerRecord.headers().add(header);
        });
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getHeaders() != null) {
            outgoingKafkaRecordMetadata.getHeaders().forEach(header2 -> {
                producerRecord.headers().add(header2);
            });
        }
        producerRecord.headers().remove(DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ);
        KafkaLogging.log.delayedRetryNack(this.channel, nextTopic);
        Uni chain = this.producer.send(producerRecord).onFailure().invoke(obj -> {
            this.reportFailure.accept((Throwable) obj, true);
        }).onItem().ignore().andContinueWithNull().chain(() -> {
            return Uni.createFrom().completionStage(incomingKafkaRecord.ack());
        });
        Objects.requireNonNull(incomingKafkaRecord);
        return chain.emitOn(incomingKafkaRecord::runOnMessageContext);
    }

    private boolean retryWillTimeout(IncomingKafkaRecord<?, ?> incomingKafkaRecord, long j, int i) {
        return Duration.between(getFirstProcessingTimestamp(incomingKafkaRecord), Instant.now().plus((long) i, (TemporalUnit) ChronoUnit.MILLIS)).toMillis() > j;
    }

    void addHeader(ProducerRecord<?, ?> producerRecord, String str, String str2) {
        producerRecord.headers().add(str, str2.getBytes(StandardCharsets.UTF_8));
    }

    private static Instant getTimestampHeader(Headers headers, String str, long j) {
        Header lastHeader = headers.lastHeader(str);
        return Instant.ofEpochMilli(lastHeader == null ? j : Long.parseLong(new String(lastHeader.value())));
    }

    private static void setTimestampHeader(Headers headers, String str, long j) {
        if (headers.lastHeader(str) == null) {
            headers.add(str, Long.toString(j).getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    public void terminate() {
        this.producer.close();
        this.consumer.close();
    }

    private static Duration getDelay(IncomingKafkaRecord<?, ?> incomingKafkaRecord) {
        return Duration.between(Instant.now(), incomingKafkaRecord.getTimestamp().plus(getDelayFromTopic(incomingKafkaRecord.getTopic()), (TemporalUnit) ChronoUnit.MILLIS));
    }

    public static String getNextTopic(List<String> list, String str, int i, int i2) {
        return i2 < (i <= 0 ? list.size() : i) ? list.get(Math.min(i2, list.size() - 1)) : str;
    }

    public static int getRetryHeader(Headers headers) {
        Header lastHeader = headers.lastHeader(DELAYED_RETRY_COUNT);
        if (lastHeader == null) {
            return 0;
        }
        return Integer.parseInt(new String(lastHeader.value()));
    }

    private static void incrementRetryHeader(Headers headers) {
        headers.add(DELAYED_RETRY_COUNT, Integer.toString(getRetryHeader(headers) + 1).getBytes(StandardCharsets.UTF_8));
    }

    private static int setAndGetRetryHeader(Headers headers) {
        int retryHeader = getRetryHeader(headers);
        if (retryHeader == 0) {
            headers.add(DELAYED_RETRY_COUNT, Integer.toString(retryHeader).getBytes(StandardCharsets.UTF_8));
        }
        return retryHeader;
    }

    private static void setOriginalTimestampHeader(IncomingKafkaRecord<?, ?> incomingKafkaRecord) {
        setTimestampHeader(incomingKafkaRecord.getHeaders(), DELAYED_RETRY_ORIGINAL_TIMESTAMP, incomingKafkaRecord.getTimestamp().toEpochMilli());
    }

    private static Instant getFirstProcessingTimestamp(IncomingKafkaRecord<?, ?> incomingKafkaRecord) {
        return getTimestampHeader(incomingKafkaRecord.getHeaders(), DELAYED_RETRY_FIRST_PROCESSING_TIMESTAMP, Instant.now().toEpochMilli());
    }

    private static void setFirstProcessingTimestampHeader(IncomingKafkaRecord<?, ?> incomingKafkaRecord) {
        setTimestampHeader(incomingKafkaRecord.getHeaders(), DELAYED_RETRY_FIRST_PROCESSING_TIMESTAMP, Instant.now().toEpochMilli());
    }

    private static int getDelayFromTopic(String str) {
        return Integer.parseInt(str.substring(str.lastIndexOf("_") + 1));
    }

    private static String recordToString(IncomingKafkaRecord<?, ?> incomingKafkaRecord) {
        return String.format("%s-%d:%d", incomingKafkaRecord.getTopic(), Integer.valueOf(incomingKafkaRecord.getPartition()), Long.valueOf(incomingKafkaRecord.getOffset()));
    }
}
