/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.jetbrains.annotations.NotNull;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.rabbit.BatchCapableRejectAndDontRequeueRecoverer;
import org.springframework.cloud.stream.binder.rabbit.RabbitExpressionEvaluatingInterceptor;
import org.springframework.cloud.stream.binder.rabbit.StreamUtils;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class RabbitMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<RabbitConsumerProperties>, ExtendedProducerProperties<RabbitProducerProperties>, RabbitExchangeQueueProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, RabbitConsumerProperties, RabbitProducerProperties>,
DisposableBean {
    private static final SimplePassthroughMessageConverter passThoughConverter = new SimplePassthroughMessageConverter();
    private static final AmqpMessageHeaderErrorMessageStrategy errorMessageStrategy = new AmqpMessageHeaderErrorMessageStrategy();
    private static final MessagePropertiesConverter inboundMessagePropertiesConverter = new DefaultMessagePropertiesConverter(){

        public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
            MessageProperties properties = super.toMessageProperties(source, envelope, charset);
            properties.setDeliveryMode(null);
            return properties;
        }
    };
    private static final Pattern interceptorNeededPattern = Pattern.compile("(payload|#root|#this)");
    private final RabbitProperties rabbitProperties;
    private boolean destroyConnectionFactory;
    private ConnectionFactory connectionFactory;
    private MessagePostProcessor decompressingPostProcessor = new DelegatingDecompressingPostProcessor();
    private MessagePostProcessor compressingPostProcessor = new GZipPostProcessor();
    private volatile String[] adminAddresses;
    private volatile String[] nodes;
    private volatile boolean clustered;
    private RabbitExtendedBindingProperties extendedBindingProperties = new RabbitExtendedBindingProperties();

    public RabbitMessageChannelBinder(ConnectionFactory connectionFactory, RabbitProperties rabbitProperties, RabbitExchangeQueueProvisioner provisioningProvider) {
        this(connectionFactory, rabbitProperties, provisioningProvider, null, null);
    }

    public RabbitMessageChannelBinder(ConnectionFactory connectionFactory, RabbitProperties rabbitProperties, RabbitExchangeQueueProvisioner provisioningProvider, ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer) {
        this(connectionFactory, rabbitProperties, provisioningProvider, containerCustomizer, null);
    }

    public RabbitMessageChannelBinder(ConnectionFactory connectionFactory, RabbitProperties rabbitProperties, RabbitExchangeQueueProvisioner provisioningProvider, ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer, MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer) {
        super(new String[0], (ProvisioningProvider)provisioningProvider, containerCustomizer, sourceCustomizer);
        Assert.notNull((Object)connectionFactory, (String)"connectionFactory must not be null");
        Assert.notNull((Object)rabbitProperties, (String)"rabbitProperties must not be null");
        this.connectionFactory = connectionFactory;
        this.rabbitProperties = rabbitProperties;
    }

    public void setDecompressingPostProcessor(MessagePostProcessor decompressingPostProcessor) {
        this.decompressingPostProcessor = decompressingPostProcessor;
    }

    public void setCompressingPostProcessor(MessagePostProcessor compressingPostProcessor) {
        this.compressingPostProcessor = compressingPostProcessor;
    }

    public void setAdminAddresses(String[] adminAddresses) {
        this.adminAddresses = Arrays.copyOf(adminAddresses, adminAddresses.length);
    }

    public void setNodes(String[] nodes) {
        this.nodes = Arrays.copyOf(nodes, nodes.length);
        this.clustered = nodes.length > 1;
    }

    public void setExtendedBindingProperties(RabbitExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    public void onInit() throws Exception {
        super.onInit();
        if (this.clustered) {
            String[] addresses = StringUtils.commaDelimitedListToStringArray((String)this.rabbitProperties.getAddresses());
            Assert.state((addresses.length == this.adminAddresses.length && addresses.length == this.nodes.length ? 1 : 0) != 0, (String)"'addresses', 'adminAddresses', and 'nodes' properties must have equal length");
            this.connectionFactory = new LocalizedQueueConnectionFactory(this.connectionFactory, addresses, this.adminAddresses, this.nodes, this.rabbitProperties.getVirtualHost(), this.rabbitProperties.getUsername(), this.rabbitProperties.getPassword(), this.rabbitProperties.getSsl().getEnabled().booleanValue(), this.rabbitProperties.getSsl().getKeyStore(), this.rabbitProperties.getSsl().getTrustStore(), this.rabbitProperties.getSsl().getKeyStorePassword(), this.rabbitProperties.getSsl().getTrustStorePassword());
            this.destroyConnectionFactory = true;
        }
    }

    public void destroy() throws Exception {
        if (this.connectionFactory instanceof DisposableBean && this.destroyConnectionFactory) {
            ((DisposableBean)this.connectionFactory).destroy();
        }
    }

    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public RabbitConsumerProperties getExtendedConsumerProperties(String channelName) {
        return (RabbitConsumerProperties)this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    public RabbitProducerProperties getExtendedProducerProperties(String channelName) {
        return (RabbitProducerProperties)this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    public String getBinderIdentity() {
        return "rabbit-" + super.getBinderIdentity();
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination producerDestination, ExtendedProducerProperties<RabbitProducerProperties> producerProperties, MessageChannel errorChannel) {
        Assert.state((!HeaderMode.embeddedHeaders.equals((Object)producerProperties.getHeaderMode()) ? 1 : 0) != 0, (String)"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
        String prefix = ((RabbitProducerProperties)producerProperties.getExtension()).getPrefix();
        String exchangeName = producerDestination.getName();
        String destination = !StringUtils.hasText((String)prefix) ? exchangeName : exchangeName.substring(prefix.length());
        RabbitProducerProperties extendedProperties = (RabbitProducerProperties)producerProperties.getExtension();
        Object endpoint = !RabbitProducerProperties.ProducerType.AMQP.equals((Object)((RabbitProducerProperties)producerProperties.getExtension()).getProducerType()) ? StreamUtils.createStreamMessageHandler(producerDestination, producerProperties, errorChannel, destination, extendedProperties, this.getApplicationContext(), this::configureHeaderMapper) : this.amqpHandler(producerDestination, producerProperties, errorChannel, destination, extendedProperties);
        return endpoint;
    }

    private AmqpOutboundEndpoint amqpHandler(ProducerDestination producerDestination, ExtendedProducerProperties<RabbitProducerProperties> producerProperties, MessageChannel errorChannel, String destination, RabbitProducerProperties extendedProperties) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint((AmqpTemplate)this.buildRabbitTemplate(extendedProperties, errorChannel != null || extendedProperties.isUseConfirmHeader()));
        endpoint.setExchangeName(producerDestination.getName());
        boolean expressionInterceptorNeeded = this.expressionInterceptorNeeded(extendedProperties);
        Expression routingKeyExpression = extendedProperties.getRoutingKeyExpression();
        if (!producerProperties.isPartitioned()) {
            if (routingKeyExpression == null) {
                endpoint.setRoutingKey(destination);
            } else if (expressionInterceptorNeeded) {
                endpoint.setRoutingKeyExpressionString("headers['scst_routingKey']");
            } else {
                endpoint.setRoutingKeyExpression(routingKeyExpression);
            }
        } else if (routingKeyExpression == null) {
            endpoint.setRoutingKeyExpression(this.buildPartitionRoutingExpression(destination, false));
        } else if (expressionInterceptorNeeded) {
            endpoint.setRoutingKeyExpression(this.buildPartitionRoutingExpression("headers['scst_routingKey']", true));
        } else {
            endpoint.setRoutingKeyExpression(this.buildPartitionRoutingExpression(routingKeyExpression.getExpressionString(), true));
        }
        if (extendedProperties.getDelayExpression() != null) {
            if (expressionInterceptorNeeded) {
                endpoint.setDelayExpressionString("headers['scst_delay']");
            } else {
                endpoint.setDelayExpression(extendedProperties.getDelayExpression());
            }
        }
        endpoint.setHeaderMapper(this.configureHeaderMapper(extendedProperties));
        endpoint.setDefaultDeliveryMode(extendedProperties.getDeliveryMode());
        endpoint.setBeanFactory((BeanFactory)this.getBeanFactory());
        if (errorChannel != null) {
            this.checkConnectionFactoryIsErrorCapable();
            endpoint.setReturnChannel(errorChannel);
            if (!extendedProperties.isUseConfirmHeader()) {
                String ackChannelBeanName;
                endpoint.setConfirmNackChannel(errorChannel);
                String string = ackChannelBeanName = StringUtils.hasText((String)extendedProperties.getConfirmAckChannel()) ? extendedProperties.getConfirmAckChannel() : "nullChannel";
                if (!ackChannelBeanName.equals("nullChannel") && !this.getApplicationContext().containsBean(ackChannelBeanName)) {
                    GenericApplicationContext context = (GenericApplicationContext)this.getApplicationContext();
                    context.registerBean(ackChannelBeanName, DirectChannel.class, () -> new DirectChannel(), new BeanDefinitionCustomizer[0]);
                }
                endpoint.setConfirmAckChannelName(ackChannelBeanName);
                endpoint.setConfirmCorrelationExpressionString("#root");
            } else {
                Assert.state((!StringUtils.hasText((String)extendedProperties.getConfirmAckChannel()) ? 1 : 0) != 0, (String)"You cannot specify a 'confirmAckChannel' when 'useConfirmHeader' is true");
            }
            endpoint.setErrorMessageStrategy((ErrorMessageStrategy)new DefaultErrorMessageStrategy());
        }
        endpoint.setHeadersMappedLast(true);
        return endpoint;
    }

    private AmqpHeaderMapper configureHeaderMapper(RabbitProducerProperties extendedProperties) {
        DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
        ArrayList<String> headerPatterns = new ArrayList<String>(extendedProperties.getHeaderPatterns().length + 3);
        if (!extendedProperties.isSuperStream()) {
            headerPatterns.add("!scst_partition");
        }
        headerPatterns.add("!sourceData");
        headerPatterns.add("!deliveryAttempt");
        headerPatterns.add("!rabbitmq_streamContext");
        headerPatterns.addAll(Arrays.asList(extendedProperties.getHeaderPatterns()));
        mapper.setRequestHeaderNames(headerPatterns.toArray(new String[headerPatterns.size()]));
        return mapper;
    }

    protected void postProcessOutputChannel(MessageChannel outputChannel, ExtendedProducerProperties<RabbitProducerProperties> producerProperties) {
        RabbitProducerProperties extendedProperties = (RabbitProducerProperties)producerProperties.getExtension();
        if (this.expressionInterceptorNeeded(extendedProperties)) {
            ((AbstractMessageChannel)outputChannel).addInterceptor(0, (ChannelInterceptor)new RabbitExpressionEvaluatingInterceptor(extendedProperties.getRoutingKeyExpression(), extendedProperties.getDelayExpression(), this.getEvaluationContext()));
        }
    }

    private boolean expressionInterceptorNeeded(RabbitProducerProperties extendedProperties) {
        Expression rkExpression = extendedProperties.getRoutingKeyExpression();
        Expression delayExpression = extendedProperties.getDelayExpression();
        return rkExpression != null && interceptorNeededPattern.matcher(rkExpression.getExpressionString()).find() || delayExpression != null && interceptorNeededPattern.matcher(delayExpression.getExpressionString()).find();
    }

    private void checkConnectionFactoryIsErrorCapable() {
        if (!(this.connectionFactory instanceof CachingConnectionFactory)) {
            this.logger.warn((Object)("Unknown connection factory type, cannot determine error capabilities: " + this.connectionFactory.getClass()));
        } else {
            CachingConnectionFactory ccf = (CachingConnectionFactory)this.connectionFactory;
            if (!ccf.isPublisherConfirms() && !ccf.isPublisherReturns()) {
                this.logger.warn((Object)"Producer error channel is enabled, but the connection factory is not configured for returns or confirms; the error channel will receive no messages");
            } else if (!ccf.isPublisherConfirms()) {
                this.logger.info((Object)"Producer error channel is enabled, but the connection factory is only configured to handle returned messages; negative acks will not be reported");
            } else if (!ccf.isPublisherReturns()) {
                this.logger.info((Object)"Producer error channel is enabled, but the connection factory is only configured to handle negatively acked messages; returned messages will not be reported");
            }
        }
    }

    private Expression buildPartitionRoutingExpression(String expressionRoot, boolean rootIsExpression) {
        String partitionRoutingExpression = rootIsExpression ? expressionRoot + " + '-' + headers['scst_partition']" : "'" + expressionRoot + "-' + headers['scst_partition']";
        return new SpelExpressionParser().parseExpression(partitionRoutingExpression);
    }

    protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        Assert.state((!HeaderMode.embeddedHeaders.equals((Object)properties.getHeaderMode()) ? 1 : 0) != 0, (String)"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
        String destination = consumerDestination.getName();
        RabbitConsumerProperties extension = (RabbitConsumerProperties)properties.getExtension();
        MessageListenerContainer listenerContainer = this.createAndConfigureContainer(consumerDestination, group, properties, destination, extension);
        String[] queues = StringUtils.tokenizeToStringArray((String)destination, (String)",", (boolean)true, (boolean)true);
        if (((RabbitConsumerProperties)properties.getExtension()).getContainerType() != RabbitConsumerProperties.ContainerType.STREAM || !((RabbitConsumerProperties)properties.getExtension()).isSuperStream()) {
            listenerContainer.setQueueNames(queues);
        }
        this.getContainerCustomizer().configure((Object)listenerContainer, consumerDestination.getName(), group);
        listenerContainer.afterPropertiesSet();
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setBindSourceMessage(true);
        adapter.setBeanFactory((BeanFactory)this.getBeanFactory());
        adapter.setBeanName("inbound." + destination);
        DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
        mapper.setRequestHeaderNames(extension.getHeaderPatterns());
        adapter.setHeaderMapper((AmqpHeaderMapper)mapper);
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = this.registerErrorInfrastructure(consumerDestination, group, (ConsumerProperties)properties);
        if (properties.getMaxAttempts() > 1) {
            adapter.setRetryTemplate(this.buildRetryTemplate((ConsumerProperties)properties));
            adapter.setRecoveryCallback((RecoveryCallback)errorInfrastructure.getRecoverer());
        } else {
            adapter.setErrorMessageStrategy((ErrorMessageStrategy)errorMessageStrategy);
            adapter.setErrorChannel((MessageChannel)errorInfrastructure.getErrorChannel());
        }
        adapter.setMessageConverter((MessageConverter)passThoughConverter);
        RabbitConsumerProperties.ContainerType containerType = extension.getContainerType();
        if (properties.isBatchMode() && extension.isEnableBatching() && RabbitConsumerProperties.ContainerType.SIMPLE.equals((Object)containerType)) {
            adapter.setBatchMode(AmqpInboundChannelAdapter.BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
        }
        if (containerType.equals((Object)RabbitConsumerProperties.ContainerType.STREAM)) {
            StreamUtils.configureAdapter(adapter);
        }
        return adapter;
    }

    private MessageListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination, RabbitConsumerProperties extension) {
        if (extension.getContainerType().equals((Object)RabbitConsumerProperties.ContainerType.STREAM)) {
            return StreamUtils.createContainer(consumerDestination, group, properties, destination, (ApplicationContext)this.getApplicationContext());
        }
        boolean directContainer = extension.getContainerType().equals((Object)RabbitConsumerProperties.ContainerType.DIRECT);
        DirectMessageListenerContainer listenerContainer = directContainer ? new DirectMessageListenerContainer(this.connectionFactory) : new SimpleMessageListenerContainer(this.connectionFactory);
        listenerContainer.setBeanName(consumerDestination.getName() + "." + group + ".container");
        listenerContainer.setAcknowledgeMode(extension.getAcknowledgeMode());
        listenerContainer.setChannelTransacted(extension.isTransacted());
        listenerContainer.setDefaultRequeueRejected(extension.isRequeueRejected());
        int concurrency = properties.getConcurrency();
        int n = concurrency = concurrency > 0 ? concurrency : 1;
        if (directContainer) {
            this.setDMLCProperties(properties, listenerContainer, concurrency);
        } else {
            this.setSMLCProperties(properties, (SimpleMessageListenerContainer)listenerContainer, concurrency);
        }
        listenerContainer.setPrefetchCount(extension.getPrefetch());
        listenerContainer.setRecoveryInterval(extension.getRecoveryInterval());
        listenerContainer.setTaskExecutor((Executor)new SimpleAsyncTaskExecutor(consumerDestination.getName() + "-"));
        listenerContainer.setAfterReceivePostProcessors(new MessagePostProcessor[]{this.decompressingPostProcessor});
        listenerContainer.setMessagePropertiesConverter(inboundMessagePropertiesConverter);
        listenerContainer.setExclusive(extension.isExclusive());
        listenerContainer.setMissingQueuesFatal(extension.getMissingQueuesFatal());
        if (extension.getFailedDeclarationRetryInterval() != null) {
            listenerContainer.setFailedDeclarationRetryInterval(extension.getFailedDeclarationRetryInterval().longValue());
        }
        if (this.getApplicationEventPublisher() != null) {
            listenerContainer.setApplicationEventPublisher(this.getApplicationEventPublisher());
        } else if (this.getApplicationContext() != null) {
            listenerContainer.setApplicationEventPublisher((ApplicationEventPublisher)this.getApplicationContext());
        }
        if (StringUtils.hasText((String)extension.getConsumerTagPrefix())) {
            AtomicInteger index = new AtomicInteger();
            listenerContainer.setConsumerTagStrategy(q -> extension.getConsumerTagPrefix() + "#" + index.getAndIncrement());
        }
        return listenerContainer;
    }

    private void setSMLCProperties(ExtendedConsumerProperties<RabbitConsumerProperties> properties, SimpleMessageListenerContainer listenerContainer, int concurrency) {
        RabbitConsumerProperties extension = (RabbitConsumerProperties)properties.getExtension();
        listenerContainer.setConcurrentConsumers(concurrency);
        int maxConcurrency = extension.getMaxConcurrency();
        if (maxConcurrency > concurrency) {
            listenerContainer.setMaxConcurrentConsumers(maxConcurrency);
        }
        listenerContainer.setDeBatchingEnabled(!properties.isBatchMode());
        listenerContainer.setBatchSize(extension.getBatchSize());
        if (extension.getQueueDeclarationRetries() != null) {
            listenerContainer.setDeclarationRetries(extension.getQueueDeclarationRetries().intValue());
        }
        if (properties.isBatchMode() && extension.isEnableBatching()) {
            listenerContainer.setConsumerBatchEnabled(true);
            listenerContainer.setDeBatchingEnabled(true);
        }
        if (extension.getReceiveTimeout() != null) {
            listenerContainer.setReceiveTimeout(extension.getReceiveTimeout().longValue());
        }
    }

    private void setDMLCProperties(ExtendedConsumerProperties<RabbitConsumerProperties> properties, DirectMessageListenerContainer listenerContainer, int concurrency) {
        listenerContainer.setConsumersPerQueue(concurrency);
        if (((RabbitConsumerProperties)properties.getExtension()).getMaxConcurrency() > concurrency) {
            this.logger.warn((Object)"maxConcurrency is not supported by the direct container type");
        }
        if (((RabbitConsumerProperties)properties.getExtension()).getBatchSize() > 1) {
            this.logger.warn((Object)"batchSize is not supported by the direct container type");
        }
        if (((RabbitConsumerProperties)properties.getExtension()).getQueueDeclarationRetries() != null) {
            this.logger.warn((Object)"queueDeclarationRetries is not supported by the direct container type");
        }
    }

    protected AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String name, String group, ConsumerDestination destination, ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {
        Assert.isTrue((!consumerProperties.isMultiplex() ? 1 : 0) != 0, (String)"The Spring Integration polled MessageSource does not currently support muiltiple queues");
        AmqpMessageSource source = new AmqpMessageSource(this.connectionFactory, destination.getName());
        source.setRawMessageHeader(true);
        this.getMessageSourceCustomizer().configure((Object)source, destination.getName(), group);
        return new AbstractMessageChannelBinder.PolledConsumerResources((MessageSource)source, this.registerErrorInfrastructure(destination, group, (ConsumerProperties)consumerProperties, true));
    }

    protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget) {
        bindingTarget.setAttributesProvider((accessor, message) -> {
            Object rawMessage = message.getHeaders().get((Object)"amqp_raw_message");
            if (rawMessage != null) {
                accessor.setAttribute("amqp_raw_message", rawMessage);
            }
        });
    }

    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return errorMessageStrategy;
    }

    protected MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, final ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        if (((RabbitConsumerProperties)properties.getExtension()).isRepublishToDlq()) {
            return new MessageHandler(){
                private static final long ACK_TIMEOUT = 10000L;
                private final RabbitTemplate template;
                private final CachingConnectionFactory.ConfirmType confirmType;
                private final String exchange;
                private final String routingKey;
                private final int frameMaxHeadroom;
                private int maxStackTraceLength;
                private Boolean dlxPresent;
                {
                    this.template = new RabbitTemplate(RabbitMessageChannelBinder.this.connectionFactory);
                    this.template.setUsePublisherConnection(true);
                    this.template.setChannelTransacted(((RabbitConsumerProperties)properties.getExtension()).isTransacted());
                    this.template.setMandatory(RabbitMessageChannelBinder.this.connectionFactory.isPublisherReturns());
                    this.confirmType = RabbitMessageChannelBinder.this.connectionFactory.isSimplePublisherConfirms() ? CachingConnectionFactory.ConfirmType.SIMPLE : (RabbitMessageChannelBinder.this.connectionFactory.isPublisherConfirms() ? CachingConnectionFactory.ConfirmType.CORRELATED : CachingConnectionFactory.ConfirmType.NONE);
                    this.exchange = RabbitMessageChannelBinder.this.deadLetterExchangeName((RabbitCommonProperties)properties.getExtension());
                    this.routingKey = ((RabbitConsumerProperties)properties.getExtension()).getDeadLetterRoutingKey();
                    this.frameMaxHeadroom = ((RabbitConsumerProperties)properties.getExtension()).getFrameMaxHeadroom();
                    this.maxStackTraceLength = -1;
                }

                public void handleMessage(Message<?> message) throws MessagingException {
                    List<org.springframework.amqp.core.Message> amqpMessages = RabbitMessageChannelBinder.this.extractAmqpMessages(message, (ExtendedConsumerProperties<RabbitConsumerProperties>)properties);
                    if (!(message instanceof ErrorMessage)) {
                        RabbitMessageChannelBinder.this.logger.error((Object)("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message));
                    } else if (amqpMessages == null) {
                        RabbitMessageChannelBinder.this.logger.error((Object)("No raw message header in " + message));
                    } else {
                        Message original;
                        int rabbitMaxStackTraceLength;
                        if (!this.checkDlx()) {
                            return;
                        }
                        Throwable cause = (Throwable)message.getPayload();
                        if (!this.shouldRepublish(cause)) {
                            if (RabbitMessageChannelBinder.this.logger.isDebugEnabled()) {
                                RabbitMessageChannelBinder.this.logger.debug((Object)("Skipping republish of: " + message));
                            }
                            return;
                        }
                        String stackTraceAsString = RabbitMessageChannelBinder.this.getStackTraceAsString(cause);
                        if (this.maxStackTraceLength < 0 && (rabbitMaxStackTraceLength = RabbitUtils.getMaxFrame((ConnectionFactory)this.template.getConnectionFactory())) > 0) {
                            this.maxStackTraceLength = rabbitMaxStackTraceLength - this.frameMaxHeadroom;
                        }
                        if (this.maxStackTraceLength > 0 && stackTraceAsString.length() > this.maxStackTraceLength) {
                            stackTraceAsString = stackTraceAsString.substring(0, this.maxStackTraceLength);
                            RabbitMessageChannelBinder.this.logger.warn((Object)"Stack trace in republished message header truncated due to frame_max limitations; consider increasing frame_max on the broker or reduce the stack trace depth", cause);
                        }
                        for (org.springframework.amqp.core.Message amqpMessage : amqpMessages) {
                            MessageProperties messageProperties = this.adjustMessagePropertiesHeader(cause, stackTraceAsString, amqpMessage);
                            this.doSend(this.exchange, this.routingKey != null ? this.routingKey : messageProperties.getConsumerQueue(), amqpMessage);
                        }
                        if (((RabbitConsumerProperties)properties.getExtension()).getAcknowledgeMode().equals((Object)AcknowledgeMode.MANUAL) && (original = ((ErrorMessage)message).getOriginalMessage()) != null) {
                            try {
                                ((Channel)original.getHeaders().get((Object)"amqp_channel", Channel.class)).basicAck(((Long)original.getHeaders().get((Object)"amqp_deliveryTag", Long.class)).longValue(), false);
                            }
                            catch (IOException e) {
                                RabbitMessageChannelBinder.this.logger.debug((Object)"Failed to ack original message", (Throwable)e);
                            }
                        }
                    }
                }

                @NotNull
                private MessageProperties adjustMessagePropertiesHeader(Throwable cause, String stackTraceAsString, org.springframework.amqp.core.Message amqpMessage) {
                    MessageProperties messageProperties = amqpMessage.getMessageProperties();
                    Map headers = messageProperties.getHeaders();
                    headers.put("x-exception-stacktrace", stackTraceAsString);
                    headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
                    headers.put("x-original-exchange", messageProperties.getReceivedExchange());
                    headers.put("x-original-routingKey", messageProperties.getReceivedRoutingKey());
                    if (((RabbitConsumerProperties)properties.getExtension()).getRepublishDeliveyMode() != null) {
                        messageProperties.setDeliveryMode(((RabbitConsumerProperties)properties.getExtension()).getRepublishDeliveyMode());
                    }
                    return messageProperties;
                }

                private void doSend(String exchange, String routingKey, org.springframework.amqp.core.Message amqpMessage) {
                    if (CachingConnectionFactory.ConfirmType.SIMPLE.equals((Object)this.confirmType)) {
                        this.template.invoke(temp -> {
                            temp.send(exchange, routingKey, amqpMessage);
                            if (!temp.waitForConfirms(10000L)) {
                                throw new AmqpRejectAndDontRequeueException("Negative ack for DLQ message received");
                            }
                            return null;
                        });
                    } else if (CachingConnectionFactory.ConfirmType.CORRELATED.equals((Object)this.confirmType)) {
                        CorrelationData corr = new CorrelationData();
                        this.template.send(exchange, routingKey, amqpMessage, corr);
                        try {
                            CorrelationData.Confirm confirm = (CorrelationData.Confirm)corr.getFuture().get(10000L, TimeUnit.MILLISECONDS);
                            if (!confirm.isAck()) {
                                throw new AmqpRejectAndDontRequeueException("Negative ack for DLQ message received");
                            }
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new AmqpRejectAndDontRequeueException((Throwable)e);
                        }
                        catch (ExecutionException e) {
                            throw new AmqpRejectAndDontRequeueException(e.getCause());
                        }
                        catch (TimeoutException e) {
                            throw new AmqpRejectAndDontRequeueException((Throwable)e);
                        }
                        if (corr.getReturned() != null) {
                            RabbitMessageChannelBinder.this.logger.error((Object)("DLQ message was returned: " + amqpMessage));
                            throw new AmqpRejectAndDontRequeueException("DLQ message was returned");
                        }
                    } else {
                        this.template.send(exchange, routingKey, amqpMessage);
                    }
                }

                private boolean checkDlx() {
                    if (this.dlxPresent == null) {
                        this.dlxPresent = ((RabbitConsumerProperties)properties.getExtension()).isAutoBindDlq() ? Boolean.TRUE : (Boolean)this.template.execute(channel -> {
                            String dlx = RabbitMessageChannelBinder.this.deadLetterExchangeName((RabbitCommonProperties)properties.getExtension());
                            try {
                                channel.exchangeDeclarePassive(dlx);
                                return Boolean.TRUE;
                            }
                            catch (IOException e) {
                                RabbitMessageChannelBinder.this.logger.warn((Object)("'republishToDlq' is true, but the '" + dlx + "' dead letter exchange is not present; disabling 'republishToDlq'"));
                                return Boolean.FALSE;
                            }
                        });
                    }
                    return this.dlxPresent;
                }

                private boolean shouldRepublish(Throwable throwable) {
                    Throwable cause;
                    for (cause = throwable; cause != null && !(cause instanceof AmqpRejectAndDontRequeueException) && !(cause instanceof ImmediateAcknowledgeAmqpException); cause = cause.getCause()) {
                    }
                    return !(cause instanceof ImmediateAcknowledgeAmqpException);
                }
            };
        }
        if (properties.getMaxAttempts() > 1) {
            return new MessageHandler(){
                private final BatchCapableRejectAndDontRequeueRecoverer recoverer = new BatchCapableRejectAndDontRequeueRecoverer();

                public void handleMessage(Message<?> message) throws MessagingException {
                    List<org.springframework.amqp.core.Message> amqpMessages = RabbitMessageChannelBinder.this.extractAmqpMessages(message, (ExtendedConsumerProperties<RabbitConsumerProperties>)properties);
                    if (!(message instanceof ErrorMessage)) {
                        RabbitMessageChannelBinder.this.logger.error((Object)("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message));
                        throw new ListenerExecutionFailedException("Unexpected error message " + message, (Throwable)new AmqpRejectAndDontRequeueException(""), (org.springframework.amqp.core.Message[])null);
                    }
                    if (amqpMessages == null || amqpMessages.isEmpty()) {
                        RabbitMessageChannelBinder.this.logger.error((Object)("No raw message header in " + message));
                        throw new ListenerExecutionFailedException("Unexpected error message " + message, (Throwable)new AmqpRejectAndDontRequeueException(""), (org.springframework.amqp.core.Message[])amqpMessages.toArray(org.springframework.amqp.core.Message[]::new));
                    }
                    this.recoverer.recover(amqpMessages, (Throwable)message.getPayload());
                }
            };
        }
        return super.getErrorMessageHandler(destination, group, properties);
    }

    private List<org.springframework.amqp.core.Message> extractAmqpMessages(Message<?> message, ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        if (properties.isBatchMode() || ((RabbitConsumerProperties)properties.getExtension()).isEnableBatching()) {
            this.logger.debug((Object)"Batch mode enabled: Extract list instead of single message");
            return (List)StaticMessageHeaderAccessor.getSourceData(message);
        }
        org.springframework.amqp.core.Message amqpMessage = (org.springframework.amqp.core.Message)StaticMessageHeaderAccessor.getSourceData(message);
        return List.of(amqpMessage);
    }

    protected MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        MessageHandler handler = this.getErrorMessageHandler(destination, group, properties);
        if (handler != null) {
            return handler;
        }
        MessageHandler superHandler = super.getErrorMessageHandler(destination, group, properties);
        return message -> {
            AcknowledgmentCallback ack;
            org.springframework.amqp.core.Message amqpMessage = (org.springframework.amqp.core.Message)message.getHeaders().get((Object)"amqp_raw_message");
            if (!(message instanceof ErrorMessage)) {
                this.logger.error((Object)("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message));
            } else if (amqpMessage == null) {
                if (superHandler != null) {
                    superHandler.handleMessage(message);
                }
            } else if (message.getPayload() instanceof MessagingException && (ack = StaticMessageHeaderAccessor.getAcknowledgmentCallback((Message)((MessagingException)message.getPayload()).getFailedMessage())) != null) {
                if (((RabbitConsumerProperties)properties.getExtension()).isRequeueRejected()) {
                    ack.acknowledge(AcknowledgmentCallback.Status.REQUEUE);
                } else {
                    ack.acknowledge(AcknowledgmentCallback.Status.REJECT);
                }
            }
        };
    }

    private String deadLetterExchangeName(RabbitCommonProperties properties) {
        if (properties.getDeadLetterExchange() == null) {
            return RabbitMessageChannelBinder.applyPrefix((String)properties.getPrefix(), (String)"DLX");
        }
        return properties.getDeadLetterExchange();
    }

    protected void afterUnbindConsumer(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {
        ((RabbitExchangeQueueProvisioner)this.provisioningProvider).cleanAutoDeclareContext(consumerDestination, consumerProperties);
    }

    protected void afterUnbindProducer(ProducerDestination destination, ExtendedProducerProperties<RabbitProducerProperties> producerProperties) {
        ((RabbitExchangeQueueProvisioner)this.provisioningProvider).cleanAutoDeclareContext(destination, producerProperties);
    }

    private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties properties, boolean mandatory) {
        RabbitTemplate rabbitTemplate;
        if (properties.isBatchingEnabled()) {
            BatchingStrategy batchingStrategy = this.getBatchingStrategy(properties);
            TaskScheduler taskScheduler = (TaskScheduler)this.getApplicationContext().getBean("taskScheduler", TaskScheduler.class);
            rabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
        } else {
            rabbitTemplate = new RabbitTemplate();
        }
        rabbitTemplate.setMessageConverter((MessageConverter)passThoughConverter);
        rabbitTemplate.setChannelTransacted(properties.isTransacted());
        rabbitTemplate.setConnectionFactory(this.connectionFactory);
        rabbitTemplate.setUsePublisherConnection(true);
        if (properties.isCompress()) {
            rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor[]{this.compressingPostProcessor});
        }
        rabbitTemplate.setMandatory(mandatory);
        if (this.rabbitProperties != null && this.rabbitProperties.getTemplate().getRetry().isEnabled()) {
            RabbitProperties.Retry retry = this.rabbitProperties.getTemplate().getRetry();
            SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(retry.getMaxAttempts());
            ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
            backOff.setInitialInterval(retry.getInitialInterval().toMillis());
            backOff.setMultiplier(retry.getMultiplier());
            backOff.setMaxInterval(retry.getMaxInterval().toMillis());
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setRetryPolicy((RetryPolicy)retryPolicy);
            retryTemplate.setBackOffPolicy((BackOffPolicy)backOff);
            rabbitTemplate.setRetryTemplate(retryTemplate);
        }
        rabbitTemplate.afterPropertiesSet();
        return rabbitTemplate;
    }

    private BatchingStrategy getBatchingStrategy(RabbitProducerProperties properties) {
        Object batchingStrategy = properties.getBatchingStrategyBeanName() != null ? (BatchingStrategy)this.getApplicationContext().getBean(properties.getBatchingStrategyBeanName(), BatchingStrategy.class) : new SimpleBatchingStrategy(properties.getBatchSize(), properties.getBatchBufferLimit(), (long)properties.getBatchTimeout());
        return batchingStrategy;
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter((Writer)stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }

    private static final class SimplePassthroughMessageConverter
    extends AbstractMessageConverter {
        private static final SimpleMessageConverter converter = new SimpleMessageConverter();

        SimplePassthroughMessageConverter() {
        }

        protected org.springframework.amqp.core.Message createMessage(Object object, MessageProperties messageProperties) {
            if (object instanceof byte[]) {
                return new org.springframework.amqp.core.Message((byte[])object, messageProperties);
            }
            return converter.toMessage(object, messageProperties);
        }

        public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
            return message.getBody();
        }
    }
}

