/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit;

import com.rabbitmq.client.Channel;
import java.time.Instant;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.AsyncAmqpTemplate;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.RabbitConverterFuture;
import org.springframework.amqp.rabbit.RabbitFuture;
import org.springframework.amqp.rabbit.RabbitMessageFuture;
import org.springframework.amqp.rabbit.TimeoutTask;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.amqp.utils.JavaUtils;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.expression.Expression;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class AsyncRabbitTemplate
implements AsyncAmqpTemplate,
ChannelAwareMessageListener,
RabbitTemplate.ReturnsCallback,
RabbitTemplate.ConfirmCallback,
BeanNameAware,
SmartLifecycle {
    public static final int DEFAULT_RECEIVE_TIMEOUT = 30000;
    private final Log logger = LogFactory.getLog(this.getClass());
    private final Lock lock = new ReentrantLock();
    private final RabbitTemplate template;
    private final @Nullable AbstractMessageListenerContainer container;
    private final @Nullable DirectReplyToMessageListenerContainer directReplyToContainer;
    private final @Nullable String replyAddress;
    private final ConcurrentMap<String, RabbitFuture<?>> pending = new ConcurrentHashMap();
    private final CorrelationMessagePostProcessor<?> messagePostProcessor = new CorrelationMessagePostProcessor();
    private volatile boolean running;
    private volatile boolean enableConfirms;
    private volatile long receiveTimeout = 30000L;
    private int phase;
    private boolean autoStartup = true;
    private String beanName;
    private TaskScheduler taskScheduler;
    private boolean internalTaskScheduler = true;

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String replyQueue) {
        this(connectionFactory, exchange, routingKey, replyQueue, null);
    }

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, @Nullable String exchange, String routingKey, String replyQueue, @Nullable String replyAddress) {
        Assert.notNull((Object)connectionFactory, (String)"'connectionFactory' cannot be null");
        Assert.notNull((Object)routingKey, (String)"'routingKey' cannot be null");
        Assert.notNull((Object)replyQueue, (String)"'replyQueue' cannot be null");
        this.template = new RabbitTemplate(connectionFactory);
        this.template.setExchange(exchange == null ? "" : exchange);
        this.template.setRoutingKey(routingKey);
        this.container = new SimpleMessageListenerContainer(connectionFactory);
        JavaUtils.INSTANCE.acceptIfNotNull(this.template.getAfterReceivePostProcessors(), value -> this.container.setAfterReceivePostProcessors(value.toArray(new MessagePostProcessor[0])));
        this.container.setQueueNames(replyQueue);
        this.container.setMessageListener(this);
        this.container.afterPropertiesSet();
        this.directReplyToContainer = null;
        this.replyAddress = Objects.requireNonNullElse(replyAddress, replyQueue);
    }

    public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
        this(template, container, null);
    }

    public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container, @Nullable String replyAddress) {
        Assert.notNull((Object)template, (String)"'template' cannot be null");
        Assert.notNull((Object)container, (String)"'container' cannot be null");
        this.template = template;
        this.container = container;
        this.container.setMessageListener(this);
        this.directReplyToContainer = null;
        this.replyAddress = Objects.requireNonNullElseGet(replyAddress, () -> container.getQueueNames()[0]);
    }

    public AsyncRabbitTemplate(ConnectionFactory connectionFactory, @Nullable String exchange, String routingKey) {
        this(new RabbitTemplate(connectionFactory));
        Assert.notNull((Object)routingKey, (String)"'routingKey' cannot be null");
        this.template.setExchange(exchange == null ? "" : exchange);
        this.template.setRoutingKey(routingKey);
    }

    public AsyncRabbitTemplate(RabbitTemplate template) {
        Assert.notNull((Object)template, (String)"'template' cannot be null");
        this.template = template;
        this.container = null;
        this.replyAddress = null;
        this.directReplyToContainer = new DirectReplyToMessageListenerContainer(this.template.getConnectionFactory());
        JavaUtils.INSTANCE.acceptIfNotNull(template.getAfterReceivePostProcessors(), value -> this.directReplyToContainer.setAfterReceivePostProcessors(value.toArray(new MessagePostProcessor[0])));
        this.directReplyToContainer.setMessageListener(this);
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public void setMandatory(boolean mandatory) {
        this.template.setReturnsCallback(this);
        this.template.setMandatory(mandatory);
    }

    public void setMandatoryExpression(Expression mandatoryExpression) {
        this.template.setReturnsCallback(this);
        this.template.setMandatoryExpression(mandatoryExpression);
    }

    public void setMandatoryExpressionString(String mandatoryExpression) {
        this.template.setReturnsCallback(this);
        this.template.setMandatoryExpressionString(mandatoryExpression);
    }

    public void setEnableConfirms(boolean enableConfirms) {
        this.enableConfirms = enableConfirms;
        if (enableConfirms) {
            this.template.setConfirmCallback(this);
        }
    }

    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.template.getConnectionFactory();
    }

    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public void setTaskScheduler(TaskScheduler taskScheduler) {
        Assert.notNull((Object)taskScheduler, (String)"'taskScheduler' cannot be null");
        this.lock.lock();
        try {
            this.internalTaskScheduler = false;
            this.taskScheduler = taskScheduler;
        }
        finally {
            this.lock.unlock();
        }
    }

    public MessageConverter getMessageConverter() {
        return this.template.getMessageConverter();
    }

    public RabbitTemplate getRabbitTemplate() {
        return this.template;
    }

    public RabbitMessageFuture sendAndReceive(Message message) {
        return this.sendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), message);
    }

    public RabbitMessageFuture sendAndReceive(String routingKey, Message message) {
        return this.sendAndReceive(this.template.getExchange(), routingKey, message);
    }

    public RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message) {
        String correlationId = this.getOrSetCorrelationIdAndSetReplyTo(message, null);
        RabbitMessageFuture future = new RabbitMessageFuture(correlationId, message, this::canceler, this::timeoutTask);
        CorrelationData correlationData = null;
        if (this.enableConfirms) {
            correlationData = new CorrelationData(correlationId);
            future.setConfirm(new CompletableFuture<Boolean>());
        }
        this.pending.put(correlationId, future);
        if (this.container != null) {
            this.template.send(exchange, routingKey, message, correlationData);
        } else {
            Assert.notNull((Object)this.directReplyToContainer, (String)"'directReplyToContainer' cannot be null");
            DirectReplyToMessageListenerContainer.ChannelHolder channelHolder = this.directReplyToContainer.getChannelHolder();
            future.setChannelHolder(channelHolder);
            this.sendDirect(channelHolder.getChannel(), exchange, routingKey, message, correlationData);
        }
        future.startTimer();
        return future;
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(Object object) {
        return this.convertSendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), object, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object) {
        return this.convertSendAndReceive(this.template.getExchange(), routingKey, object, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object) {
        return this.convertSendAndReceive(exchange, routingKey, object, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), object, messagePostProcessor);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String routingKey, Object object, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(this.template.getExchange(), routingKey, object, messagePostProcessor);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object, @Nullable MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceive(exchange, routingKey, object, messagePostProcessor, null);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), this.template.getRoutingKey(), object, (MessagePostProcessor)null, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), routingKey, object, (MessagePostProcessor)null, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(exchange, routingKey, object, (MessagePostProcessor)null, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), this.template.getRoutingKey(), object, messagePostProcessor, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String routingKey, Object object, @Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(this.template.getExchange(), routingKey, object, messagePostProcessor, (ParameterizedTypeReference)responseType);
    }

    public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object, @Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
        Assert.state((boolean)(this.template.getMessageConverter() instanceof SmartMessageConverter), (String)"template's message converter must be a SmartMessageConverter");
        return this.convertSendAndReceive(exchange, routingKey, object, messagePostProcessor, responseType);
    }

    private <C> RabbitConverterFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object, @Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
        AsyncCorrelationData<C> correlationData = new AsyncCorrelationData<C>(messagePostProcessor, responseType, this.enableConfirms);
        if (this.container != null) {
            this.template.convertAndSend(exchange, routingKey, object, this.messagePostProcessor, correlationData);
        } else {
            MessageConverter converter = this.template.getMessageConverter();
            Message message = converter.toMessage(object, new MessageProperties());
            this.messagePostProcessor.postProcessMessage(message, correlationData, this.template.nullSafeExchange(exchange), this.template.nullSafeRoutingKey(routingKey));
            DirectReplyToMessageListenerContainer.ChannelHolder channelHolder = this.directReplyToContainer.getChannelHolder();
            correlationData.future.setChannelHolder(channelHolder);
            this.sendDirect(channelHolder.getChannel(), exchange, routingKey, message, correlationData);
        }
        RabbitConverterFuture future = correlationData.future;
        future.startTimer();
        return future;
    }

    private void sendDirect(Channel channel, String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) {
        message.getMessageProperties().setReplyTo("amq.rabbitmq.reply-to");
        try {
            if (channel instanceof PublisherCallbackChannel) {
                this.template.addListener(channel);
            }
            this.template.doSend(channel, exchange, routingKey, message, this.template.isMandatoryFor(message), correlationData);
        }
        catch (Exception e) {
            throw new AmqpException("Failed to send request", (Throwable)e);
        }
    }

    public void start() {
        this.lock.lock();
        try {
            if (!this.running) {
                if (this.internalTaskScheduler) {
                    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
                    scheduler.setThreadNamePrefix(this.getBeanName() + "-");
                    scheduler.afterPropertiesSet();
                    this.taskScheduler = scheduler;
                }
                if (this.container != null) {
                    this.container.start();
                }
                if (this.directReplyToContainer != null) {
                    this.directReplyToContainer.setTaskScheduler(this.taskScheduler);
                    this.directReplyToContainer.start();
                }
            }
            this.running = true;
        }
        finally {
            this.lock.unlock();
        }
    }

    public void stop() {
        this.lock.lock();
        try {
            if (this.running) {
                if (this.container != null) {
                    this.container.stop();
                }
                if (this.directReplyToContainer != null) {
                    this.directReplyToContainer.stop();
                }
                for (RabbitFuture future : this.pending.values()) {
                    future.setNackCause("AsyncRabbitTemplate was stopped while waiting for reply");
                    future.cancel(true);
                }
                if (this.internalTaskScheduler) {
                    ((ThreadPoolTaskScheduler)this.taskScheduler).destroy();
                    this.taskScheduler = null;
                }
            }
            this.running = false;
        }
        finally {
            this.lock.unlock();
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return this.phase;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    @Override
    public void onMessage(Message message, @Nullable Channel channel) {
        MessageProperties messageProperties = message.getMessageProperties();
        String correlationId = messageProperties.getCorrelationId();
        if (StringUtils.hasText((String)correlationId)) {
            RabbitFuture future;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("onMessage: " + String.valueOf(message)));
            }
            if ((future = (RabbitFuture)this.pending.remove(correlationId)) != null) {
                if (future instanceof RabbitConverterFuture) {
                    MessageConverter messageConverter = this.template.getMessageConverter();
                    RabbitConverterFuture rabbitFuture = (RabbitConverterFuture)future;
                    try {
                        Object object;
                        if (rabbitFuture.getReturnType() != null && messageConverter instanceof SmartMessageConverter) {
                            SmartMessageConverter smart = (SmartMessageConverter)messageConverter;
                            object = smart.fromMessage(message, rabbitFuture.getReturnType());
                        } else {
                            object = messageConverter.fromMessage(message);
                        }
                        Object converted = object;
                        rabbitFuture.complete(converted);
                    }
                    catch (MessageConversionException e) {
                        rabbitFuture.completeExceptionally(e);
                    }
                } else {
                    ((RabbitMessageFuture)future).complete(message);
                }
            } else if (this.logger.isWarnEnabled()) {
                this.logger.warn((Object)("No pending reply - perhaps timed out: " + String.valueOf(message)));
            }
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        MessageProperties messageProperties = returned.getMessage().getMessageProperties();
        String correlationId = messageProperties.getCorrelationId();
        if (StringUtils.hasText((String)correlationId)) {
            RabbitFuture future = (RabbitFuture)this.pending.remove(correlationId);
            if (future != null) {
                future.completeExceptionally((Throwable)new AmqpMessageReturnedException("Message returned", returned));
            } else if (this.logger.isWarnEnabled()) {
                this.logger.warn((Object)("No pending reply - perhaps timed out? Message returned: " + String.valueOf(returned.getMessage())));
            }
        }
    }

    @Override
    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Confirm: " + String.valueOf(correlationData) + ", ack=" + ack + (String)(cause == null ? "" : ", cause: " + cause)));
        }
        Assert.notNull((Object)correlationData, (String)"'correlationData' must not be null");
        String correlationId = correlationData.getId();
        RabbitFuture future = (RabbitFuture)this.pending.get(correlationId);
        if (future != null) {
            future.setNackCause(cause);
            future.getConfirm().complete(ack);
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Confirm: " + String.valueOf(correlationData) + ", ack=" + ack + (String)(cause == null ? "" : ", cause: " + cause) + " no pending future - either canceled or the reply is already received"));
        }
    }

    private String getOrSetCorrelationIdAndSetReplyTo(Message message, @Nullable AsyncCorrelationData<?> correlationData) {
        MessageProperties messageProperties = message.getMessageProperties();
        Assert.notNull((Object)messageProperties, (String)"the message properties cannot be null");
        String correlationId = messageProperties.getCorrelationId();
        if (!StringUtils.hasText((String)correlationId)) {
            correlationId = correlationData != null ? correlationData.getId() : UUID.randomUUID().toString();
            messageProperties.setCorrelationId(correlationId);
            Assert.isNull((Object)messageProperties.getReplyTo(), (String)"'replyTo' property must be null");
        }
        messageProperties.setReplyTo(this.replyAddress);
        return correlationId;
    }

    private void canceler(String correlationId, @Nullable DirectReplyToMessageListenerContainer.ChannelHolder channelHolder) {
        this.pending.remove(correlationId);
        if (channelHolder != null && this.directReplyToContainer != null) {
            this.directReplyToContainer.releaseConsumerFor(channelHolder, false, null);
        }
    }

    private @Nullable ScheduledFuture<?> timeoutTask(RabbitFuture<?> future) {
        if (this.receiveTimeout > 0L) {
            this.lock.lock();
            try {
                if (!this.running) {
                    this.pending.remove(future.getCorrelationId());
                    throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
                }
                ScheduledFuture scheduledFuture = this.taskScheduler.schedule((Runnable)new TimeoutTask(future, this.pending, this.directReplyToContainer), Instant.now().plusMillis(this.receiveTimeout));
                return scheduledFuture;
            }
            finally {
                this.lock.unlock();
            }
        }
        return null;
    }

    public String toString() {
        return this.getClass().getSimpleName() + ": " + this.beanName;
    }

    private final class CorrelationMessagePostProcessor<C>
    implements MessagePostProcessor {
        CorrelationMessagePostProcessor() {
        }

        public Message postProcessMessage(Message message) throws AmqpException {
            throw new UnsupportedOperationException();
        }

        public Message postProcessMessage(Message message, @Nullable Correlation correlation) throws AmqpException {
            Message messageToSend = message;
            AsyncCorrelationData correlationData = (AsyncCorrelationData)correlation;
            Assert.notNull((Object)correlationData, (String)"correlationData cannot be null");
            if (correlationData.userPostProcessor != null) {
                messageToSend = correlationData.userPostProcessor.postProcessMessage(message);
            }
            String correlationId = AsyncRabbitTemplate.this.getOrSetCorrelationIdAndSetReplyTo(messageToSend, correlationData);
            correlationData.future = new RabbitConverterFuture(correlationId, message, AsyncRabbitTemplate.this::canceler, AsyncRabbitTemplate.this::timeoutTask);
            if (correlationData.enableConfirms) {
                correlationData.setId(correlationId);
                correlationData.future.setConfirm(new CompletableFuture<Boolean>());
            }
            correlationData.future.setReturnType(correlationData.returnType);
            AsyncRabbitTemplate.this.pending.put(correlationId, correlationData.future);
            return messageToSend;
        }
    }

    private static class AsyncCorrelationData<C>
    extends CorrelationData {
        final @Nullable MessagePostProcessor userPostProcessor;
        final @Nullable ParameterizedTypeReference<C> returnType;
        final boolean enableConfirms;
        volatile RabbitConverterFuture<C> future;

        AsyncCorrelationData(@Nullable MessagePostProcessor userPostProcessor, @Nullable ParameterizedTypeReference<C> returnType, boolean enableConfirms) {
            this.userPostProcessor = userPostProcessor;
            this.returnType = returnType;
            this.enableConfirms = enableConfirms;
        }
    }
}

