package org.springframework.rabbit.stream.listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/rabbit/stream/listener/StreamListenerContainer.class */
public class StreamListenerContainer implements MessageListenerContainer, BeanNameAware {
    protected Log logger;
    private final ConsumerBuilder builder;
    private StreamMessageConverter streamConverter;
    private ConsumerCustomizer consumerCustomizer;
    private Consumer consumer;
    private String listenerId;
    private String beanName;
    private boolean autoStartup;
    private MessageListener messageListener;

    public StreamListenerContainer(Environment environment) {
        this(environment, null);
    }

    public StreamListenerContainer(Environment environment, @Nullable Codec codec) {
        this.logger = LogFactory.getLog(getClass());
        this.consumerCustomizer = (str, consumerBuilder) -> {
        };
        this.autoStartup = true;
        Assert.notNull(environment, "'environment' cannot be null");
        this.builder = environment.consumerBuilder();
        this.streamConverter = new DefaultStreamMessageConverter(codec);
    }

    public void setQueueNames(String... strArr) {
        Assert.isTrue(strArr != null && strArr.length == 1, "Only one stream is supported");
        this.builder.stream(strArr[0]);
    }

    public StreamMessageConverter getStreamConverter() {
        return this.streamConverter;
    }

    public void setStreamConverter(StreamMessageConverter streamMessageConverter) {
        Assert.notNull(streamMessageConverter, "'messageConverter' cannot be null");
        this.streamConverter = streamMessageConverter;
    }

    public synchronized void setConsumerCustomizer(ConsumerCustomizer consumerCustomizer) {
        Assert.notNull(consumerCustomizer, "'consumerCustomizer' cannot be null");
        this.consumerCustomizer = consumerCustomizer;
    }

    @Nullable
    public String getListenerId() {
        return this.listenerId != null ? this.listenerId : this.beanName;
    }

    public void setListenerId(String str) {
        this.listenerId = str;
    }

    @Nullable
    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    @Nullable
    public Object getMessageListener() {
        return this.messageListener;
    }

    public synchronized boolean isRunning() {
        return this.consumer != null;
    }

    public synchronized void start() {
        if (this.consumer == null) {
            this.consumerCustomizer.accept(getListenerId(), this.builder);
            this.consumer = this.builder.build();
        }
    }

    public synchronized void stop() {
        if (this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
    }

    public void setupMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
        this.builder.messageHandler((context, message) -> {
            if (messageListener instanceof StreamMessageListener) {
                ((StreamMessageListener) messageListener).onStreamMessage(message, context);
                return;
            }
            Message message = this.streamConverter.toMessage((Object) message, new StreamMessageProperties(context));
            if (!(messageListener instanceof ChannelAwareMessageListener)) {
                messageListener.onMessage(message);
                return;
            }
            try {
                ((ChannelAwareMessageListener) messageListener).onMessage(message, (Channel) null);
            } catch (Exception e) {
                this.logger.error("Listner threw an exception", e);
            }
        });
    }
}
