/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.rabbit.stream.producer;

import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.ProducerBuilder;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.micrometer.RabbitStreamMessageSenderContext;
import org.springframework.rabbit.stream.micrometer.RabbitStreamTemplateObservation;
import org.springframework.rabbit.stream.micrometer.RabbitStreamTemplateObservationConvention;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.producer.StreamSendException;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;

/**
 * {@link RabbitStreamOperations} implementation that publishes messages to a single
 * stream, or to a super stream when a routing function has been configured.
 *
 * <p>The underlying {@link Producer} is created lazily on first use and is discarded by
 * {@link #close()}; a later send creates a fresh one. Producer creation, producer
 * shutdown and the setters that influence producer creation all run under the same
 * internal lock.
 */
public class RabbitStreamTemplate implements RabbitStreamOperations, ApplicationContextAware, BeanNameAware {

    protected final LogAccessor logger = new LogAccessor(this.getClass());

    /** Guards producer creation/closing and the configuration consulted while creating it. */
    private final Lock lock = new ReentrantLock();

    private ApplicationContext applicationContext;

    private final Environment environment;

    private final String streamName;

    /** When non-null the producer targets a super stream and routes with this function. */
    private Function<Message, String> superStreamRouting;

    private MessageConverter messageConverter = new SimpleMessageConverter();

    private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();

    /** True once a caller replaced the default stream converter. */
    private boolean streamConverterSet;

    private String beanName;

    /** No-op by default. */
    private ProducerCustomizer producerCustomizer = (name, builder) -> { };

    private boolean observationEnabled;

    @Nullable
    private RabbitStreamTemplateObservationConvention observationConvention;

    private ObservationRegistry observationRegistry;

    private volatile Producer producer;

    /** Set after the first registry lookup so the application context is queried only while unset. */
    private volatile boolean observationRegistryObtained;

    /**
     * Create a template that publishes to the given stream.
     *
     * @param environment the stream environment used to build the producer; must not be null
     * @param streamName the stream (or super stream) name; must not be null
     * @throws IllegalArgumentException if either argument is null
     */
    public RabbitStreamTemplate(Environment environment, String streamName) {
        Assert.notNull(environment, "'environment' cannot be null");
        Assert.notNull(streamName, "'streamName' cannot be null");
        this.environment = environment;
        this.streamName = streamName;
    }

    /**
     * Return the current producer, building it under the lock if none exists yet.
     *
     * @return the producer, never null
     */
    private Producer createOrGetProducer() {
        // Unlocked fast path on the volatile field; re-checked once the lock is held.
        if (this.producer == null) {
            this.lock.lock();
            try {
                if (this.producer == null) {
                    ProducerBuilder builder = this.environment.producerBuilder();
                    if (this.superStreamRouting == null) {
                        builder.stream(this.streamName);
                    }
                    else {
                        builder.superStream(this.streamName).routing(this.superStreamRouting);
                    }
                    this.producerCustomizer.accept(this.beanName, builder);
                    this.producer = builder.build();
                    if (!this.streamConverterSet) {
                        // Still the converter created in the field initializer, so the cast is safe.
                        // The supplier reads the field on each call rather than capturing this instance.
                        ((DefaultStreamMessageConverter) this.streamConverter)
                                .setBuilderSupplier(() -> this.producer.messageBuilder());
                    }
                }
            }
            finally {
                this.lock.unlock();
            }
        }
        return this.producer;
    }

    /**
     * Store the application context; it is consulted later to look up an
     * {@link ObservationRegistry} when observation is enabled.
     *
     * @param applicationContext the owning context
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Store the bean name, which is handed to the {@link ProducerCustomizer} and to the
     * observation context.
     *
     * @param name the bean name
     */
    @Override
    public void setBeanName(String name) {
        this.lock.lock();
        try {
            this.beanName = name;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Configure a routing function. When set, the producer is built for a super stream
     * instead of a plain stream. Only affects producers created after this call.
     *
     * @param superStreamRouting function mapping a message to its routing value
     */
    public void setSuperStreamRouting(Function<Message, String> superStreamRouting) {
        this.lock.lock();
        try {
            this.superStreamRouting = superStreamRouting;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Replace the converter used by the {@code convertAndSend} methods.
     *
     * @param messageConverter the converter; must not be null
     * @throws IllegalArgumentException if the converter is null
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' cannot be null");
        this.messageConverter = messageConverter;
    }

    /**
     * Replace the converter that turns Spring AMQP messages into stream messages. Once a
     * custom converter is set, producer creation no longer installs a message-builder
     * supplier on the converter.
     *
     * @param streamConverter the converter; must not be null
     * @throws IllegalArgumentException if the converter is null
     */
    public void setStreamConverter(StreamMessageConverter streamConverter) {
        Assert.notNull(streamConverter, "'streamConverter' cannot be null");
        this.lock.lock();
        try {
            this.streamConverter = streamConverter;
            this.streamConverterSet = true;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Set the callback given a chance to adjust the {@link ProducerBuilder} before the
     * producer is built. Only affects producers created after this call.
     *
     * @param producerCustomizer the customizer; must not be null
     * @throws IllegalArgumentException if the customizer is null
     */
    public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
        Assert.notNull(producerCustomizer, "'producerCustomizer' cannot be null");
        this.lock.lock();
        try {
            this.producerCustomizer = producerCustomizer;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Enable or disable the lookup of an {@link ObservationRegistry} in the application
     * context. While disabled, no registry is looked up.
     *
     * @param observationEnabled true to enable
     */
    public void setObservationEnabled(boolean observationEnabled) {
        this.observationEnabled = observationEnabled;
    }

    /**
     * Set a custom observation convention. When left null, only the default convention
     * instance is passed along when the observation for a send is created.
     *
     * @param observationConvention the convention, or null to rely on the default
     */
    public void setObservationConvention(@Nullable RabbitStreamTemplateObservationConvention observationConvention) {
        this.observationConvention = observationConvention;
    }

    /**
     * @return the converter used by the {@code convertAndSend} methods
     */
    @Override
    public MessageConverter messageConverter() {
        return this.messageConverter;
    }

    /**
     * @return the converter used to turn Spring AMQP messages into stream messages
     */
    @Override
    public StreamMessageConverter streamMessageConverter() {
        return this.streamConverter;
    }

    /**
     * Convert a Spring AMQP message with the stream converter and send it.
     *
     * @param message the message to send
     * @return a future completed with {@code true} on confirmation, or completed
     * exceptionally when the send fails or is not confirmed
     */
    @Override
    public CompletableFuture<Boolean> send(org.springframework.amqp.core.Message message) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        observeSend(this.streamConverter.fromMessage(message), future);
        return future;
    }

    /**
     * Convert the payload and send it without post-processing.
     *
     * @param message the payload to convert
     * @return the send future; see {@link #send(org.springframework.amqp.core.Message)}
     */
    @Override
    public CompletableFuture<Boolean> convertAndSend(Object message) {
        return convertAndSend(message, null);
    }

    /**
     * Convert the payload, optionally post-process the resulting message, and send it.
     *
     * @param message the payload to convert
     * @param mpp optional post processor; if it returns null nothing is sent
     * @return the send future, or a future already completed with {@code false} when the
     * post processor returned null
     * @throws IllegalArgumentException if the message converter returns null
     */
    @Override
    public CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp) {
        org.springframework.amqp.core.Message converted =
                this.messageConverter.toMessage(message, new StreamMessageProperties());
        Assert.notNull(converted, "The message converter returned null");
        if (mpp != null) {
            converted = mpp.postProcessMessage(converted);
            if (converted == null) {
                this.logger.debug("Message Post Processor returned null, message not sent");
                return CompletableFuture.completedFuture(false);
            }
        }
        return send(converted);
    }

    /**
     * Send a native stream message.
     *
     * @param message the message to send
     * @return a future completed with {@code true} on confirmation, or completed
     * exceptionally when the send fails or is not confirmed
     */
    @Override
    public CompletableFuture<Boolean> send(Message message) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        observeSend(message, future);
        return future;
    }

    /**
     * Start an observation, hand the message to the producer and wire the confirmation
     * callback. An exception thrown while obtaining the producer or sending is recorded on
     * the observation, the observation is stopped, and the future is failed with it.
     */
    private void observeSend(Message message, CompletableFuture<Boolean> future) {
        Observation observation = RabbitStreamTemplateObservation.STREAM_TEMPLATE_OBSERVATION.observation(
                this.observationConvention,
                RabbitStreamTemplateObservation.DefaultRabbitStreamTemplateObservationConvention.INSTANCE,
                () -> new RabbitStreamMessageSenderContext(message, this.beanName, this.streamName),
                obtainObservationRegistry());
        observation.start();
        try {
            createOrGetProducer().send(message, handleConfirm(future, observation));
        }
        catch (Exception ex) {
            observation.error(ex);
            observation.stop();
            future.completeExceptionally(ex);
        }
    }

    /**
     * Return the observation registry, looking it up in the application context the first
     * time this is called while observation is enabled. The lookup result is kept even
     * when it is null (no context, or no unique registry bean).
     *
     * @return the registry, or null when none has been resolved
     */
    @Nullable
    private ObservationRegistry obtainObservationRegistry() {
        if (!this.observationRegistryObtained && this.observationEnabled) {
            if (this.applicationContext != null) {
                ObjectProvider<ObservationRegistry> registry =
                        this.applicationContext.getBeanProvider(ObservationRegistry.class);
                this.observationRegistry = registry.getIfUnique();
            }
            this.observationRegistryObtained = true;
        }
        return this.observationRegistry;
    }

    /**
     * @return a message builder obtained from the (lazily created) producer
     */
    @Override
    public MessageBuilder messageBuilder() {
        return createOrGetProducer().messageBuilder();
    }

    /**
     * Build the confirmation callback for one send. A confirmed status completes the
     * future with {@code true}; any other status fails it with a
     * {@link StreamSendException} carrying a description and the status code. The
     * observation is stopped in both cases.
     */
    private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Observation observation) {
        return confStatus -> {
            if (confStatus.isConfirmed()) {
                future.complete(true);
                observation.stop();
                return;
            }
            short code = confStatus.getCode();
            // NOTE(review): literals presumably mirror com.rabbitmq.stream.Constants.CODE_* - confirm.
            String errorMessage = switch (code) {
                case 10001 -> "Message Enqueueing Failed";
                case 10003 -> "Producer Closed";
                case 10002 -> "Producer Not Available";
                case 10004 -> "Publish Confirm Timeout";
                default -> "Unknown code: " + code;
            };
            StreamSendException ex = new StreamSendException(errorMessage, code);
            observation.error(ex);
            observation.stop();
            future.completeExceptionally(ex);
        };
    }

    /**
     * Close and forget the current producer, if any. Calling this when no producer exists
     * does nothing.
     */
    @Override
    public void close() {
        if (this.producer != null) {
            this.lock.lock();
            try {
                if (this.producer != null) {
                    this.producer.close();
                    this.producer = null;
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

