package org.springframework.integration.zeromq.inbound;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.ConvertingBytesMessageMapper;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.zeromq.ZeroMqHeaders;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * A {@link MessageProducerSupport} that receives messages from a ZeroMQ socket and
 * publishes them downstream as Spring {@link Message}s.
 * <p>
 * Only {@link SocketType#PAIR}, {@link SocketType#PULL} and {@link SocketType#SUB} sockets
 * are accepted. On start the socket is either connected to the configured
 * {@link #setConnectUrl(String) connectUrl} or bound to a TCP port (an explicit
 * {@link #setBindPort(int) bindPort}, or a random one when none was set).
 * <p>
 * When {@link #setReceiveRaw(boolean) receiveRaw} is {@code true} the received {@link ZMsg}
 * is used as the message payload unchanged; otherwise the data of the last frame is
 * converted through the configured {@link InboundMessageMapper}.
 */
@IntegrationManagedResource
@ManagedResource
public class ZeroMqMessageProducer extends MessageProducerSupport {

    /**
     * Default delay applied before polling the socket again after a receive attempt
     * produced no message.
     */
    public static final Duration DEFAULT_CONSUME_DELAY = Duration.ofSeconds(1);

    private static final List<SocketType> VALID_SOCKET_TYPES =
            Arrays.asList(SocketType.PAIR, SocketType.PULL, SocketType.SUB);

    // NOTE(review): this scheduler is created once per instance but disposed when the
    // receive flux completes (see doStart()); a stop/start cycle would presumably reuse a
    // disposed scheduler - TODO confirm whether restart is expected to be supported.
    private final Scheduler consumerScheduler = Schedulers.newSingle("zeroMqMessageProducerScheduler");

    // Zero means "not configured"; replaced with the actually bound port on start.
    private final AtomicInteger bindPort = new AtomicInteger();

    private final ZContext context;

    private final SocketType socketType;

    private InboundMessageMapper<byte[]> messageMapper;

    private Consumer<ZMQ.Socket> socketConfigurer = socket -> {
    };

    private Duration consumeDelay = DEFAULT_CONSUME_DELAY;

    // The empty string is the default subscription applied to SUB sockets on start.
    private String[] topics = {""};

    private boolean receiveRaw;

    @Nullable
    private String connectUrl;

    // Assigned in doStart(); stays null until the producer has been started.
    private volatile Mono<ZMQ.Socket> socketMono;

    /**
     * Create a producer backed by a {@link SocketType#PAIR} socket.
     * @param context the ZeroMQ context used to create the socket; must not be null.
     */
    public ZeroMqMessageProducer(ZContext context) {
        this(context, SocketType.PAIR);
    }

    /**
     * Create a producer backed by a socket of the given type.
     * @param context the ZeroMQ context used to create the socket; must not be null.
     * @param socketType one of {@code PAIR}, {@code PULL} or {@code SUB}.
     * @throws IllegalArgumentException if {@code context} is null.
     * @throws IllegalStateException if {@code socketType} is not one of the supported types.
     */
    public ZeroMqMessageProducer(ZContext context, SocketType socketType) {
        Assert.notNull(context, "'context' must not be null");
        Assert.state(VALID_SOCKET_TYPES.contains(socketType),
                () -> "'socketType' can only be one of the: " + VALID_SOCKET_TYPES);
        this.context = context;
        this.socketType = socketType;
    }

    /**
     * Set the delay applied before the next receive attempt when the previous one
     * returned no message. Defaults to {@link #DEFAULT_CONSUME_DELAY}.
     * @param consumeDelay the delay; must not be null.
     */
    public void setConsumeDelay(Duration consumeDelay) {
        Assert.notNull(consumeDelay, "'consumeDelay' must not be null");
        this.consumeDelay = consumeDelay;
    }

    /**
     * Set the mapper used to turn received bytes into a {@link Message}.
     * Not consulted when {@link #setReceiveRaw(boolean) receiveRaw} is {@code true}.
     * @param messageMapper the mapper; must not be null.
     */
    public void setMessageMapper(InboundMessageMapper<byte[]> messageMapper) {
        Assert.notNull(messageMapper, "'messageMapper' must not be null");
        this.messageMapper = messageMapper;
    }

    /**
     * Convenience for {@link #setMessageMapper(InboundMessageMapper)}: wraps the converter
     * in a {@link ConvertingBytesMessageMapper}.
     * @param messageConverter the converter to wrap.
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        setMessageMapper(new ConvertingBytesMessageMapper(messageConverter));
    }

    /**
     * Choose whether the received {@link ZMsg} is emitted as the payload unchanged
     * ({@code true}) or converted through the message mapper ({@code false}, the default).
     * @param receiveRaw whether to skip conversion.
     */
    public void setReceiveRaw(boolean receiveRaw) {
        this.receiveRaw = receiveRaw;
    }

    /**
     * Set a callback applied to the socket on start, before any subscription, connect
     * or bind is performed.
     * @param socketConfigurer the callback; must not be null.
     */
    public void setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) {
        Assert.notNull(socketConfigurer, "'socketConfigurer' must not be null");
        this.socketConfigurer = socketConfigurer;
    }

    /**
     * Set the topics a {@link SocketType#SUB} socket subscribes to on start.
     * The array is copied, so later changes by the caller are not observed.
     * @param topics the topics; neither the array nor its elements may be null.
     */
    public void setTopics(String... topics) {
        Assert.notNull(topics, "'topics' cannot be null");
        Assert.noNullElements(topics, "'topics' must not contain null elements");
        this.topics = Arrays.copyOf(topics, topics.length);
    }

    /**
     * Set the URL the socket connects to on start. Mutually exclusive with
     * {@link #setBindPort(int)}.
     * @param connectUrl the URL, or null to bind to a port instead.
     */
    public void setConnectUrl(@Nullable String connectUrl) {
        this.connectUrl = connectUrl;
    }

    /**
     * Set the TCP port the socket binds to on start. Mutually exclusive with
     * {@link #setConnectUrl(String)}.
     * @param port the port; must be positive.
     */
    public void setBindPort(int port) {
        Assert.isTrue(port > 0, "'port' must not be zero or negative");
        this.bindPort.set(port);
    }

    /**
     * Return the configured bind port, or - once the socket has been bound on start -
     * the port it was actually bound to. Zero when nothing was configured or bound yet.
     * @return the current bind port value.
     */
    public int getBoundPort() {
        return this.bindPort.get();
    }

    @Override
    public String getComponentType() {
        return "zeromq:inbound-channel-adapter";
    }

    /**
     * Validate that {@code connectUrl} and {@code bindPort} are not both set and, unless
     * a mapper was provided or raw mode is enabled, install a default
     * {@link ConvertingBytesMessageMapper} based on a
     * {@link ConfigurableCompositeMessageConverter}.
     */
    @Override
    protected void onInit() {
        super.onInit();
        Assert.state(this.connectUrl == null || this.bindPort.get() == 0,
                "Only one of the 'connectUrl' or `bindPort` must be provided or none");
        if (this.messageMapper == null && !this.receiveRaw) {
            ConfigurableCompositeMessageConverter defaultConverter = new ConfigurableCompositeMessageConverter();
            defaultConverter.setBeanFactory(getBeanFactory());
            defaultConverter.afterPropertiesSet();
            this.messageMapper = new ConvertingBytesMessageMapper(defaultConverter);
        }
    }

    /**
     * Subscribe the underlying socket to additional topics at runtime.
     * @param topics the topics to subscribe to.
     * @throws IllegalStateException if the socket is not a {@code SUB} socket or this
     * producer is not active.
     */
    @ManagedOperation
    public void subscribeToTopics(String... topics) {
        Assert.state(SocketType.SUB.equals(this.socketType), "Only SUB socket can accept a subscription option.");
        Assert.state(isActive(), "This message producer is not active to accept a new subscription.");
        Flux.fromArray(topics)
                .flatMap(topic -> this.socketMono.doOnNext(socket -> socket.subscribe(topic)))
                .subscribe();
    }

    /**
     * Cancel subscriptions of the underlying socket at runtime.
     * @param topics the topics to unsubscribe from.
     * @throws IllegalStateException if the socket is not a {@code SUB} socket or this
     * producer is not active.
     */
    @ManagedOperation
    public void unsubscribeFromTopics(String... topics) {
        Assert.state(SocketType.SUB.equals(this.socketType), "Only SUB socket can accept a unsubscription option.");
        Assert.state(isActive(), "This message producer is not active to cancel a subscription.");
        Flux.fromArray(topics)
                .flatMap(topic -> this.socketMono.doOnNext(socket -> socket.unsubscribe(topic)))
                .subscribe();
    }

    /**
     * Create the socket, prepare it (configurer, subscriptions, connect or bind) on the
     * consumer scheduler, and start the receive loop that feeds the output channel.
     */
    @Override
    protected void doStart() {
        this.socketMono =
                Mono.just(this.context.createSocket(this.socketType))
                        .publishOn(this.consumerScheduler)
                        .doOnNext(this.socketConfigurer)
                        .doOnNext(socket -> {
                            if (SocketType.SUB.equals(this.socketType)) {
                                for (String topic : this.topics) {
                                    socket.subscribe(topic);
                                }
                            }
                        })
                        .doOnNext(socket -> {
                            if (this.connectUrl != null) {
                                socket.connect(this.connectUrl);
                            }
                            else {
                                // Remember the real port, which matters when a random one was picked.
                                this.bindPort.set(bindSocket(socket, this.bindPort.get()));
                            }
                        })
                        // Cache so the setup steps above run once and later subscribers reuse the socket.
                        .cache()
                        .publishOn(this.consumerScheduler);

        Flux<? extends Message<?>> dataFlux =
                this.socketMono
                        .flatMap(socket -> {
                            if (isRunning()) {
                                // Non-waiting receive: null means nothing was available.
                                ZMsg msg = ZMsg.recvMsg(socket, false);
                                if (msg != null) {
                                    return Mono.just(msg);
                                }
                            }
                            return Mono.empty();
                        })
                        .publishOn(Schedulers.boundedElastic())
                        .transform(msgMono -> this.receiveRaw ? mapRaw(msgMono) : convertMessage(msgMono))
                        .doOnError(error ->
                                this.logger.error(error, () -> "Error processing ZeroMQ message in the " + this))
                        // Nothing received: try again, after the consume delay while still active.
                        .repeatWhenEmpty(repeat -> isActive() ? repeat.delayElements(this.consumeDelay) : repeat)
                        .repeat(this::isActive)
                        .doOnComplete(this.consumerScheduler::dispose);

        subscribeToPublisher(dataFlux);
    }

    /**
     * Wrap each received {@link ZMsg} as the payload of a new message, without conversion.
     */
    private Mono<Message<?>> mapRaw(Mono<ZMsg> msgMono) {
        return msgMono.map(msg -> getMessageBuilderFactory().withPayload(msg).build());
    }

    /**
     * Convert the data of the last frame through the message mapper. For messages with
     * more than one frame, the frame returned by {@link ZMsg#unwrap()} is exposed as the
     * {@link ZeroMqHeaders#TOPIC} header.
     */
    private Mono<Message<?>> convertMessage(Mono<ZMsg> msgMono) {
        return msgMono.map(msg -> {
            Map<String, Object> headers = null;
            if (msg.size() > 1) {
                headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, msg.unwrap().getString(ZMQ.CHARSET));
            }
            return this.messageMapper.toMessage(msg.getLast().getData(), headers);
        });
    }

    /**
     * Close the socket asynchronously. Does nothing if the socket was never created.
     */
    @Override
    protected void doStop() {
        Mono<ZMQ.Socket> socketMonoToUse = this.socketMono;
        if (socketMonoToUse != null) {
            socketMonoToUse.doOnNext(ZMQ.Socket::close).subscribe();
        }
    }

    /**
     * Close the socket, waiting for the close to finish. Safe to call on an instance
     * that was never started: {@code socketMono} is only assigned in {@link #doStart()}.
     */
    @Override
    public void destroy() {
        super.destroy();
        Mono<ZMQ.Socket> socketMonoToUse = this.socketMono;
        if (socketMonoToUse != null) {
            socketMonoToUse.doOnNext(ZMQ.Socket::close).block();
        }
    }

    /**
     * Bind the socket to the given TCP port on all interfaces, or to a random port when
     * {@code port} is zero.
     * @return the port the socket was bound to.
     * @throws IllegalArgumentException if binding to the explicit port was not successful.
     */
    private static int bindSocket(ZMQ.Socket socket, int port) {
        if (port == 0) {
            return socket.bindToRandomPort("tcp://*");
        }
        if (socket.bind("tcp://*:" + port)) {
            return port;
        }
        throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port);
    }
}
