/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.zeromq.outbound;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.SupplierExpression;
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
import org.springframework.integration.mapping.ConvertingBytesMessageMapper;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.integration.zeromq.ZeroMqUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import zmq.socket.pubsub.Pub;

/**
 * An {@link AbstractReactiveMessageHandler} that sends messages to a ZeroMQ socket.
 *
 * <p>Only the {@link SocketType#PAIR}, {@link SocketType#PUSH} and {@link SocketType#PUB}
 * socket types are accepted. The socket is created in {@link #start()}, passed to the
 * configured socket configurer, and then either connected to the URL provided by the
 * {@code connectUrl} supplier or, when no supplier was given, bound through
 * {@link ZeroMqUtils#bindSocket}. The socket is closed in {@link #destroy()}.
 *
 * <p>Socket configuration, connect/bind and message sending are all scheduled on a
 * dedicated single-threaded {@link Scheduler} owned by this handler.
 */
public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler implements ManageableLifecycle {

    private static final List<SocketType> VALID_SOCKET_TYPES =
            List.of(SocketType.PAIR, SocketType.PUSH, SocketType.PUB);

    private final AtomicBoolean running = new AtomicBoolean();

    private final Scheduler publisherScheduler = Schedulers.newSingle("zeroMqMessageHandlerScheduler");

    private final AtomicInteger bindPort = new AtomicInteger();

    private final ZContext context;

    private final SocketType socketType;

    private @Nullable Supplier<String> connectUrl;

    private OutboundMessageMapper<byte[]> messageMapper;

    private Consumer<ZMQ.Socket> socketConfigurer = socket -> { };

    private Expression topicExpression = new SupplierExpression<>(() -> null);

    private EvaluationContext evaluationContext;

    private volatile boolean initialized;

    private volatile boolean wrapTopic = true;

    private volatile Mono<ZMQ.Socket> socketMono;

    private volatile Disposable socketMonoSubscriber;

    /**
     * Create an instance for a {@link SocketType#PAIR} socket that is bound on start.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     */
    public ZeroMqMessageHandler(ZContext context) {
        this(context, SocketType.PAIR);
    }

    /**
     * Create an instance for the given socket type; the socket is bound on start.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     * @param socketType one of {@code PAIR}, {@code PUSH} or {@code PUB}.
     * @throws IllegalArgumentException if {@code context} is null.
     * @throws IllegalStateException if {@code socketType} is not one of the supported types.
     */
    public ZeroMqMessageHandler(ZContext context, SocketType socketType) {
        Assert.notNull(context, "'context' must not be null");
        Assert.state(VALID_SOCKET_TYPES.contains(socketType),
                () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES);
        this.context = context;
        this.socketType = socketType;
    }

    /**
     * Create an instance for a {@link SocketType#PAIR} socket connecting to the given URL.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     * @param connectUrl the URL to connect the socket to; must not be empty.
     */
    public ZeroMqMessageHandler(ZContext context, String connectUrl) {
        this(context, connectUrl, SocketType.PAIR);
    }

    /**
     * Create an instance for a {@link SocketType#PAIR} socket bound to the given port.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     * @param port the port to bind to; must be greater than zero.
     */
    public ZeroMqMessageHandler(ZContext context, int port) {
        this(context, port, SocketType.PAIR);
    }

    /**
     * Create an instance for a {@link SocketType#PAIR} socket connecting to the URL
     * obtained from the supplier when the handler is started.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     * @param connectUrl the supplier of the URL to connect to; must not be null.
     */
    public ZeroMqMessageHandler(ZContext context, Supplier<String> connectUrl) {
        this(context, connectUrl, SocketType.PAIR);
    }

    /**
     * Create an instance for the given socket type connecting to the given URL.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     * @param connectUrl the URL to connect the socket to; must not be empty.
     * @param socketType one of {@code PAIR}, {@code PUSH} or {@code PUB}.
     */
    public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType socketType) {
        this(context, () -> connectUrl, socketType);
        // The delegating constructor call has to come first, so the text check runs afterwards.
        Assert.hasText(connectUrl, "'connectUrl' must not be empty");
    }

    /**
     * Create an instance for the given socket type bound to the given port.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     * @param port the port to bind to; must be greater than zero.
     * @param socketType one of {@code PAIR}, {@code PUSH} or {@code PUB}.
     */
    public ZeroMqMessageHandler(ZContext context, int port, SocketType socketType) {
        this(context, socketType);
        Assert.isTrue(port > 0, "'port' must not be zero or negative");
        this.bindPort.set(port);
    }

    /**
     * Create an instance for the given socket type connecting to the URL obtained
     * from the supplier when the handler is started.
     * @param context the {@link ZContext} used to create the socket; must not be null.
     * @param connectUrl the supplier of the URL to connect to; must not be null.
     * @param socketType one of {@code PAIR}, {@code PUSH} or {@code PUB}.
     */
    public ZeroMqMessageHandler(ZContext context, Supplier<String> connectUrl, SocketType socketType) {
        this(context, socketType);
        Assert.notNull(connectUrl, "'connectUrl' must not be null");
        this.connectUrl = connectUrl;
    }

    /**
     * Set the mapper used to turn a request message into the {@code byte[]} frame to send.
     * When none is set, {@link #onInit()} installs a {@link ConvertingBytesMessageMapper}
     * backed by a {@link ConfigurableCompositeMessageConverter}.
     * @param messageMapper the mapper to use; must not be null.
     */
    public void setMessageMapper(OutboundMessageMapper<byte[]> messageMapper) {
        Assert.notNull(messageMapper, "'messageMapper' must not be null");
        this.messageMapper = messageMapper;
    }

    /**
     * Set a {@link MessageConverter}; it is wrapped into a {@link ConvertingBytesMessageMapper}
     * and installed via {@link #setMessageMapper(OutboundMessageMapper)}.
     * @param messageConverter the converter to wrap.
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        setMessageMapper(new ConvertingBytesMessageMapper(messageConverter));
    }

    /**
     * Set a callback applied to the socket after creation and before connect/bind.
     * @param socketConfigurer the callback; must not be null.
     */
    public void setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) {
        Assert.notNull(socketConfigurer, "'socketConfigurer' must not be null");
        this.socketConfigurer = socketConfigurer;
    }

    /**
     * Set a fixed topic; shortcut for {@link #setTopicExpression(Expression)} with a
     * {@link LiteralExpression}.
     * @param topic the topic value.
     */
    public void setTopic(String topic) {
        setTopicExpression(new LiteralExpression(topic));
    }

    /**
     * Set the expression evaluated against the request message to obtain the topic.
     * The expression is consulted only when the underlying socket is a {@code PUB} socket.
     * @param topicExpression the expression; must not be null.
     */
    public void setTopicExpression(Expression topicExpression) {
        Assert.notNull(topicExpression, "'topicExpression' must not be null");
        this.topicExpression = topicExpression;
    }

    /**
     * Select how the topic frame is added in front of the payload frame:
     * {@code true} (the default) uses {@link ZMsg#wrap(ZFrame)}, {@code false} uses
     * {@link ZMsg#push(ZFrame)}.
     * @param wrapTopic whether to wrap the topic frame.
     */
    public void wrapTopic(boolean wrapTopic) {
        this.wrapTopic = wrapTopic;
    }

    /**
     * Return the port held by this handler: the value given to a port-based constructor,
     * replaced on start (when no connect URL is configured) by the value returned from
     * {@link ZeroMqUtils#bindSocket}. Stays {@code 0} when neither has happened.
     * @return the port value.
     */
    public int getBoundPort() {
        return this.bindPort.get();
    }

    @Override
    public String getComponentType() {
        return "zeromq:outbound-channel-adapter";
    }

    /**
     * Create the evaluation context for the topic expression and, when no mapper has been
     * configured, install the default converter-based mapper.
     */
    @Override
    protected void onInit() {
        super.onInit();
        BeanFactory beanFactory = getBeanFactory();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(beanFactory);
        if (this.messageMapper == null) {
            ConfigurableCompositeMessageConverter defaultConverter = new ConfigurableCompositeMessageConverter();
            defaultConverter.setBeanFactory(beanFactory);
            defaultConverter.afterPropertiesSet();
            this.messageMapper = new ConvertingBytesMessageMapper(defaultConverter);
        }
        this.initialized = true;
    }

    /**
     * Create the socket and subscribe to the configure + connect/bind pipeline.
     * Does nothing when the handler is already running.
     */
    @Override
    public void start() {
        if (this.running.getAndSet(true)) {
            return;
        }
        // The socket itself is created eagerly on the calling thread; configuration and
        // connect/bind are deferred to the dedicated scheduler.
        this.socketMono =
                Mono.just(this.context.createSocket(this.socketType))
                        .publishOn(this.publisherScheduler)
                        .doOnNext(socket -> this.socketConfigurer.accept(socket))
                        .doOnNext(this::connectOrBind)
                        // Cache so every later subscriber reuses the same prepared socket.
                        .cache()
                        .publishOn(this.publisherScheduler);
        this.socketMonoSubscriber = this.socketMono.subscribe();
    }

    /**
     * Connect the socket to the configured URL or, when there is none, bind it and
     * remember the port reported by {@link ZeroMqUtils#bindSocket}.
     */
    private void connectOrBind(ZMQ.Socket socket) {
        Supplier<String> urlSupplier = this.connectUrl;
        if (urlSupplier != null) {
            socket.connect(urlSupplier.get());
        }
        else {
            this.bindPort.set(ZeroMqUtils.bindSocket(socket, this.bindPort.get()));
        }
    }

    /**
     * Dispose the subscription made in {@link #start()}. Does nothing when the handler is
     * not running. The socket is not closed here; that happens in {@link #destroy()}.
     */
    @Override
    public void stop() {
        if (this.running.getAndSet(false)) {
            // May still be null if start() failed before the subscription was made.
            Disposable subscriber = this.socketMonoSubscriber;
            if (subscriber != null) {
                subscriber.dispose();
            }
        }
    }

    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    /**
     * Send the message through the socket prepared by {@link #start()}.
     * @param message the message to send.
     * @return a {@link Mono} completing once the message has been handed to the socket.
     * @throws IllegalStateException if the handler is not initialized, already destroyed,
     * or has not been started.
     */
    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        Assert.state(this.initialized, "the message handler is not initialized yet or already destroyed");
        Mono<ZMQ.Socket> currentSocketMono = this.socketMono;
        // Fail with a descriptive error instead of a NullPointerException when start() was never called.
        Assert.state(currentSocketMono != null, "the message handler is not started yet");
        return currentSocketMono
                .doOnNext(socket -> toZMsg(socket, message).send(socket, false))
                .then();
    }

    /**
     * Build the {@link ZMsg} to send: a {@link ZMsg} payload is used as is; any other
     * payload is mapped to a single frame, preceded by a topic frame when the socket is a
     * {@code PUB} socket and the topic expression yields a non-null value.
     */
    private ZMsg toZMsg(ZMQ.Socket socket, Message<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof ZMsg) {
            return (ZMsg) payload;
        }
        ZMsg msg = new ZMsg();
        msg.add(this.messageMapper.fromMessage(message));
        if (socket.base() instanceof Pub) {
            String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class);
            if (topic != null) {
                ZFrame topicFrame = new ZFrame(topic);
                if (this.wrapTopic) {
                    msg.wrap(topicFrame);
                }
                else {
                    msg.push(topicFrame);
                }
            }
        }
        return msg;
    }

    /**
     * Mark the handler as destroyed, close the socket (if one was ever created) and release
     * the subscription and the dedicated scheduler. Safe to call on a handler that was
     * never started.
     */
    @Override
    public void destroy() {
        this.initialized = false;
        super.destroy();
        try {
            Mono<ZMQ.Socket> currentSocketMono = this.socketMono;
            if (currentSocketMono != null) {
                currentSocketMono.doOnNext(ZMQ.Socket::close).block();
            }
        }
        finally {
            // Always release the subscription and the scheduler, even when closing the socket fails.
            Disposable subscriber = this.socketMonoSubscriber;
            if (subscriber != null) {
                subscriber.dispose();
            }
            this.publisherScheduler.dispose();
        }
    }
}

