package org.springframework.integration.zeromq.outbound;

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.SupplierExpression;
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
import org.springframework.integration.mapping.ConvertingBytesMessageMapper;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import zmq.socket.pubsub.Pub;

/* loaded from: input_file:org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.class */
public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler {
    private static final List<SocketType> VALID_SOCKET_TYPES = Arrays.asList(SocketType.PAIR, SocketType.PUSH, SocketType.PUB);
    private final Scheduler publisherScheduler;
    private final Mono<ZMQ.Socket> socketMono;
    private OutboundMessageMapper<byte[]> messageMapper;
    private Consumer<ZMQ.Socket> socketConfigurer;
    private Expression topicExpression;
    private EvaluationContext evaluationContext;
    private volatile boolean initialized;

    public ZeroMqMessageHandler(ZContext zContext, String str) {
        this(zContext, str, SocketType.PAIR);
    }

    public ZeroMqMessageHandler(ZContext zContext, String str, SocketType socketType) {
        this.publisherScheduler = Schedulers.newSingle("zeroMqMessageHandlerScheduler");
        this.socketConfigurer = socket -> {
        };
        this.topicExpression = new SupplierExpression(() -> {
            return null;
        });
        Assert.notNull(zContext, "'context' must not be null");
        Assert.hasText(str, "'connectUrl' must not be empty");
        Assert.state(VALID_SOCKET_TYPES.contains(socketType), () -> {
            return "'socketType' can only be one of the: " + VALID_SOCKET_TYPES;
        });
        this.socketMono = Mono.just(zContext.createSocket(socketType)).publishOn(this.publisherScheduler).doOnNext(socket2 -> {
            this.socketConfigurer.accept(socket2);
        }).doOnNext(socket3 -> {
            socket3.connect(str);
        }).cache().publishOn(this.publisherScheduler);
    }

    public void setMessageMapper(OutboundMessageMapper<byte[]> outboundMessageMapper) {
        Assert.notNull(outboundMessageMapper, "'messageMapper' must not be null");
        this.messageMapper = outboundMessageMapper;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        setMessageMapper(new ConvertingBytesMessageMapper(messageConverter));
    }

    public void setSocketConfigurer(Consumer<ZMQ.Socket> consumer) {
        Assert.notNull(consumer, "'socketConfigurer' must not be null");
        this.socketConfigurer = consumer;
    }

    public void setTopic(String str) {
        setTopicExpression(new LiteralExpression(str));
    }

    public void setTopicExpression(Expression expression) {
        Assert.notNull(expression, "'topicExpression' must not be null");
        this.topicExpression = expression;
    }

    public String getComponentType() {
        return "zeromq:outbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        BeanFactory beanFactory = getBeanFactory();
        this.evaluationContext = ExpressionUtils.createSimpleEvaluationContext(beanFactory);
        if (this.messageMapper == null) {
            ConfigurableCompositeMessageConverter configurableCompositeMessageConverter = new ConfigurableCompositeMessageConverter();
            configurableCompositeMessageConverter.setBeanFactory(beanFactory);
            configurableCompositeMessageConverter.afterPropertiesSet();
            this.messageMapper = new ConvertingBytesMessageMapper(configurableCompositeMessageConverter);
        }
        this.socketMono.subscribe();
        this.initialized = true;
    }

    protected Mono<Void> handleMessageInternal(Message<?> message) {
        Assert.state(this.initialized, "the message handler is not initialized yet or already destroyed");
        return this.socketMono.doOnNext(socket -> {
            ZMsg zMsg;
            String str;
            if (message.getPayload() instanceof ZMsg) {
                zMsg = (ZMsg) message.getPayload();
            } else {
                zMsg = new ZMsg();
                zMsg.add((byte[]) this.messageMapper.fromMessage(message));
                if ((socket.base() instanceof Pub) && (str = (String) this.topicExpression.getValue(this.evaluationContext, message, String.class)) != null) {
                    zMsg.wrap(new ZFrame(str));
                }
            }
            zMsg.send(socket, false);
        }).then();
    }

    public void destroy() {
        this.initialized = false;
        super.destroy();
        this.socketMono.doOnNext((v0) -> {
            v0.close();
        }).block();
        this.publisherScheduler.dispose();
    }
}
