package org.springframework.rabbit.stream.producer;

import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.ProducerBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

/* loaded from: input_file:org/springframework/rabbit/stream/producer/RabbitStreamTemplate.class */
public class RabbitStreamTemplate implements RabbitStreamOperations, BeanNameAware {
    private final Environment environment;
    private final String streamName;
    private boolean streamConverterSet;
    private Producer producer;
    private String beanName;
    protected final LogAccessor logger = new LogAccessor(getClass());
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();
    private ProducerCustomizer producerCustomizer = (str, producerBuilder) -> {
    };

    public RabbitStreamTemplate(Environment environment, String str) {
        Assert.notNull(environment, "'environment' cannot be null");
        Assert.notNull(str, "'streamName' cannot be null");
        this.environment = environment;
        this.streamName = str;
    }

    private synchronized Producer createOrGetProducer() {
        if (this.producer == null) {
            ProducerBuilder producerBuilder = this.environment.producerBuilder();
            producerBuilder.stream(this.streamName);
            this.producerCustomizer.accept(this.beanName, producerBuilder);
            this.producer = producerBuilder.build();
            if (!this.streamConverterSet) {
                ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(() -> {
                    return this.producer.messageBuilder();
                });
            }
        }
        return this.producer;
    }

    public synchronized void setBeanName(String str) {
        this.beanName = str;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' cannot be null");
        this.messageConverter = messageConverter;
    }

    public synchronized void setStreamConverter(StreamMessageConverter streamMessageConverter) {
        Assert.notNull(streamMessageConverter, "'streamConverter' cannot be null");
        this.streamConverter = streamMessageConverter;
        this.streamConverterSet = true;
    }

    public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
        Assert.notNull(producerCustomizer, "'producerCustomizer' cannot be null");
        this.producerCustomizer = producerCustomizer;
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations
    public MessageConverter messageConverter() {
        return this.messageConverter;
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations
    public StreamMessageConverter streamMessageConverter() {
        return this.streamConverter;
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations
    public ListenableFuture<Boolean> send(Message message) {
        SettableListenableFuture<Boolean> settableListenableFuture = new SettableListenableFuture<>();
        createOrGetProducer().send(this.streamConverter.mo1fromMessage(message), handleConfirm(settableListenableFuture));
        return settableListenableFuture;
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations
    public ListenableFuture<Boolean> convertAndSend(Object obj) {
        return convertAndSend(obj, null);
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations
    public ListenableFuture<Boolean> convertAndSend(Object obj, @Nullable MessagePostProcessor messagePostProcessor) {
        Message message = this.messageConverter.toMessage(obj, new StreamMessageProperties());
        Assert.notNull(message, "The message converter returned null");
        if (messagePostProcessor != null) {
            message = messagePostProcessor.postProcessMessage(message);
            if (message == null) {
                this.logger.debug("Message Post Processor returned null, message not sent");
                SettableListenableFuture settableListenableFuture = new SettableListenableFuture();
                settableListenableFuture.set(false);
                return settableListenableFuture;
            }
        }
        return send(message);
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations
    public ListenableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
        SettableListenableFuture<Boolean> settableListenableFuture = new SettableListenableFuture<>();
        createOrGetProducer().send(message, handleConfirm(settableListenableFuture));
        return settableListenableFuture;
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations
    public MessageBuilder messageBuilder() {
        return createOrGetProducer().messageBuilder();
    }

    private ConfirmationHandler handleConfirm(SettableListenableFuture<Boolean> settableListenableFuture) {
        return confirmationStatus -> {
            String str;
            if (confirmationStatus.isConfirmed()) {
                settableListenableFuture.set(true);
                return;
            }
            short code = confirmationStatus.getCode();
            switch (code) {
                case 10001:
                    str = "Message Enqueueing Failed";
                    break;
                case 10002:
                    str = "Producer Not Available";
                    break;
                case 10003:
                    str = "Producer Closed";
                    break;
                case 10004:
                    str = "Publish Confirm Timeout";
                    break;
                default:
                    str = "Unknown code: " + ((int) code);
                    break;
            }
            settableListenableFuture.setException(new StreamSendException(str, code));
        };
    }

    @Override // org.springframework.rabbit.stream.producer.RabbitStreamOperations, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
    }
}
