/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.outbound;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class KafkaProducerMessageHandler<K, V>
extends AbstractReplyProducingMessageHandler
implements ManageableLifecycle {
    private static final int DEFAULT_TIMEOUT_BUFFER = 5000;
    private static final int TWENTY = 20;
    private static final Duration DEFAULT_ASSIGNMENT_TIMEOUT = Duration.ofSeconds(20L);
    private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<String, Set<Integer>>();
    private final KafkaTemplate<K, V> kafkaTemplate;
    private final boolean isGateway;
    private final boolean transactional;
    private final boolean allowNonTransactional;
    private final AtomicBoolean running = new AtomicBoolean();
    private final long deliveryTimeoutMsProperty;
    private EvaluationContext evaluationContext;
    private @Nullable Expression topicExpression;
    private @Nullable Expression messageKeyExpression;
    private @Nullable Expression partitionIdExpression;
    private @Nullable Expression timestampExpression;
    private Expression flushExpression = new FunctionExpression(message -> Boolean.TRUE.equals(message.getHeaders().get((Object)"kafka_flush")));
    private boolean sync;
    private Expression sendTimeoutExpression;
    private @Nullable KafkaHeaderMapper headerMapper;
    private RecordMessageConverter replyMessageConverter = new MessagingMessageConverter();
    private @Nullable MessageChannel sendFailureChannel;
    private @Nullable String sendFailureChannelName;
    private @Nullable MessageChannel sendSuccessChannel;
    private @Nullable String sendSuccessChannelName;
    private @Nullable MessageChannel futuresChannel;
    private @Nullable String futuresChannelName;
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
    private Type replyPayloadType = Object.class;
    private ProducerRecordCreator<K, V> producerRecordCreator = (message, topic, partition, timestamp, key, value, headers) -> new ProducerRecord(topic, partition, timestamp, key, value, (Iterable)headers);
    private int timeoutBuffer = 5000;
    private boolean useTemplateConverter;
    private Duration assignmentDuration = DEFAULT_ASSIGNMENT_TIMEOUT;

    public KafkaProducerMessageHandler(KafkaTemplate<K, V> kafkaTemplate) {
        Assert.notNull(kafkaTemplate, (String)"kafkaTemplate cannot be null");
        this.kafkaTemplate = kafkaTemplate;
        this.isGateway = kafkaTemplate instanceof ReplyingKafkaTemplate;
        if (this.isGateway) {
            this.setAsync(true);
            this.updateNotPropagatedHeaders(new String[]{"kafka_topic", "kafka_partitionId", "kafka_messageKey"}, false);
        }
        this.headerMapper = JacksonPresent.isJackson2Present() ? new DefaultKafkaHeaderMapper() : new SimpleKafkaHeaderMapper();
        this.transactional = kafkaTemplate.isTransactional();
        this.allowNonTransactional = kafkaTemplate.isAllowNonTransactional();
        if (this.transactional && this.isGateway) {
            this.logger.warn((CharSequence)"The KafkaTemplate is transactional; this gateway will only work if the consumer is configured to read uncommitted records");
        }
        this.determineSendTimeout();
        Long sendTimeout = (Long)this.sendTimeoutExpression.getValue(Long.class);
        Assert.state((sendTimeout != null ? 1 : 0) != 0, (String)"'sendTimeout' must not be null");
        this.deliveryTimeoutMsProperty = sendTimeout - (long)this.timeoutBuffer;
    }

    private void determineSendTimeout() {
        Map props = this.kafkaTemplate.getProducerFactory().getConfigurationProperties();
        Object dt = props.get("delivery.timeout.ms");
        if (dt == null) {
            dt = ProducerConfig.configDef().defaultValues().get("delivery.timeout.ms");
        }
        if (dt instanceof Long) {
            this.setSendTimeout((Long)dt + (long)this.timeoutBuffer);
        } else if (dt instanceof Integer) {
            this.setSendTimeout(Long.valueOf(((Integer)dt).intValue()) + (long)this.timeoutBuffer);
        } else if (dt instanceof String) {
            this.setSendTimeout(Long.parseLong((String)dt) + (long)this.timeoutBuffer);
        }
    }

    public void setTopicExpression(Expression topicExpression) {
        this.topicExpression = topicExpression;
    }

    public void setMessageKeyExpression(Expression messageKeyExpression) {
        this.messageKeyExpression = messageKeyExpression;
    }

    public void setPartitionIdExpression(Expression partitionIdExpression) {
        this.partitionIdExpression = partitionIdExpression;
    }

    public void setTimestampExpression(Expression timestampExpression) {
        this.timestampExpression = timestampExpression;
    }

    public void setFlushExpression(Expression flushExpression) {
        Assert.notNull((Object)flushExpression, (String)"'flushExpression' cannot be null");
        this.flushExpression = flushExpression;
    }

    public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
        this.headerMapper = headerMapper;
    }

    public @Nullable KafkaHeaderMapper getHeaderMapper() {
        return this.headerMapper;
    }

    public KafkaTemplate<?, ?> getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    public void setSync(boolean sync) {
        this.sync = sync;
    }

    public final void setSendTimeout(long sendTimeout) {
        super.setSendTimeout(sendTimeout);
        this.setSendTimeoutExpression((Expression)new ValueExpression((Object)sendTimeout));
    }

    public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
        Assert.notNull((Object)sendTimeoutExpression, (String)"'sendTimeoutExpression' must not be null");
        this.sendTimeoutExpression = sendTimeoutExpression;
    }

    public void setSendFailureChannel(MessageChannel sendFailureChannel) {
        this.sendFailureChannel = sendFailureChannel;
    }

    public void setSendFailureChannelName(String sendFailureChannelName) {
        this.sendFailureChannelName = sendFailureChannelName;
    }

    public void setSendSuccessChannel(MessageChannel sendSuccessChannel) {
        this.sendSuccessChannel = sendSuccessChannel;
    }

    public void setSendSuccessChannelName(String sendSuccessChannelName) {
        this.sendSuccessChannelName = sendSuccessChannelName;
    }

    public void setFuturesChannel(MessageChannel futuresChannel) {
        this.futuresChannel = futuresChannel;
    }

    public void setFuturesChannelName(String futuresChannelName) {
        this.futuresChannelName = futuresChannelName;
    }

    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull((Object)errorMessageStrategy, (String)"'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    public void setReplyMessageConverter(RecordMessageConverter messageConverter) {
        Assert.notNull((Object)messageConverter, (String)"'messageConverter' cannot be null");
        this.replyMessageConverter = messageConverter;
    }

    public void setReplyPayloadType(Type payloadType) {
        Assert.notNull((Object)payloadType, (String)"'payloadType' cannot be null");
        this.replyPayloadType = payloadType;
    }

    public void setProducerRecordCreator(ProducerRecordCreator<K, V> producerRecordCreator) {
        Assert.notNull(producerRecordCreator, (String)"'producerRecordCreator' cannot be null");
        this.producerRecordCreator = producerRecordCreator;
    }

    public void setTimeoutBuffer(int timeoutBuffer) {
        this.timeoutBuffer = timeoutBuffer;
    }

    public void setUseTemplateConverter(boolean useTemplateConverter) {
        this.useTemplateConverter = useTemplateConverter;
    }

    public void setAssignmentDuration(Duration assignmentDuration) {
        Assert.notNull((Object)assignmentDuration, (String)"'assignmentDuration' cannot be null");
        this.assignmentDuration = assignmentDuration;
    }

    public String getComponentType() {
        return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
    }

    protected @Nullable MessageChannel getSendFailureChannel() {
        if (!(this.sendFailureChannel != null || this.sendFailureChannelName == null && this.sync)) {
            String sendFailureChannelNameToUse = this.sendFailureChannelName;
            if (sendFailureChannelNameToUse == null) {
                sendFailureChannelNameToUse = "errorChannel";
            }
            this.sendFailureChannel = (MessageChannel)this.getChannelResolver().resolveDestination(sendFailureChannelNameToUse);
        }
        return this.sendFailureChannel;
    }

    protected @Nullable MessageChannel getSendSuccessChannel() {
        if (this.sendSuccessChannel != null) {
            return this.sendSuccessChannel;
        }
        if (this.sendSuccessChannelName != null) {
            this.sendSuccessChannel = (MessageChannel)this.getChannelResolver().resolveDestination(this.sendSuccessChannelName);
            return this.sendSuccessChannel;
        }
        return null;
    }

    protected @Nullable MessageChannel getFuturesChannel() {
        if (this.futuresChannel != null) {
            return this.futuresChannel;
        }
        if (this.futuresChannelName != null) {
            this.futuresChannel = (MessageChannel)this.getChannelResolver().resolveDestination(this.futuresChannelName);
            return this.futuresChannel;
        }
        return null;
    }

    protected void doInit() {
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext((BeanFactory)this.getBeanFactory());
    }

    public void start() {
        this.running.set(true);
    }

    public void stop() {
        if (this.running.compareAndSet(true, false) && (!this.transactional || this.allowNonTransactional)) {
            this.kafkaTemplate.flush();
        }
    }

    public boolean isRunning() {
        return this.running.get();
    }

    protected @Nullable Object handleRequestMessage(Message<?> message) {
        CompletableFuture sendFuture;
        ProducerRecord producerRecord;
        boolean flush = Boolean.TRUE.equals(this.flushExpression.getValue(this.evaluationContext, message, Boolean.class));
        if (message.getPayload() instanceof ProducerRecord) {
            producerRecord = (ProducerRecord)message.getPayload();
        } else {
            producerRecord = this.createProducerRecord(message);
            if (flush) {
                producerRecord.headers().remove("kafka_flush");
            }
        }
        Object futureToken = message.getHeaders().get((Object)"kafka_futureToken");
        if (futureToken != null) {
            producerRecord.headers().remove("kafka_futureToken");
        }
        RequestReplyFuture gatewayFuture = null;
        try {
            if (this.isGateway) {
                this.waitForAssignment();
                this.addReplyTopicIfAny(message.getHeaders(), producerRecord.headers());
                gatewayFuture = ((ReplyingKafkaTemplate)this.kafkaTemplate).sendAndReceive(producerRecord);
                sendFuture = gatewayFuture.getSendFuture();
            } else {
                sendFuture = this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional ? (CompletableFuture)this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord)) : this.kafkaTemplate.send(producerRecord);
            }
        }
        catch (RuntimeException rtex) {
            this.sendFailure(message, producerRecord, this.getSendFailureChannel(), rtex);
            throw rtex;
        }
        this.sendFutureIfRequested(sendFuture, futureToken);
        if (flush) {
            this.kafkaTemplate.flush();
        }
        try {
            this.processSendResult(message, producerRecord, sendFuture, this.getSendSuccessChannel());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MessageHandlingException(message, (Throwable)e);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new MessageHandlingException(message, cause);
        }
        return this.processReplyFuture(gatewayFuture);
    }

    private ProducerRecord<K, V> createProducerRecord(Message<?> message) {
        String topic;
        MessageHeaders messageHeaders = message.getHeaders();
        String string = topic = this.topicExpression != null ? (String)this.topicExpression.getValue(this.evaluationContext, message, String.class) : (String)messageHeaders.get((Object)"kafka_topic", String.class);
        if (topic == null) {
            topic = this.kafkaTemplate.getDefaultTopic();
        }
        if (this.useTemplateConverter) {
            return this.kafkaTemplate.getMessageConverter().fromMessage(message, topic);
        }
        Assert.state((boolean)StringUtils.hasText((String)topic), (String)"The 'topic' can not be empty or null");
        Integer partitionId = this.partitionIdExpression != null ? (Integer)this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class) : (Integer)messageHeaders.get((Object)"kafka_partitionId", Integer.class);
        Object messageKey = this.messageKeyExpression != null ? this.messageKeyExpression.getValue(this.evaluationContext, message) : messageHeaders.get((Object)"kafka_messageKey");
        Long timestamp = this.timestampExpression != null ? (Long)this.timestampExpression.getValue(this.evaluationContext, message, Long.class) : (Long)messageHeaders.get((Object)"kafka_timestamp", Long.class);
        Object payload = message.getPayload();
        if (payload instanceof KafkaNull) {
            payload = null;
        }
        RecordHeaders headers = new RecordHeaders();
        if (this.headerMapper != null) {
            this.headerMapper.fromHeaders(messageHeaders, (Headers)headers);
        }
        return this.producerRecordCreator.create(message, topic, partitionId, timestamp, messageKey, payload, (Headers)headers);
    }

    private void waitForAssignment() {
        ReplyingKafkaTemplate rkt = (ReplyingKafkaTemplate)this.kafkaTemplate;
        try {
            rkt.waitForAssignment(this.assignmentDuration);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void addReplyTopicIfAny(MessageHeaders messageHeaders, Headers headers) {
        if (this.isGateway) {
            Object replyHeader = messageHeaders.get((Object)"kafka_replyTopic");
            if (replyHeader instanceof String) {
                String topicString = (String)replyHeader;
                headers.add("kafka_replyTopic", topicString.getBytes(StandardCharsets.UTF_8));
            } else if (replyHeader instanceof byte[]) {
                byte[] topicBytes = (byte[])replyHeader;
                headers.add("kafka_replyTopic", topicBytes);
            } else if (replyHeader != null) {
                throw new IllegalStateException("kafka_replyTopic must be String or byte[]");
            }
        }
    }

    private void sendFutureIfRequested(@Nullable CompletableFuture<SendResult<K, V>> sendFuture, @Nullable Object futureToken) {
        MessageChannel futures;
        if (sendFuture != null && futureToken != null && (futures = this.getFuturesChannel()) != null) {
            try {
                futures.send(this.getMessageBuilderFactory().withPayload(sendFuture).setHeader("kafka_futureToken", futureToken).build());
            }
            catch (Exception e) {
                this.logger.error((Throwable)e, (CharSequence)"Failed to send sendFuture");
            }
        }
    }

    public void processSendResult(Message<?> message, ProducerRecord<K, V> producerRecord, @Nullable CompletableFuture<SendResult<K, V>> future, @Nullable MessageChannel metadataChannel) throws InterruptedException, ExecutionException {
        if (future == null) {
            this.logger.warn((CharSequence)"send future is null, skip processing send result.");
            return;
        }
        MessageChannel failureChannel = this.getSendFailureChannel();
        if (failureChannel != null || metadataChannel != null) {
            future.whenComplete((sendResult, exception) -> {
                if (exception == null) {
                    if (metadataChannel != null) {
                        this.messagingTemplate.send((Object)metadataChannel, this.getMessageBuilderFactory().fromMessage(message).setHeader("kafka_recordMetadata", (Object)sendResult.getRecordMetadata()).build());
                    }
                } else {
                    this.sendFailure(message, producerRecord, failureChannel, (Throwable)exception);
                }
            });
        }
        if (this.sync || this.isGateway) {
            Long sendTimeout = (Long)this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
            if (sendTimeout != null && sendTimeout <= this.deliveryTimeoutMsProperty) {
                this.logger.debug(() -> "'sendTimeout' increased to " + (this.deliveryTimeoutMsProperty + (long)this.timeoutBuffer) + "ms; it must be greater than the 'delivery.timeout.ms' Kafka producer property to avoid false failures");
                sendTimeout = this.deliveryTimeoutMsProperty + (long)this.timeoutBuffer;
            }
            if (sendTimeout == null || sendTimeout < 0L) {
                future.get();
            } else {
                try {
                    future.get(sendTimeout, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException te) {
                    throw new MessageTimeoutException(message, "Timeout waiting for response from KafkaProducer", (Throwable)te);
                }
            }
        }
    }

    private void sendFailure(Message<?> message, ProducerRecord<K, V> producerRecord, @Nullable MessageChannel failureChannel, Throwable exception) {
        if (failureChannel != null) {
            this.messagingTemplate.send((Object)failureChannel, (Message)this.errorMessageStrategy.buildErrorMessage((Throwable)((Object)new KafkaSendFailureException(message, producerRecord, exception)), null));
        }
    }

    private @Nullable Future<?> processReplyFuture(@Nullable RequestReplyFuture<?, ?, Object> future) {
        if (future == null) {
            return null;
        }
        return new ConvertingReplyFuture(future);
    }

    @FunctionalInterface
    public static interface ProducerRecordCreator<K, V> {
        public ProducerRecord<K, V> create(Message<?> var1, @Nullable String var2, @Nullable Integer var3, @Nullable Long var4, @Nullable K var5, @Nullable V var6, Headers var7);
    }

    private final class ConvertingReplyFuture
    extends CompletableFuture<Object> {
        ConvertingReplyFuture(RequestReplyFuture<?, ?, Object> future) {
            this.addCallback(future);
        }

        private void addCallback(RequestReplyFuture<?, ?, Object> future) {
            future.whenComplete((result, exception) -> {
                if (exception == null) {
                    try {
                        this.complete(this.dontLeakHeaders(KafkaProducerMessageHandler.this.replyMessageConverter.toMessage(result, null, null, KafkaProducerMessageHandler.this.replyPayloadType)));
                    }
                    catch (Exception ex) {
                        this.completeExceptionally(ex);
                    }
                } else {
                    this.completeExceptionally((Throwable)exception);
                }
            });
        }

        private Message<?> dontLeakHeaders(Message<?> message) {
            if (message.getHeaders() instanceof KafkaMessageHeaders) {
                Map headers = ((KafkaMessageHeaders)message.getHeaders()).getRawHeaders();
                headers.remove("kafka_correlationId");
                headers.remove("kafka_replyTopic");
                headers.remove("kafka_replyPartition");
                return message;
            }
            return KafkaProducerMessageHandler.this.getMessageBuilderFactory().fromMessage(message).removeHeader("kafka_correlationId").removeHeader("kafka_replyTopic").removeHeader("kafka_replyPartition").build();
        }
    }
}

