/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbitmq.client;

import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.Requester;
import com.rabbitmq.client.amqp.Resource;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.jspecify.annotations.Nullable;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.core.AsyncAmqpTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReceiveAndReplyCallback;
import org.springframework.amqp.core.ReceiveAndReplyMessageCallback;
import org.springframework.amqp.rabbit.core.AmqpNackReceivedException;
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
import org.springframework.amqp.rabbitmq.client.RabbitAmqpAdmin;
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.amqp.utils.JavaUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.log.LogAccessor;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class RabbitAmqpTemplate
implements AsyncAmqpTemplate,
DisposableBean {
    private static final LogAccessor LOG = new LogAccessor(RabbitAmqpAdmin.class);
    private final AmqpConnectionFactory connectionFactory;
    private final Lock instanceLock = new ReentrantLock();
    private @Nullable Publisher publisher;
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private @Nullable String defaultExchange;
    private @Nullable String defaultRoutingKey;
    private @Nullable String defaultQueue;
    private @Nullable String defaultReceiveQueue;
    private @Nullable String defaultReplyToQueue;
    private // Could not load outer class - annotation placement on inner may be incorrect
    Resource.StateListener @Nullable [] stateListeners;
    private Duration publishTimeout = Duration.ofSeconds(60L);
    private Duration completionTimeout = Duration.ofSeconds(60L);

    public RabbitAmqpTemplate(AmqpConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public void setListeners(Resource.StateListener ... listeners) {
        this.stateListeners = listeners;
    }

    public void setPublishTimeout(Duration timeout) {
        this.publishTimeout = timeout;
    }

    public void setCompletionTimeout(Duration completionTimeout) {
        this.completionTimeout = completionTimeout;
    }

    public void setExchange(String exchange) {
        this.defaultExchange = exchange;
    }

    public void setRoutingKey(String routingKey) {
        this.defaultRoutingKey = routingKey;
    }

    public void setQueue(String queue) {
        this.defaultQueue = queue;
    }

    public void setReceiveQueue(String queue) {
        this.defaultReceiveQueue = queue;
    }

    public void setReplyToQueue(String queue) {
        this.defaultReplyToQueue = queue;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    private String getRequiredQueue() throws IllegalStateException {
        String name = this.defaultReceiveQueue;
        Assert.state((name != null ? 1 : 0) != 0, (String)"No 'queue' specified. Check configuration of this 'RabbitAmqpTemplate'.");
        return name;
    }

    public Publisher getPublisher() {
        Publisher publisherToReturn = this.publisher;
        if (publisherToReturn == null) {
            this.instanceLock.lock();
            try {
                publisherToReturn = this.publisher;
                if (publisherToReturn == null) {
                    this.publisher = publisherToReturn = this.connectionFactory.getConnection().publisherBuilder().listeners(this.stateListeners).publishTimeout(this.publishTimeout).build();
                }
            }
            finally {
                this.instanceLock.unlock();
            }
        }
        return publisherToReturn;
    }

    public void destroy() {
        Publisher publisherToClose = this.publisher;
        if (publisherToClose != null) {
            publisherToClose.close();
            this.publisher = null;
        }
    }

    public CompletableFuture<Boolean> send(org.springframework.amqp.core.Message message) {
        Assert.state((this.defaultExchange != null || this.defaultQueue != null ? 1 : 0) != 0, (String)"For send with defaults, an 'exchange' (and optional 'routingKey') or 'queue' must be provided");
        return this.doSend(this.defaultExchange, this.defaultRoutingKey, this.defaultQueue, message);
    }

    public CompletableFuture<Boolean> send(String queue, org.springframework.amqp.core.Message message) {
        return this.doSend(null, null, queue, message);
    }

    public CompletableFuture<Boolean> send(String exchange, @Nullable String routingKey, org.springframework.amqp.core.Message message) {
        return this.doSend(exchange, routingKey != null ? routingKey : this.defaultRoutingKey, null, message);
    }

    private CompletableFuture<Boolean> doSend(@Nullable String exchange, @Nullable String routingKey, @Nullable String queue, org.springframework.amqp.core.Message message) {
        Message amqpMessage = RabbitAmqpTemplate.toAmqpMessage(exchange, routingKey, queue, message, () -> ((Publisher)this.getPublisher()).message());
        CompletableFuture<Boolean> publishResult = new CompletableFuture<Boolean>();
        this.getPublisher().publish(amqpMessage, context -> {
            switch (context.status()) {
                case ACCEPTED: {
                    publishResult.complete(true);
                    break;
                }
                case REJECTED: 
                case RELEASED: {
                    publishResult.completeExceptionally((Throwable)new AmqpNackReceivedException("The message was rejected", message));
                }
            }
        });
        return publishResult;
    }

    public CompletableFuture<Boolean> convertAndSend(Object message) {
        Assert.state((this.defaultExchange != null || this.defaultQueue != null ? 1 : 0) != 0, (String)"For send with defaults, an 'exchange' (and optional 'routingKey') or 'queue' must be provided");
        return this.doConvertAndSend(this.defaultExchange, this.defaultRoutingKey, this.defaultQueue, message, null);
    }

    public CompletableFuture<Boolean> convertAndSend(String queue, Object message) {
        return this.doConvertAndSend(null, null, queue, message, null);
    }

    public CompletableFuture<Boolean> convertAndSend(String exchange, @Nullable String routingKey, Object message) {
        return this.doConvertAndSend(exchange, routingKey != null ? routingKey : this.defaultRoutingKey, null, message, null);
    }

    public CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor messagePostProcessor) {
        return this.doConvertAndSend(null, null, null, message, messagePostProcessor);
    }

    public CompletableFuture<Boolean> convertAndSend(String queue, Object message, @Nullable MessagePostProcessor messagePostProcessor) {
        return this.doConvertAndSend(null, null, queue, message, messagePostProcessor);
    }

    public CompletableFuture<Boolean> convertAndSend(String exchange, @Nullable String routingKey, Object message, @Nullable MessagePostProcessor messagePostProcessor) {
        return this.doConvertAndSend(exchange, routingKey, null, message, messagePostProcessor);
    }

    private CompletableFuture<Boolean> doConvertAndSend(@Nullable String exchange, @Nullable String routingKey, @Nullable String queue, Object data, @Nullable MessagePostProcessor messagePostProcessor) {
        org.springframework.amqp.core.Message message = this.convertToMessageIfNecessary(data);
        if (messagePostProcessor != null) {
            message = messagePostProcessor.postProcessMessage(message);
        }
        return this.doSend(exchange, routingKey, queue, message);
    }

    public CompletableFuture<org.springframework.amqp.core.Message> receive() {
        return this.receive(this.getRequiredQueue());
    }

    public CompletableFuture<org.springframework.amqp.core.Message> receive(String queueName) {
        CompletableFuture messageFuture = new CompletableFuture();
        AtomicBoolean messageReceived = new AtomicBoolean();
        Consumer consumer = this.connectionFactory.getConnection().consumerBuilder().queue(queueName).initialCredits(1).priority(10).messageHandler((context, message) -> {
            if (messageReceived.compareAndSet(false, true)) {
                context.accept();
                messageFuture.complete(RabbitAmqpUtils.fromAmqpMessage(message, null));
            }
        }).build();
        return messageFuture.orTimeout(this.completionTimeout.toMillis(), TimeUnit.MILLISECONDS).whenComplete((message, exception) -> consumer.close());
    }

    public CompletableFuture<Object> receiveAndConvert() {
        return this.receiveAndConvert((ParameterizedTypeReference)null);
    }

    public CompletableFuture<Object> receiveAndConvert(String queueName) {
        return this.receiveAndConvert(queueName, null);
    }

    public <T> CompletableFuture<T> receiveAndConvert(@Nullable ParameterizedTypeReference<T> type) {
        return this.receiveAndConvert(this.getRequiredQueue(), type);
    }

    public <T> CompletableFuture<T> receiveAndConvert(String queueName, @Nullable ParameterizedTypeReference<T> type) {
        return this.receive(queueName).thenApply(message -> this.convertReply((org.springframework.amqp.core.Message)message, type));
    }

    private <T> T convertReply(org.springframework.amqp.core.Message message, @Nullable ParameterizedTypeReference<T> type) {
        if (type != null) {
            return (T)this.getRequiredSmartMessageConverter().fromMessage(message, type);
        }
        return (T)this.messageConverter.fromMessage(message);
    }

    private SmartMessageConverter getRequiredSmartMessageConverter() throws IllegalStateException {
        Assert.state((boolean)(this.messageConverter instanceof SmartMessageConverter), (String)"template's message converter must be a SmartMessageConverter");
        return (SmartMessageConverter)this.messageConverter;
    }

    public <R, S> CompletableFuture<Boolean> receiveAndReply(ReceiveAndReplyCallback<R, S> callback) {
        return this.receiveAndReply(this.getRequiredQueue(), callback);
    }

    public <R, S> CompletableFuture<Boolean> receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) {
        CompletableFuture rpcFuture = new CompletableFuture();
        Consumer.MessageHandler consumerHandler = (context, message) -> {
            org.springframework.amqp.core.Message requestMessage = RabbitAmqpUtils.fromAmqpMessage(message, null);
            try {
                Object messageId = message.messageId();
                Assert.notNull((Object)messageId, (String)"The 'message-id' property has to be set on request. Used for reply correlation.");
                String replyTo = message.replyTo();
                Assert.hasText((String)replyTo, (String)"The 'reply-to' property has to be set on request. Used for reply publishing.");
                org.springframework.amqp.core.Message reply = this.handleRequestAndProduceReply(requestMessage, callback);
                if (reply == null) {
                    LOG.info(() -> "No reply for request: " + String.valueOf(requestMessage));
                    context.accept();
                    rpcFuture.complete(false);
                } else {
                    Message replyMessage = this.getPublisher().message();
                    RabbitAmqpUtils.toAmqpMessage(reply, replyMessage);
                    replyMessage.correlationId(messageId);
                    replyMessage.to(replyTo);
                    this.getPublisher().publish(replyMessage, ctx -> {});
                    context.accept();
                    rpcFuture.complete(true);
                }
            }
            catch (Exception ex) {
                context.discard();
                rpcFuture.completeExceptionally((Throwable)new AmqpIllegalStateException("Failed to process RPC request: " + String.valueOf(requestMessage), (Throwable)ex));
            }
        };
        Consumer consumer = this.connectionFactory.getConnection().consumerBuilder().queue(queueName).initialCredits(1).priority(10).messageHandler(consumerHandler).build();
        return rpcFuture.orTimeout(this.completionTimeout.toMillis(), TimeUnit.MILLISECONDS).whenComplete((message, exception) -> consumer.close());
    }

    private <S, R> @Nullable org.springframework.amqp.core.Message handleRequestAndProduceReply(org.springframework.amqp.core.Message requestMessage, ReceiveAndReplyCallback<R, S> callback) {
        Object reply;
        Object receive = requestMessage;
        if (!ReceiveAndReplyMessageCallback.class.isAssignableFrom(callback.getClass())) {
            receive = this.messageConverter.fromMessage(requestMessage);
        }
        try {
            reply = callback.handle(receive);
        }
        catch (ClassCastException ex) {
            StackTraceElement[] trace = ex.getStackTrace();
            if (trace[0].getMethodName().equals("handle") && Objects.equals(trace[1].getFileName(), "RabbitAmqpTemplate.java")) {
                throw new IllegalArgumentException("ReceiveAndReplyCallback '" + String.valueOf(callback) + "' can't handle received object '" + String.valueOf(receive) + "'", ex);
            }
            throw ex;
        }
        if (reply != null) {
            return this.convertToMessageIfNecessary(reply);
        }
        return null;
    }

    private org.springframework.amqp.core.Message convertToMessageIfNecessary(Object data) {
        if (data instanceof org.springframework.amqp.core.Message) {
            org.springframework.amqp.core.Message msg = (org.springframework.amqp.core.Message)data;
            return msg;
        }
        return this.messageConverter.toMessage(data, new MessageProperties());
    }

    public CompletableFuture<org.springframework.amqp.core.Message> sendAndReceive(org.springframework.amqp.core.Message message) {
        Assert.state((this.defaultExchange != null || this.defaultQueue != null ? 1 : 0) != 0, (String)"For send-n-receive with defaults, an 'exchange' (and optional 'routingKey') or 'queue' must be provided");
        return this.doSendAndReceive(this.defaultExchange, this.defaultRoutingKey, this.defaultQueue, message);
    }

    public CompletableFuture<org.springframework.amqp.core.Message> sendAndReceive(String exchange, @Nullable String routingKey, org.springframework.amqp.core.Message message) {
        return this.doSendAndReceive(exchange, routingKey != null ? routingKey : this.defaultRoutingKey, null, message);
    }

    public CompletableFuture<org.springframework.amqp.core.Message> sendAndReceive(String queue, org.springframework.amqp.core.Message message) {
        return this.doSendAndReceive(null, null, queue, message);
    }

    private CompletableFuture<org.springframework.amqp.core.Message> doSendAndReceive(@Nullable String exchange, @Nullable String routingKey, @Nullable String queue, org.springframework.amqp.core.Message message) {
        MessageProperties messageProperties = message.getMessageProperties();
        String messageId = messageProperties.getMessageId();
        String correlationId = messageProperties.getCorrelationId();
        String replyTo = messageProperties.getReplyTo();
        Supplier<Object> correlationIdSupplier = null;
        if (StringUtils.hasText((String)correlationId)) {
            correlationIdSupplier = () -> correlationId;
        } else if (StringUtils.hasText((String)messageId)) {
            correlationIdSupplier = () -> messageId;
        }
        String replyToQueue = this.defaultReplyToQueue;
        if (StringUtils.hasText((String)replyTo)) {
            replyToQueue = replyTo;
        }
        Requester rpcClient = this.connectionFactory.getConnection().requesterBuilder().requestTimeout(this.publishTimeout).correlationIdSupplier(correlationIdSupplier).replyToQueue(replyToQueue).build();
        Message amqpMessage = RabbitAmqpTemplate.toAmqpMessage(exchange, routingKey, queue, message, () -> ((Requester)rpcClient).message());
        return ((CompletableFuture)rpcClient.publish(amqpMessage).thenApply(reply -> RabbitAmqpUtils.fromAmqpMessage(reply, null))).orTimeout(this.completionTimeout.toMillis(), TimeUnit.MILLISECONDS).whenComplete((replyMessage, exception) -> rpcClient.close());
    }

    public <C> CompletableFuture<C> convertSendAndReceive(Object object) {
        return this.convertSendAndReceiveAsType(object, null, null);
    }

    public <C> CompletableFuture<C> convertSendAndReceive(String queue, Object object) {
        return this.convertSendAndReceiveAsType(queue, object, null, null);
    }

    public <C> CompletableFuture<C> convertSendAndReceive(String exchange, @Nullable String routingKey, Object object) {
        return this.convertSendAndReceiveAsType(exchange, routingKey, object, null, null);
    }

    public <C> CompletableFuture<C> convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceiveAsType(object, messagePostProcessor, null);
    }

    public <C> CompletableFuture<C> convertSendAndReceive(String queue, Object object, MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceiveAsType(queue, object, messagePostProcessor, null);
    }

    public <C> CompletableFuture<C> convertSendAndReceive(String exchange, @Nullable String routingKey, Object object, @Nullable MessagePostProcessor messagePostProcessor) {
        return this.convertSendAndReceiveAsType(exchange, routingKey, object, messagePostProcessor, null);
    }

    public <C> CompletableFuture<C> convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(object, null, responseType);
    }

    public <C> CompletableFuture<C> convertSendAndReceiveAsType(String queue, Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(queue, object, null, responseType);
    }

    public <C> CompletableFuture<C> convertSendAndReceiveAsType(String exchange, @Nullable String routingKey, Object object, ParameterizedTypeReference<C> responseType) {
        return this.convertSendAndReceiveAsType(exchange, routingKey, object, null, responseType);
    }

    public <C> CompletableFuture<C> convertSendAndReceiveAsType(Object object, @Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
        Assert.state((this.defaultExchange != null || this.defaultQueue != null ? 1 : 0) != 0, (String)"For send with defaults, an 'exchange' (and optional 'routingKey') or 'queue' must be provided");
        return this.doConvertSendAndReceive(this.defaultExchange, this.defaultRoutingKey, this.defaultQueue, object, messagePostProcessor, responseType);
    }

    public <C> CompletableFuture<C> convertSendAndReceiveAsType(String queue, Object object, @Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
        return this.doConvertSendAndReceive(null, null, queue, object, messagePostProcessor, responseType);
    }

    public <C> CompletableFuture<C> convertSendAndReceiveAsType(String exchange, @Nullable String routingKey, Object object, @Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
        return this.doConvertSendAndReceive(exchange, routingKey != null ? routingKey : this.defaultRoutingKey, null, object, messagePostProcessor, responseType);
    }

    private <C> CompletableFuture<C> doConvertSendAndReceive(@Nullable String exchange, @Nullable String routingKey, @Nullable String queue, Object data, @Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
        org.springframework.amqp.core.Message message = this.convertToMessageIfNecessary(data);
        if (messagePostProcessor != null) {
            message = messagePostProcessor.postProcessMessage(message);
        }
        return this.doSendAndReceive(exchange, routingKey, queue, message).thenApply(reply -> this.convertReply((org.springframework.amqp.core.Message)reply, (ParameterizedTypeReference)responseType));
    }

    private static Message toAmqpMessage(@Nullable String exchange, @Nullable String routingKey, @Nullable String queue, org.springframework.amqp.core.Message message, Supplier<Message> amqpMessageSupplier) {
        Message.MessageAddressBuilder address = amqpMessageSupplier.get().toAddress();
        JavaUtils.INSTANCE.acceptIfNotNull((Object)exchange, arg_0 -> ((Message.MessageAddressBuilder)address).exchange(arg_0)).acceptIfNotNull((Object)routingKey, arg_0 -> ((Message.MessageAddressBuilder)address).key(arg_0)).acceptIfNotNull((Object)queue, arg_0 -> ((Message.MessageAddressBuilder)address).queue(arg_0));
        Message amqpMessage = address.message();
        RabbitAmqpUtils.toAmqpMessage(message, amqpMessage);
        return amqpMessage;
    }
}

