package org.springframework.integration.kafka.outbound;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.core.AttributeAccessor;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.kafka.support.KafkaIntegrationHeaders;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.class */
public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMessageHandler implements ManageableLifecycle {
    /** Default number of milliseconds added to the producer's {@code delivery.timeout.ms} to derive the send timeout. */
    private static final int DEFAULT_TIMEOUT_BUFFER = 5000;
    private static final int TWENTY = 20;
    /** Default time to wait for the reply container to be assigned its partitions. */
    private static final Duration DEFAULT_ASSIGNMENT_TIMEOUT = Duration.ofSeconds(TWENTY);
    private final KafkaTemplate<K, V> kafkaTemplate;
    /** {@code true} when the template is a {@link ReplyingKafkaTemplate} (request/reply mode). */
    private final boolean isGateway;
    private final boolean transactional;
    private final boolean allowNonTransactional;
    /** Send timeout determined at construction time minus the timeout buffer in effect at that moment. */
    private final long deliveryTimeoutMsProperty;
    private EvaluationContext evaluationContext;
    private Expression topicExpression;
    private Expression messageKeyExpression;
    private Expression partitionIdExpression;
    private Expression timestampExpression;
    private boolean sync;
    private Expression sendTimeoutExpression;
    private KafkaHeaderMapper headerMapper;
    private MessageChannel sendFailureChannel;
    private String sendFailureChannelName;
    private MessageChannel sendSuccessChannel;
    private String sendSuccessChannelName;
    private MessageChannel futuresChannel;
    private String futuresChannelName;
    private boolean useTemplateConverter;
    /** Cached UTF-8 bytes of the only reply topic; populated lazily by {@code getSingleReplyTopic()}. */
    private volatile byte[] singleReplyTopic;
    /** Reply topics mapped to their assigned partitions; filled on the first gateway request. */
    private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<>();
    private final AtomicBoolean running = new AtomicBoolean();
    // The explicit type argument is required: with a raw FunctionExpression the lambda
    // parameter would be typed as Object and getHeaders() would not resolve.
    private Expression flushExpression = new FunctionExpression<Message<?>>(message ->
            Boolean.TRUE.equals(message.getHeaders().get(KafkaIntegrationHeaders.FLUSH)));
    private RecordMessageConverter replyMessageConverter = new MessagingMessageConverter();
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
    private Type replyPayloadType = Object.class;
    private ProducerRecordCreator<K, V> producerRecordCreator =
            (message, topic, partition, timestamp, key, value, headers) ->
                    new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
    private int timeoutBuffer = DEFAULT_TIMEOUT_BUFFER;
    private Duration assignmentDuration = DEFAULT_ASSIGNMENT_TIMEOUT;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler$ConvertingReplyFuture.class */
    /**
     * Future handed back to the framework for gateway requests. It completes with the reply
     * record converted to a {@link Message} (request/reply correlation headers removed), or
     * exceptionally when the underlying request/reply future fails or the conversion throws.
     */
    public final class ConvertingReplyFuture extends CompletableFuture<Object> {
        ConvertingReplyFuture(RequestReplyFuture<?, ?, Object> requestReplyFuture) {
            addCallback(requestReplyFuture);
        }

        /**
         * Registers the completion callback that propagates the outcome of
         * {@code requestReplyFuture} to this future.
         */
        private void addCallback(RequestReplyFuture<?, ?, Object> requestReplyFuture) {
            requestReplyFuture.whenComplete((consumerRecord, failure) -> {
                if (failure != null) {
                    completeExceptionally(failure);
                    return;
                }
                try {
                    Message<?> reply = KafkaProducerMessageHandler.this.replyMessageConverter.toMessage(
                            consumerRecord, null, null, KafkaProducerMessageHandler.this.replyPayloadType);
                    complete(dontLeakHeaders(reply));
                } catch (Exception e) {
                    // A conversion failure must fail the future, otherwise it would never complete.
                    completeExceptionally(e);
                }
            });
        }

        /**
         * Removes the {@code kafka_correlationId}, {@code kafka_replyTopic} and
         * {@code kafka_replyPartition} headers from the reply.
         *
         * @param message the converted reply
         * @return the same message with its raw headers mutated when the headers are
         *         {@link KafkaMessageHeaders}; otherwise a rebuilt copy without those headers
         */
        private Message<?> dontLeakHeaders(Message<?> message) {
            MessageHeaders headers = message.getHeaders();
            if (headers instanceof KafkaMessageHeaders) {
                // getRawHeaders() is only accessible through KafkaMessageHeaders; on plain
                // MessageHeaders it is protected, hence the cast.
                Map<String, Object> rawHeaders = ((KafkaMessageHeaders) headers).getRawHeaders();
                rawHeaders.remove("kafka_correlationId");
                rawHeaders.remove("kafka_replyTopic");
                rawHeaders.remove("kafka_replyPartition");
                return message;
            }
            return KafkaProducerMessageHandler.this.getMessageBuilderFactory()
                    .fromMessage(message)
                    .removeHeader("kafka_correlationId")
                    .removeHeader("kafka_replyTopic")
                    .removeHeader("kafka_replyPartition")
                    .build();
        }
    }

    /**
     * Strategy used by the handler to build the {@link ProducerRecord} that is sent for a
     * request message. The handler's default implementation passes {@code str}, {@code num},
     * {@code l}, {@code k}, {@code v} and {@code headers}, in that order, to the
     * {@link ProducerRecord} constructor and ignores {@code message}.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler$ProducerRecordCreator.class */
    public interface ProducerRecordCreator<K, V> {
        /**
         * Creates the record to send.
         *
         * @param message the request message being handled
         * @param str the resolved topic
         * @param num the resolved partition id, may be {@code null}
         * @param l the resolved timestamp, may be {@code null}
         * @param k the resolved message key, may be {@code null}
         * @param v the payload; {@code null} when the request payload was a {@code KafkaNull}
         * @param headers the mapped headers; {@code null} when the handler has no header mapper
         * @return the record to send
         */
        ProducerRecord<K, V> create(Message<?> message, String str, Integer num, Long l, K k, V v, Headers headers);
    }

    public KafkaProducerMessageHandler(KafkaTemplate<K, V> kafkaTemplate) {
        Assert.notNull(kafkaTemplate, "kafkaTemplate cannot be null");
        this.kafkaTemplate = kafkaTemplate;
        this.isGateway = kafkaTemplate instanceof ReplyingKafkaTemplate;
        if (this.isGateway) {
            setAsync(true);
            updateNotPropagatedHeaders(new String[]{"kafka_topic", "kafka_partitionId", "kafka_messageKey"}, false);
        }
        if (JacksonPresent.isJackson2Present()) {
            this.headerMapper = new DefaultKafkaHeaderMapper();
        } else {
            this.headerMapper = new SimpleKafkaHeaderMapper();
        }
        this.transactional = kafkaTemplate.isTransactional();
        this.allowNonTransactional = kafkaTemplate.isAllowNonTransactional();
        if (this.transactional && this.isGateway) {
            this.logger.warn("The KafkaTemplate is transactional; this gateway will only work if the consumer is configured to read uncommitted records");
        }
        determineSendTimeout();
        this.deliveryTimeoutMsProperty = ((Long) this.sendTimeoutExpression.getValue(Long.class)).longValue() - this.timeoutBuffer;
    }

    /**
     * Initializes the send timeout to the producer's {@code delivery.timeout.ms} plus the
     * timeout buffer. The property is read from the producer factory configuration, falling
     * back to the Kafka default when it is not configured. Any {@link Number} or numeric
     * {@link String} (surrounding whitespace ignored) is accepted.
     *
     * @throws IllegalStateException if the property value is of an unsupported type
     * @throws NumberFormatException if a string value is not a valid long
     */
    private void determineSendTimeout() {
        Object configured = this.kafkaTemplate.getProducerFactory().getConfigurationProperties()
                .get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
        if (configured == null) {
            configured = ProducerConfig.configDef().defaultValues().get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
        }
        long deliveryTimeout;
        if (configured instanceof Number) {
            deliveryTimeout = ((Number) configured).longValue();
        } else if (configured instanceof String) {
            deliveryTimeout = Long.parseLong(((String) configured).trim());
        } else {
            // Fail with a clear message instead of leaving sendTimeoutExpression unset,
            // which would surface later as a NullPointerException in the constructor.
            throw new IllegalStateException("Cannot determine the send timeout from '"
                    + ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG + "' value: " + configured);
        }
        setSendTimeout(deliveryTimeout + this.timeoutBuffer);
    }

    /**
     * Sets the expression evaluated against the request message to determine the topic.
     * Without an expression the {@code kafka_topic} header is used; if the result is
     * {@code null} the template's default topic is used.
     *
     * @param expression the topic expression, may be {@code null}
     */
    public void setTopicExpression(Expression expression) {
        this.topicExpression = expression;
    }

    /**
     * Sets the expression evaluated against the request message to determine the record key.
     * Without an expression the {@code kafka_messageKey} header is used.
     *
     * @param expression the key expression, may be {@code null}
     */
    public void setMessageKeyExpression(Expression expression) {
        this.messageKeyExpression = expression;
    }

    /**
     * Sets the expression evaluated (as an {@code Integer}) against the request message to
     * determine the partition. Without an expression the {@code kafka_partitionId} header is used.
     *
     * @param expression the partition expression, may be {@code null}
     */
    public void setPartitionIdExpression(Expression expression) {
        this.partitionIdExpression = expression;
    }

    /**
     * Sets the expression evaluated (as a {@code Long}) against the request message to
     * determine the record timestamp. Without an expression the {@code kafka_timestamp}
     * header is used.
     *
     * @param expression the timestamp expression, may be {@code null}
     */
    public void setTimestampExpression(Expression expression) {
        this.timestampExpression = expression;
    }

    /**
     * Sets the expression evaluated (as a {@code Boolean}) against the request message to
     * decide whether the template is flushed after the send.
     *
     * @param expression the flush expression; must not be {@code null}
     */
    public void setFlushExpression(Expression expression) {
        Assert.notNull(expression, "'flushExpression' cannot be null");
        this.flushExpression = expression;
    }

    /**
     * Sets the mapper used to copy message headers into record headers. When {@code null},
     * no headers are mapped and the record creator receives {@code null} headers.
     *
     * @param kafkaHeaderMapper the header mapper, may be {@code null}
     */
    public void setHeaderMapper(KafkaHeaderMapper kafkaHeaderMapper) {
        this.headerMapper = kafkaHeaderMapper;
    }

    /**
     * Returns the header mapper currently in use.
     *
     * @return the header mapper, possibly {@code null}
     */
    public KafkaHeaderMapper getHeaderMapper() {
        return this.headerMapper;
    }

    /**
     * Returns the template supplied to the constructor.
     *
     * @return the Kafka template
     */
    public KafkaTemplate<?, ?> getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    /**
     * Sets whether the handler blocks on the send future until the send completes or the
     * send timeout elapses. A gateway always waits regardless of this flag.
     *
     * @param z {@code true} to wait for the send result
     */
    public void setSync(boolean z) {
        this.sync = z;
    }

    /**
     * Sets a fixed send timeout: passes the value to the superclass and replaces the send
     * timeout expression with a constant expression holding the same value.
     *
     * @param j the send timeout in milliseconds
     */
    public final void setSendTimeout(long j) {
        super.setSendTimeout(j);
        setSendTimeoutExpression(new ValueExpression(Long.valueOf(j)));
    }

    /**
     * Sets the expression evaluated (as a {@code Long}, in milliseconds) against the request
     * message to determine how long to wait for the send result when waiting is enabled.
     *
     * @param expression the send timeout expression; must not be {@code null}
     */
    public void setSendTimeoutExpression(Expression expression) {
        Assert.notNull(expression, "'sendTimeoutExpression' must not be null");
        this.sendTimeoutExpression = expression;
    }

    /**
     * Sets the channel that receives an error message when a send fails.
     *
     * @param messageChannel the failure channel, may be {@code null}
     */
    public void setSendFailureChannel(MessageChannel messageChannel) {
        this.sendFailureChannel = messageChannel;
    }

    /**
     * Sets the name of the send failure channel. The name is resolved through the channel
     * resolver on first use, and only when no failure channel instance has been set.
     *
     * @param str the failure channel name, may be {@code null}
     */
    public void setSendFailureChannelName(String str) {
        this.sendFailureChannelName = str;
    }

    /**
     * Sets the channel that receives a copy of the request message, enriched with the
     * {@code kafka_recordMetadata} header, after a successful send.
     *
     * @param messageChannel the success channel, may be {@code null}
     */
    public void setSendSuccessChannel(MessageChannel messageChannel) {
        this.sendSuccessChannel = messageChannel;
    }

    /**
     * Sets the name of the send success channel. The name is resolved through the channel
     * resolver on first use, and only when no success channel instance has been set.
     *
     * @param str the success channel name, may be {@code null}
     */
    public void setSendSuccessChannelName(String str) {
        this.sendSuccessChannelName = str;
    }

    /**
     * Sets the channel to which the send future is published for requests that carry the
     * {@link KafkaIntegrationHeaders#FUTURE_TOKEN} header.
     *
     * @param messageChannel the futures channel, may be {@code null}
     */
    public void setFuturesChannel(MessageChannel messageChannel) {
        this.futuresChannel = messageChannel;
    }

    /**
     * Sets the name of the futures channel. The name is resolved through the channel
     * resolver on first use, and only when no futures channel instance has been set.
     *
     * @param str the futures channel name, may be {@code null}
     */
    public void setFuturesChannelName(String str) {
        this.futuresChannelName = str;
    }

    /**
     * Sets the strategy used to build the error message published to the send failure channel.
     *
     * @param errorMessageStrategy the strategy; must not be {@code null}
     */
    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * Sets the converter used to turn a gateway reply record into a message.
     *
     * @param recordMessageConverter the converter; must not be {@code null}
     */
    public void setReplyMessageConverter(RecordMessageConverter recordMessageConverter) {
        Assert.notNull(recordMessageConverter, "'messageConverter' cannot be null");
        this.replyMessageConverter = recordMessageConverter;
    }

    /**
     * Sets the payload type passed to the reply message converter when converting a gateway reply.
     *
     * @param type the reply payload type; must not be {@code null}
     */
    public void setReplyPayloadType(Type type) {
        Assert.notNull(type, "'payloadType' cannot be null");
        this.replyPayloadType = type;
    }

    /**
     * Sets the factory used to build the producer record when the template's own message
     * converter is not used.
     *
     * @param producerRecordCreator the record creator; must not be {@code null}
     */
    public void setProducerRecordCreator(ProducerRecordCreator<K, V> producerRecordCreator) {
        Assert.notNull(producerRecordCreator, "'producerRecordCreator' cannot be null");
        this.producerRecordCreator = producerRecordCreator;
    }

    /**
     * Sets the buffer, in milliseconds, that is added to the delivery timeout when a
     * per-message send timeout has to be raised. This setter only stores the value; the
     * send timeout computed in the constructor is not recalculated.
     *
     * @param i the buffer in milliseconds
     */
    public void setTimeoutBuffer(int i) {
        this.timeoutBuffer = i;
    }

    /**
     * Sets whether the producer record is built by the template's message converter from the
     * request message and resolved topic, instead of by this handler's expressions, header
     * mapper and record creator.
     *
     * @param z {@code true} to delegate record creation to the template's converter
     */
    public void setUseTemplateConverter(boolean z) {
        this.useTemplateConverter = z;
    }

    /**
     * Sets how long to wait for the reply container's partition assignment before the valid
     * reply topics and partitions are read from the template.
     *
     * @param duration the maximum wait; must not be {@code null}
     */
    public void setAssignmentDuration(Duration duration) {
        Assert.notNull(duration, "'assignmentDuration' cannot be null");
        this.assignmentDuration = duration;
    }

    /**
     * Returns the component type name, which depends on whether this handler operates as a
     * gateway or as a one-way channel adapter.
     *
     * @return {@code "kafka:outbound-gateway"} or {@code "kafka:outbound-channel-adapter"}
     */
    public String getComponentType() {
        if (this.isGateway) {
            return "kafka:outbound-gateway";
        }
        return "kafka:outbound-channel-adapter";
    }

    /**
     * Returns the send failure channel, resolving it by name on first use when only a name
     * was configured. The resolved channel is cached.
     *
     * @return the failure channel, or {@code null} when neither a channel nor a name is set
     */
    protected MessageChannel getSendFailureChannel() {
        if (this.sendFailureChannel == null && this.sendFailureChannelName != null) {
            this.sendFailureChannel = (MessageChannel) getChannelResolver().resolveDestination(this.sendFailureChannelName);
        }
        return this.sendFailureChannel;
    }

    /**
     * Returns the send success channel, resolving it by name on first use when only a name
     * was configured. The resolved channel is cached.
     *
     * @return the success channel, or {@code null} when neither a channel nor a name is set
     */
    protected MessageChannel getSendSuccessChannel() {
        if (this.sendSuccessChannel == null && this.sendSuccessChannelName != null) {
            this.sendSuccessChannel = (MessageChannel) getChannelResolver().resolveDestination(this.sendSuccessChannelName);
        }
        return this.sendSuccessChannel;
    }

    /**
     * Returns the futures channel, resolving it by name on first use when only a name was
     * configured. The resolved channel is cached.
     *
     * @return the futures channel, or {@code null} when neither a channel nor a name is set
     */
    protected MessageChannel getFuturesChannel() {
        if (this.futuresChannel == null && this.futuresChannelName != null) {
            this.futuresChannel = (MessageChannel) getChannelResolver().resolveDestination(this.futuresChannelName);
        }
        return this.futuresChannel;
    }

    /**
     * Creates the evaluation context, based on this component's bean factory, that is used
     * for all expression evaluations in this handler.
     */
    protected void doInit() {
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
    }

    /** Marks this handler as running. */
    public void start() {
        this.running.set(true);
    }

    /**
     * Marks this handler as stopped. On the transition from running to stopped the template
     * is flushed, unless it is transactional and does not allow non-transactional operations.
     */
    public void stop() {
        boolean wasRunning = this.running.compareAndSet(true, false);
        if (wasRunning && (!this.transactional || this.allowNonTransactional)) {
            this.kafkaTemplate.flush();
        }
    }

    /**
     * Reports whether this handler has been started and not yet stopped.
     *
     * @return {@code true} while running
     */
    public boolean isRunning() {
        return this.running.get();
    }

    /**
     * Sends the request message to Kafka.
     * <p>
     * A {@link ProducerRecord} payload is sent as-is; otherwise a record is built from the
     * message. The flush and future-token control headers are removed from the outgoing
     * record. In gateway mode (unless a pre-built record already carries a
     * {@code kafka_replyTopic} header) the reply topic header is added and the record is sent
     * with {@code sendAndReceive}. Otherwise the record is sent directly, or inside a local
     * transaction when the template is transactional, no transaction is active and
     * non-transactional sends are not allowed.
     *
     * @param message the request message
     * @return a future completing with the converted reply for a request/reply send,
     *         otherwise {@code null}
     * @throws MessageHandlingException if waiting for the send result is interrupted or the
     *         send fails
     */
    @SuppressWarnings("unchecked")
    protected Object handleRequestMessage(Message<?> message) {
        boolean flush = Boolean.TRUE.equals(this.flushExpression.getValue(this.evaluationContext, message, Boolean.class));
        boolean preBuilt = message.getPayload() instanceof ProducerRecord;
        ProducerRecord<K, V> producerRecord;
        if (preBuilt) {
            producerRecord = (ProducerRecord<K, V>) message.getPayload();
        } else {
            producerRecord = createProducerRecord(message);
            if (flush) {
                // The flush flag is a local control header; it must not travel with the record.
                producerRecord.headers().remove(KafkaIntegrationHeaders.FLUSH);
            }
        }
        Object futureToken = message.getHeaders().get(KafkaIntegrationHeaders.FUTURE_TOKEN);
        if (futureToken != null) {
            producerRecord.headers().remove(KafkaIntegrationHeaders.FUTURE_TOKEN);
        }
        RequestReplyFuture<K, V, Object> gatewayFuture = null;
        CompletableFuture<SendResult<K, V>> sendFuture;
        if (this.isGateway && (!preBuilt || producerRecord.headers().lastHeader("kafka_replyTopic") == null)) {
            producerRecord.headers().add(new RecordHeader("kafka_replyTopic", getReplyTopic(message)));
            // isGateway guarantees the template is a ReplyingKafkaTemplate (see constructor).
            gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate).sendAndReceive(producerRecord);
            sendFuture = gatewayFuture.getSendFuture();
        } else if (!this.transactional || this.kafkaTemplate.inTransaction() || this.allowNonTransactional) {
            sendFuture = this.kafkaTemplate.send(producerRecord);
        } else {
            sendFuture = this.kafkaTemplate.executeInTransaction(operations -> operations.send(producerRecord));
        }
        sendFutureIfRequested(sendFuture, futureToken);
        if (flush) {
            this.kafkaTemplate.flush();
        }
        try {
            processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel());
            return processReplyFuture(gatewayFuture);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MessageHandlingException(message, e);
        } catch (ExecutionException e) {
            throw new MessageHandlingException(message, e.getCause());
        }
    }

    /**
     * Publishes the send future to the futures channel when the request carried a future
     * token and a futures channel is configured. A failure to publish is logged and not
     * propagated.
     *
     * @param completableFuture the future returned by the send
     * @param obj the future token from the request headers, may be {@code null}
     */
    private void sendFutureIfRequested(CompletableFuture<SendResult<K, V>> completableFuture, Object obj) {
        if (obj == null) {
            return;
        }
        MessageChannel target = getFuturesChannel();
        if (target == null) {
            return;
        }
        try {
            Message<?> futureMessage = getMessageBuilderFactory()
                    .withPayload(completableFuture)
                    .setHeader(KafkaIntegrationHeaders.FUTURE_TOKEN, obj)
                    .build();
            target.send(futureMessage);
        } catch (Exception e) {
            this.logger.error(e, "Failed to send sendFuture");
        }
    }

    /**
     * Builds the producer record for a request message.
     * <p>
     * The topic comes from the topic expression, or the {@code kafka_topic} header when no
     * expression is set, falling back to the template's default topic. When the template
     * converter is enabled, it creates the record from the message and topic. Otherwise the
     * partition, key and timestamp are resolved from their expressions or headers, a
     * {@link KafkaNull} payload becomes {@code null}, the headers are mapped when a header
     * mapper is present, and the record creator builds the record.
     *
     * @param message the request message
     * @return the record to send
     * @throws IllegalStateException if no topic can be determined and the template converter
     *         is not used
     */
    @SuppressWarnings("unchecked")
    private ProducerRecord<K, V> createProducerRecord(Message<?> message) {
        MessageHeaders messageHeaders = message.getHeaders();
        String topic = this.topicExpression != null
                ? this.topicExpression.getValue(this.evaluationContext, message, String.class)
                : messageHeaders.get("kafka_topic", String.class);
        if (topic == null) {
            topic = this.kafkaTemplate.getDefaultTopic();
        }
        if (this.useTemplateConverter) {
            // The converter returns ProducerRecord<?, ?>; the template is typed <K, V>.
            return (ProducerRecord<K, V>) this.kafkaTemplate.getMessageConverter().fromMessage(message, topic);
        }
        Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null");
        Integer partitionId = this.partitionIdExpression != null
                ? this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class)
                : messageHeaders.get("kafka_partitionId", Integer.class);
        K messageKey = (K) (this.messageKeyExpression != null
                ? this.messageKeyExpression.getValue(this.evaluationContext, message)
                : messageHeaders.get("kafka_messageKey"));
        Long timestamp = this.timestampExpression != null
                ? this.timestampExpression.getValue(this.evaluationContext, message, Long.class)
                : messageHeaders.get("kafka_timestamp", Long.class);
        Object rawPayload = message.getPayload();
        // KafkaNull is the placeholder for a null record value.
        V payload = rawPayload instanceof KafkaNull ? null : (V) rawPayload;
        Headers recordHeaders = null;
        if (this.headerMapper != null) {
            recordHeaders = new RecordHeaders();
            this.headerMapper.fromHeaders(messageHeaders, recordHeaders);
        }
        return this.producerRecordCreator.create(message, topic, partitionId, timestamp, messageKey, payload, recordHeaders);
    }

    /**
     * Determines the reply topic, as UTF-8 bytes, for a gateway request and validates it
     * against the reply container's assignments.
     * <p>
     * A {@code kafka_replyTopic} header ({@code String} or {@code byte[]}) must name one of
     * the assigned reply topics. Without the header there must be exactly one assigned reply
     * topic, which is then used. A {@code kafka_replyPartition} header, if present, must be
     * one of the partitions assigned for the chosen topic.
     *
     * @param message the request message
     * @return the reply topic bytes
     * @throws IllegalStateException if the header has an unsupported type, names an unknown
     *         topic or partition, or no default reply topic can be determined
     */
    private byte[] getReplyTopic(Message<?> message) {
        if (this.replyTopicsAndPartitions.isEmpty()) {
            determineValidReplyTopicsAndPartitions();
        }
        MessageHeaders requestHeaders = message.getHeaders();
        Object topicHeader = requestHeaders.get("kafka_replyTopic");
        byte[] topicBytes;
        String topicName;
        if (topicHeader == null) {
            if (this.replyTopicsAndPartitions.size() != 1) {
                throw new IllegalStateException("No reply topic header and no default reply topic can be determined; container's assigned partitions: " + this.replyTopicsAndPartitions);
            }
            topicBytes = getSingleReplyTopic();
            // Decoded lazily below, only if a partition header has to be validated.
            topicName = null;
        } else {
            if (topicHeader instanceof String) {
                topicName = (String) topicHeader;
                topicBytes = topicName.getBytes(StandardCharsets.UTF_8);
            } else if (topicHeader instanceof byte[]) {
                topicBytes = (byte[]) topicHeader;
                topicName = new String(topicBytes, StandardCharsets.UTF_8);
            } else {
                throw new IllegalStateException("kafka_replyTopic must be String or byte[]");
            }
            if (!this.replyTopicsAndPartitions.containsKey(topicName)) {
                throw new IllegalStateException("The reply topic header [" + topicName + "] does not match any reply container topic: " + this.replyTopicsAndPartitions.keySet());
            }
        }
        Integer replyPartition = requestHeaders.get("kafka_replyPartition", Integer.class);
        if (replyPartition != null) {
            if (topicName == null) {
                topicName = new String(topicBytes, StandardCharsets.UTF_8);
            }
            Set<Integer> assignedPartitions = this.replyTopicsAndPartitions.get(topicName);
            if (!assignedPartitions.contains(replyPartition)) {
                throw new IllegalStateException("The reply partition header [" + replyPartition + "] does not match any reply container partition for topic [" + topicName + "]: " + assignedPartitions);
            }
        }
        return topicBytes;
    }

    /**
     * Returns the UTF-8 bytes of the first key in the reply topic map, computing and caching
     * them on first use.
     *
     * @return the cached reply topic bytes
     */
    private byte[] getSingleReplyTopic() {
        byte[] cached = this.singleReplyTopic;
        if (cached == null) {
            String onlyTopic = this.replyTopicsAndPartitions.keySet().iterator().next();
            cached = onlyTopic.getBytes(StandardCharsets.UTF_8);
            this.singleReplyTopic = cached;
        }
        return cached;
    }

    /**
     * Waits up to the configured assignment duration for the reply container's partition
     * assignment, then records the assigned reply topics and their partitions. If the wait
     * is interrupted, the interrupt flag is restored and whatever assignment is currently
     * reported is still recorded. Nothing is recorded when the template reports no
     * assignment ({@code null}).
     */
    @SuppressWarnings("unchecked")
    private void determineValidReplyTopicsAndPartitions() {
        // Only reached in gateway mode, where the template is a ReplyingKafkaTemplate.
        ReplyingKafkaTemplate<K, V, Object> replyingTemplate = (ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate;
        try {
            replyingTemplate.waitForAssignment(this.assignmentDuration);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Collection<TopicPartition> assignments = replyingTemplate.getAssignedReplyTopicPartitions();
        if (assignments != null) {
            // Collect into a local map first so the shared map is updated in one putAll call.
            Map<String, Set<Integer>> partitionsByTopic = new HashMap<>();
            for (TopicPartition assignment : assignments) {
                partitionsByTopic.computeIfAbsent(assignment.topic(), topic -> new TreeSet<>())
                        .add(assignment.partition());
            }
            this.replyTopicsAndPartitions.putAll(partitionsByTopic);
        }
    }

    /**
     * Wires the send future to the success and failure channels and, when this handler is
     * synchronous or a gateway, waits for the send to complete.
     * <p>
     * On success, a copy of the request message with the {@code kafka_recordMetadata} header
     * is sent to {@code messageChannel}, if one is given. On failure, an error message
     * wrapping a {@link KafkaSendFailureException} is sent to the send failure channel, if
     * one is configured. When waiting, a per-message send timeout that does not exceed the
     * delivery timeout is raised to the delivery timeout plus the timeout buffer. A
     * {@code null} or negative timeout waits indefinitely.
     *
     * @param message the request message
     * @param producerRecord the record that was sent
     * @param completableFuture the future returned by the send
     * @param messageChannel the channel for successful sends, may be {@code null}
     * @throws InterruptedException if the wait for the send result is interrupted
     * @throws ExecutionException if the send fails while waiting for its result
     * @throws MessageTimeoutException if the send does not complete within the timeout
     */
    public void processSendResult(Message<?> message, ProducerRecord<K, V> producerRecord, CompletableFuture<SendResult<K, V>> completableFuture, MessageChannel messageChannel) throws InterruptedException, ExecutionException {
        final MessageChannel failureChannel = getSendFailureChannel();
        if (messageChannel != null || failureChannel != null) {
            completableFuture.whenComplete((sendResult, failure) -> {
                if (failure != null) {
                    if (failureChannel != null) {
                        this.messagingTemplate.send(failureChannel, this.errorMessageStrategy.buildErrorMessage(new KafkaSendFailureException(message, producerRecord, failure), (AttributeAccessor) null));
                    }
                    return;
                }
                if (messageChannel != null) {
                    this.messagingTemplate.send(messageChannel, getMessageBuilderFactory().fromMessage(message).setHeader("kafka_recordMetadata", sendResult.getRecordMetadata()).build());
                }
            });
        }
        if (!this.sync && !this.isGateway) {
            return;
        }
        Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
        if (sendTimeout != null && sendTimeout <= this.deliveryTimeoutMsProperty) {
            this.logger.debug(() -> "'sendTimeout' increased to " + (this.deliveryTimeoutMsProperty + this.timeoutBuffer) + "ms; it must be greater than the 'delivery.timeout.ms' Kafka producer property to avoid false failures");
            sendTimeout = this.deliveryTimeoutMsProperty + this.timeoutBuffer;
        }
        if (sendTimeout != null && sendTimeout >= 0) {
            try {
                completableFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                throw new MessageTimeoutException(message, "Timeout waiting for response from KafkaProducer", e);
            }
        } else {
            completableFuture.get();
        }
    }

    /**
     * Wraps the request/reply future so that it completes with the converted reply message.
     *
     * @param requestReplyFuture the future of a request/reply send, or {@code null} for a
     *        one-way send
     * @return the converting future, or {@code null} when there is no request/reply future
     */
    private Future<?> processReplyFuture(@Nullable RequestReplyFuture<?, ?, Object> requestReplyFuture) {
        return requestReplyFuture == null ? null : new ConvertingReplyFuture(requestReplyFuture);
    }
}
