package org.springframework.messaging.core;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;

/* loaded from: input_file:lib/spring-messaging-4.2.6.RELEASE.jar:org/springframework/messaging/core/GenericMessagingTemplate.class */
public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> implements BeanFactoryAware {
    private volatile long sendTimeout = -1;
    private volatile long receiveTimeout = -1;
    private volatile boolean throwExceptionOnLateReply = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-messaging-4.2.6.RELEASE.jar:org/springframework/messaging/core/GenericMessagingTemplate$TemporaryReplyChannel.class */
    public class TemporaryReplyChannel implements PollableChannel {
        private final Log logger;
        private final CountDownLatch replyLatch;
        private volatile Message<?> replyMessage;
        private volatile boolean hasReceived;
        private volatile boolean hasTimedOut;
        private volatile boolean hasSendFailed;

        private TemporaryReplyChannel() {
            this.logger = LogFactory.getLog(TemporaryReplyChannel.class);
            this.replyLatch = new CountDownLatch(1);
        }

        public void setSendFailed(boolean z) {
            this.hasSendFailed = z;
        }

        @Override // org.springframework.messaging.PollableChannel
        public Message<?> receive() {
            return receive(-1L);
        }

        @Override // org.springframework.messaging.PollableChannel
        public Message<?> receive(long j) {
            try {
                if (GenericMessagingTemplate.this.receiveTimeout < 0) {
                    this.replyLatch.await();
                    this.hasReceived = true;
                } else if (this.replyLatch.await(GenericMessagingTemplate.this.receiveTimeout, TimeUnit.MILLISECONDS)) {
                    this.hasReceived = true;
                } else {
                    this.hasTimedOut = true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return this.replyMessage;
        }

        @Override // org.springframework.messaging.MessageChannel
        public boolean send(Message<?> message) {
            return send(message, -1L);
        }

        @Override // org.springframework.messaging.MessageChannel
        public boolean send(Message<?> message, long j) {
            this.replyMessage = message;
            boolean z = this.hasReceived;
            this.replyLatch.countDown();
            String str = null;
            if (this.hasTimedOut) {
                str = "Reply message received but the receiving thread has exited due to a timeout";
            } else if (z) {
                str = "Reply message received but the receiving thread has already received a reply";
            } else if (this.hasSendFailed) {
                str = "Reply message received but the receiving thread has exited due to an exception while sending the request message";
            }
            if (str == null) {
                return true;
            }
            if (this.logger.isWarnEnabled()) {
                this.logger.warn(str + ":" + message);
            }
            if (GenericMessagingTemplate.this.throwExceptionOnLateReply) {
                throw new MessageDeliveryException(message, str);
            }
            return true;
        }
    }

    public void setSendTimeout(long j) {
        this.sendTimeout = j;
    }

    public long getSendTimeout() {
        return this.sendTimeout;
    }

    public void setReceiveTimeout(long j) {
        this.receiveTimeout = j;
    }

    public long getReceiveTimeout() {
        return this.receiveTimeout;
    }

    public void setThrowExceptionOnLateReply(boolean z) {
        this.throwExceptionOnLateReply = z;
    }

    @Override // org.springframework.beans.factory.BeanFactoryAware
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        super.setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
    }

    protected final void doSend(MessageChannel messageChannel, Message<?> message) {
        Assert.notNull(messageChannel, "'channel' is required");
        MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, (Class<MessageHeaderAccessor>) MessageHeaderAccessor.class);
        if (accessor != null && accessor.isMutable()) {
            accessor.setImmutable();
        }
        long j = this.sendTimeout;
        if (!(j >= 0 ? messageChannel.send(message, j) : messageChannel.send(message))) {
            throw new MessageDeliveryException(message, "failed to send message to channel '" + messageChannel + "' within timeout: " + j);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.messaging.core.AbstractMessageReceivingTemplate
    public final Message<?> doReceive(MessageChannel messageChannel) {
        Assert.notNull(messageChannel, "'channel' is required");
        Assert.state(messageChannel instanceof PollableChannel, "A PollableChannel is required to receive messages");
        long j = this.receiveTimeout;
        Message<?> receive = j >= 0 ? ((PollableChannel) messageChannel).receive(j) : ((PollableChannel) messageChannel).receive();
        if (receive == null && this.logger.isTraceEnabled()) {
            this.logger.trace("Failed to receive message from channel '" + messageChannel + "' within timeout: " + j);
        }
        return receive;
    }

    protected final Message<?> doSendAndReceive(MessageChannel messageChannel, Message<?> message) {
        Assert.notNull(messageChannel, "'channel' is required");
        Object replyChannel = message.getHeaders().getReplyChannel();
        Object errorChannel = message.getHeaders().getErrorChannel();
        TemporaryReplyChannel temporaryReplyChannel = new TemporaryReplyChannel();
        try {
            doSend(messageChannel, MessageBuilder.fromMessage(message).setReplyChannel(temporaryReplyChannel).setErrorChannel(temporaryReplyChannel).build());
            Message<?> doReceive = doReceive((MessageChannel) temporaryReplyChannel);
            if (doReceive != null) {
                doReceive = MessageBuilder.fromMessage(doReceive).setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel).setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel).build();
            }
            return doReceive;
        } catch (RuntimeException e) {
            temporaryReplyChannel.setSendFailed(true);
            throw e;
        }
    }

    @Override // org.springframework.messaging.core.AbstractMessagingTemplate
    protected /* bridge */ /* synthetic */ Message doSendAndReceive(Object obj, Message message) {
        return doSendAndReceive((MessageChannel) obj, (Message<?>) message);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.messaging.core.AbstractMessageSendingTemplate
    public /* bridge */ /* synthetic */ void doSend(Object obj, Message message) {
        doSend((MessageChannel) obj, (Message<?>) message);
    }
}
