/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.cloud.stream.binder.servicebus;

import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.spring.cloud.core.implementation.util.AzurePropertiesUtils;
import com.azure.spring.cloud.core.properties.AzureProperties;
import com.azure.spring.cloud.stream.binder.servicebus.config.ServiceBusProcessorFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.servicebus.config.ServiceBusProducerFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusConsumerProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusExtendedBindingProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusProducerProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.provisioning.ServiceBusChannelProvisioner;
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentation;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentationManager;
import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.core.SendOperation;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProcessorFactory;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import com.azure.spring.messaging.servicebus.implementation.properties.merger.ProcessorPropertiesMerger;
import com.azure.spring.messaging.servicebus.support.converter.ServiceBusMessageConverter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.expression.Expression;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class ServiceBusMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<ServiceBusConsumerProperties>, ExtendedProducerProperties<ServiceBusProducerProperties>, ServiceBusChannelProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, ServiceBusConsumerProperties, ServiceBusProducerProperties> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusMessageChannelBinder.class);
    private static final DefaultErrorMessageStrategy DEFAULT_ERROR_MESSAGE_STRATEGY = new DefaultErrorMessageStrategy();
    private static final String EXCEPTION_MESSAGE = "exception-message";
    private ServiceBusExtendedBindingProperties bindingProperties = new ServiceBusExtendedBindingProperties();
    private NamespaceProperties namespaceProperties;
    private ServiceBusTemplate serviceBusTemplate;
    private ServiceBusProcessorFactory processorFactory;
    private ServiceBusMessageConverter messageConverter = new ServiceBusMessageConverter();
    private final List<ServiceBusMessageListenerContainer> serviceBusMessageListenerContainers = new ArrayList<ServiceBusMessageListenerContainer>();
    private final InstrumentationManager instrumentationManager = new DefaultInstrumentationManager();
    private final Map<String, ExtendedProducerProperties<ServiceBusProducerProperties>> extendedProducerPropertiesMap = new ConcurrentHashMap<String, ExtendedProducerProperties<ServiceBusProducerProperties>>();
    private final List<ServiceBusProducerFactoryCustomizer> producerFactoryCustomizers = new ArrayList<ServiceBusProducerFactoryCustomizer>();
    private final List<ServiceBusProcessorFactoryCustomizer> processorFactoryCustomizers = new ArrayList<ServiceBusProcessorFactoryCustomizer>();

    public ServiceBusMessageChannelBinder(String[] headersToEmbed, ServiceBusChannelProvisioner provisioningProvider) {
        super(headersToEmbed, (ProvisioningProvider)provisioningProvider);
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<ServiceBusProducerProperties> producerProperties, MessageChannel errorChannel) {
        Assert.notNull((Object)this.getServiceBusTemplate(), (String)"ServiceBusTemplate can't be null when create a producer");
        this.extendedProducerPropertiesMap.put(destination.getName(), producerProperties);
        DefaultMessageHandler handler = new DefaultMessageHandler(destination.getName(), (SendOperation)this.serviceBusTemplate);
        handler.setBeanFactory((BeanFactory)this.getBeanFactory());
        handler.setSync(((ServiceBusProducerProperties)producerProperties.getExtension()).isSync());
        handler.setSendTimeout(((ServiceBusProducerProperties)producerProperties.getExtension()).getSendTimeout().toMillis());
        handler.setSendFailureChannel(errorChannel);
        String instrumentationId = Instrumentation.buildId((Instrumentation.Type)Instrumentation.Type.PRODUCER, (String)destination.getName());
        handler.setSendCallback((ListenableFutureCallback)new InstrumentationSendCallback(instrumentationId, this.instrumentationManager));
        if (producerProperties.isPartitioned()) {
            handler.setPartitionKeyExpressionString("'partitionKey-' + headers['scst_partition']");
        } else {
            handler.setPartitionKeyExpression((Expression)new FunctionExpression(m -> m.getPayload().hashCode()));
        }
        return handler;
    }

    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
        ServiceBusContainerProperties containerProperties = this.createContainerProperties(destination, group, properties);
        ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(this.getProcessorFactory(), containerProperties);
        this.serviceBusMessageListenerContainers.add(listenerContainer);
        ServiceBusInboundChannelAdapter inboundAdapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        String instrumentationId = Instrumentation.buildId((Instrumentation.Type)Instrumentation.Type.CONSUMER, (String)(destination.getName() + "/" + this.getGroup(group)));
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = this.registerErrorInfrastructure(destination, this.getGroup(group), (ConsumerProperties)properties);
        inboundAdapter.setBeanFactory((BeanFactory)this.getBeanFactory());
        inboundAdapter.setInstrumentationManager(this.instrumentationManager);
        inboundAdapter.setInstrumentationId(instrumentationId);
        inboundAdapter.setErrorChannel((MessageChannel)errorInfrastructure.getErrorChannel());
        inboundAdapter.setMessageConverter((AzureMessageConverter)this.messageConverter);
        return inboundAdapter;
    }

    ServiceBusContainerProperties createContainerProperties(ConsumerDestination destination, String group, ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        AzurePropertiesUtils.copyAzureCommonProperties((AzureProperties)((AzureProperties)properties.getExtension()), (AzureProperties)containerProperties);
        ProcessorPropertiesMerger.copyProcessorPropertiesIfNotNull((ProcessorProperties)((ProcessorProperties)properties.getExtension()), (ProcessorProperties)containerProperties);
        containerProperties.setEntityName(destination.getName());
        containerProperties.setSubscriptionName(group);
        return containerProperties;
    }

    protected MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
        return message -> {
            Assert.state((boolean)(message instanceof ErrorMessage), (String)("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message));
            ErrorMessage errorMessage = (ErrorMessage)message;
            Message amqpMessage = errorMessage.getOriginalMessage();
            if (amqpMessage == null) {
                this.logger.error((Object)("No raw message header in " + message));
            } else {
                Throwable cause = (Throwable)message.getPayload();
                if (((ServiceBusConsumerProperties)properties.getExtension()).isRequeueRejected()) {
                    this.deadLetter(destination.getName(), amqpMessage, EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
                } else {
                    this.abandon(destination.getName(), amqpMessage);
                }
            }
        };
    }

    public <T> void deadLetter(String destination, Message<T> message, String deadLetterReason, String deadLetterErrorDescription) {
        Assert.hasText((String)destination, (String)"destination can't be null or empty");
        ServiceBusReceivedMessageContext messageContext = (ServiceBusReceivedMessageContext)message.getHeaders().get((Object)"azure_service_bus_received_message_context");
        if (messageContext != null) {
            messageContext.deadLetter();
        }
    }

    public <T> void abandon(String destination, Message<T> message) {
        Assert.hasText((String)destination, (String)"destination can't be null or empty");
        ServiceBusReceivedMessageContext messageContext = (ServiceBusReceivedMessageContext)message.getHeaders().get((Object)"azure_service_bus_received_message_context");
        if (messageContext != null) {
            messageContext.abandon();
        }
    }

    public ServiceBusConsumerProperties getExtendedConsumerProperties(String channelName) {
        return (ServiceBusConsumerProperties)this.bindingProperties.getExtendedConsumerProperties(channelName);
    }

    public ServiceBusProducerProperties getExtendedProducerProperties(String channelName) {
        return (ServiceBusProducerProperties)this.bindingProperties.getExtendedProducerProperties(channelName);
    }

    public String getDefaultsPrefix() {
        return this.bindingProperties.getDefaultsPrefix();
    }

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.bindingProperties.getExtendedPropertiesEntryClass();
    }

    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return DEFAULT_ERROR_MESSAGE_STRATEGY;
    }

    public void setBindingProperties(ServiceBusExtendedBindingProperties bindingProperties) {
        this.bindingProperties = bindingProperties;
    }

    private ServiceBusTemplate getServiceBusTemplate() {
        if (this.serviceBusTemplate == null) {
            DefaultServiceBusNamespaceProducerFactory factory = new DefaultServiceBusNamespaceProducerFactory(this.namespaceProperties, this.getProducerPropertiesSupplier());
            this.producerFactoryCustomizers.forEach(customizer -> customizer.customize((ServiceBusProducerFactory)factory));
            factory.addListener((name, client) -> {
                DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, Instrumentation.Type.PRODUCER);
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation((Instrumentation)instrumentation);
            });
            this.serviceBusTemplate = new ServiceBusTemplate((ServiceBusProducerFactory)factory);
        }
        return this.serviceBusTemplate;
    }

    private ServiceBusProcessorFactory getProcessorFactory() {
        if (this.processorFactory == null) {
            this.processorFactory = new DefaultServiceBusNamespaceProcessorFactory(this.namespaceProperties);
            this.processorFactoryCustomizers.forEach(customizer -> customizer.customize(this.processorFactory));
            this.processorFactory.addListener((name, client) -> {
                String subscriptionName = client.getSubscriptionName();
                boolean isTopic = StringUtils.hasText((String)subscriptionName);
                String entityName = isTopic ? client.getTopicName() : client.getQueueName();
                String instrumentationName = entityName + "/" + this.getGroup(subscriptionName);
                ServiceBusProcessorInstrumentation instrumentation = new ServiceBusProcessorInstrumentation(instrumentationName, Instrumentation.Type.CONSUMER, Duration.ofMinutes(2L));
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation((Instrumentation)instrumentation);
            });
        }
        return this.processorFactory;
    }

    private PropertiesSupplier<String, ProducerProperties> getProducerPropertiesSupplier() {
        return key -> {
            if (this.extendedProducerPropertiesMap.containsKey(key)) {
                ServiceBusProducerProperties producerProperties = (ServiceBusProducerProperties)this.extendedProducerPropertiesMap.get(key).getExtension();
                producerProperties.setEntityName(key);
                return producerProperties;
            }
            LOGGER.debug("Can't find extended properties for {}", key);
            return null;
        };
    }

    public void setNamespaceProperties(NamespaceProperties namespaceProperties) {
        this.namespaceProperties = namespaceProperties;
    }

    public void setMessageConverter(ServiceBusMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public InstrumentationManager getInstrumentationManager() {
        return this.instrumentationManager;
    }

    private String getGroup(String group) {
        return group != null ? group : "";
    }

    public void addProducerFactoryCustomizer(ServiceBusProducerFactoryCustomizer producerFactoryCustomizer) {
        if (producerFactoryCustomizer != null) {
            this.producerFactoryCustomizers.add(producerFactoryCustomizer);
        }
    }

    public void addProcessorFactoryCustomizer(ServiceBusProcessorFactoryCustomizer processorFactoryCustomizer) {
        if (processorFactoryCustomizer != null) {
            this.processorFactoryCustomizers.add(processorFactoryCustomizer);
        }
    }
}

