/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.zeromq.channel;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.mapping.BytesMessageMapper;
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
import org.springframework.integration.zeromq.ZeroMqProxy;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ZeroMqChannel
extends AbstractMessageChannel
implements SubscribableChannel {
    public static final Duration DEFAULT_CONSUME_DELAY = Duration.ofSeconds(1L);
    private final Map<MessageHandler, Disposable> subscribers = new HashMap<MessageHandler, Disposable>();
    private final Scheduler publisherScheduler = Schedulers.newSingle((String)"publisherScheduler");
    private final Scheduler subscriberScheduler = Schedulers.newSingle((String)"subscriberScheduler");
    private final ZContext context;
    private final boolean pubSub;
    private final Mono<ZMQ.Socket> sendSocket;
    private final Mono<ZMQ.Socket> subscribeSocket;
    private final Flux<? extends Message<?>> subscriberData;
    private Duration consumeDelay = DEFAULT_CONSUME_DELAY;
    private BytesMessageMapper messageMapper = new EmbeddedJsonHeadersMessageMapper();
    private Consumer<ZMQ.Socket> sendSocketConfigurer = socket -> {};
    private Consumer<ZMQ.Socket> subscribeSocketConfigurer = socket -> {};
    @Nullable
    private ZeroMqProxy zeroMqProxy;
    @Nullable
    private volatile String connectSendUrl;
    @Nullable
    private volatile String connectSubscribeUrl;
    @Nullable
    private volatile Disposable subscriberDataDisposable;
    private volatile boolean initialized;

    public ZeroMqChannel(ZContext context) {
        this(context, false);
    }

    public ZeroMqChannel(ZContext context, boolean pubSub) {
        Assert.notNull((Object)context, (String)"'context' must not be null");
        this.context = context;
        this.pubSub = pubSub;
        Supplier<String> localPairConnection = () -> "inproc://" + this.getComponentName() + ".pair";
        Mono<Integer> proxyMono = this.prepareProxyMono();
        this.sendSocket = this.prepareSendSocketMono(localPairConnection, proxyMono);
        this.subscribeSocket = this.prepareSubscribeSocketMono(localPairConnection, proxyMono);
        this.subscriberData = this.prepareSubscriberDataFlux();
    }

    private Mono<Integer> prepareProxyMono() {
        return Mono.defer(() -> {
            if (this.zeroMqProxy != null) {
                return Mono.fromCallable(() -> this.zeroMqProxy.getBackendPort()).filter(proxyPort -> proxyPort > 0).repeatWhenEmpty(100, repeat -> repeat.delayElements(Duration.ofMillis(100L))).doOnNext(proxyPort -> this.setConnectUrl("tcp://localhost:" + this.zeroMqProxy.getFrontendPort() + ":" + this.zeroMqProxy.getBackendPort())).doOnError(error -> this.logger.error(error, () -> "The provided '" + String.valueOf(this.zeroMqProxy) + "' has not been started"));
            }
            return Mono.empty();
        }).cache();
    }

    private Mono<ZMQ.Socket> prepareSendSocketMono(Supplier<String> localPairConnection, Mono<?> proxyMono) {
        return proxyMono.publishOn(this.publisherScheduler).then(Mono.fromCallable(() -> this.context.createSocket(this.connectSendUrl == null ? SocketType.PAIR : (this.pubSub ? SocketType.PUB : SocketType.PUSH)))).doOnNext(socket -> this.sendSocketConfigurer.accept((ZMQ.Socket)socket)).doOnNext(socket -> socket.connect(this.connectSendUrl != null ? this.connectSendUrl : (String)localPairConnection.get())).cache().publishOn(this.publisherScheduler);
    }

    private Mono<ZMQ.Socket> prepareSubscribeSocketMono(Supplier<String> localPairConnection, Mono<?> proxyMono) {
        return proxyMono.publishOn(this.subscriberScheduler).then(Mono.fromCallable(() -> this.context.createSocket(this.connectSubscribeUrl == null ? SocketType.PAIR : (this.pubSub ? SocketType.SUB : SocketType.PULL)))).doOnNext(socket -> this.subscribeSocketConfigurer.accept((ZMQ.Socket)socket)).doOnNext(socket -> {
            if (this.connectSubscribeUrl != null) {
                if (this.pubSub) {
                    socket.subscribe(ZMQ.SUBSCRIPTION_ALL);
                }
                socket.connect(this.connectSubscribeUrl);
            } else {
                socket.bind((String)localPairConnection.get());
            }
        }).cache().publishOn(this.subscriberScheduler);
    }

    private Flux<? extends Message<?>> prepareSubscriberDataFlux() {
        Flux receiveData = this.subscribeSocket.flatMap(socket -> {
            byte[] data;
            if (this.initialized && (data = socket.recv(1)) != null) {
                return Mono.just((Object)data);
            }
            return Mono.empty();
        }).publishOn(Schedulers.parallel()).map(data -> this.messageMapper.toMessage(data)).doOnError(error -> this.logger.error(error, () -> "Error processing ZeroMQ message in the " + String.valueOf((Object)this))).repeatWhenEmpty(repeat -> this.initialized ? repeat.delayElements(this.consumeDelay) : repeat).repeat(() -> this.initialized);
        if (this.pubSub) {
            receiveData = receiveData.publish().autoConnect(1, disposable -> {
                this.subscriberDataDisposable = disposable;
            });
        }
        return receiveData;
    }

    public void setConnectUrl(@Nullable String connectUrl) {
        if (connectUrl != null) {
            this.connectSendUrl = connectUrl.substring(0, connectUrl.lastIndexOf(58));
            this.connectSubscribeUrl = this.connectSendUrl.substring(0, this.connectSendUrl.lastIndexOf(58)) + connectUrl.substring(connectUrl.lastIndexOf(58));
        }
    }

    public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy) {
        this.zeroMqProxy = zeroMqProxy;
    }

    public void setConsumeDelay(Duration consumeDelay) {
        Assert.notNull((Object)consumeDelay, (String)"'consumeDelay' must not be null");
        this.consumeDelay = consumeDelay;
    }

    public void setMessageMapper(BytesMessageMapper messageMapper) {
        Assert.notNull((Object)messageMapper, (String)"'messageMapper' must not be null");
        this.messageMapper = messageMapper;
    }

    public void setSendSocketConfigurer(Consumer<ZMQ.Socket> sendSocketConfigurer) {
        Assert.notNull(sendSocketConfigurer, (String)"'sendSocketConfigurer' must not be null");
        this.sendSocketConfigurer = sendSocketConfigurer;
    }

    public void setSubscribeSocketConfigurer(Consumer<ZMQ.Socket> subscribeSocketConfigurer) {
        Assert.notNull(subscribeSocketConfigurer, (String)"'subscribeSocketConfigurer' must not be null");
        this.subscribeSocketConfigurer = subscribeSocketConfigurer;
    }

    protected void onInit() {
        Assert.state((this.zeroMqProxy == null || this.connectSendUrl == null ? 1 : 0) != 0, (String)"A 'zeroMqProxy' or 'connectUrl' can be provided (or none), but not both.");
        super.onInit();
        this.sendSocket.subscribe();
        this.initialized = true;
    }

    protected boolean doSend(Message<?> message, long timeout) {
        Assert.state((boolean)this.initialized, (String)"the channel is not initialized yet or already destroyed");
        byte[] data = (byte[])this.messageMapper.fromMessage(message);
        Assert.state((data != null ? 1 : 0) != 0, () -> "The '" + String.valueOf(this.messageMapper) + "' returned null for '" + String.valueOf(message) + "'");
        Mono sendMono = this.sendSocket.map(socket -> socket.send(data));
        Boolean sent = timeout > 0L ? (Boolean)sendMono.block(Duration.ofMillis(timeout)) : (Boolean)sendMono.block();
        return Boolean.TRUE.equals(sent);
    }

    public boolean subscribe(MessageHandler handler) {
        Assert.state((boolean)this.initialized, (String)"the channel is not initialized yet or already destroyed");
        this.subscribers.computeIfAbsent(handler, key -> this.subscriberData.subscribe(arg_0 -> ((MessageHandler)handler).handleMessage(arg_0)));
        return true;
    }

    public boolean unsubscribe(MessageHandler handler) {
        Disposable disposable = this.subscribers.remove(handler);
        if (disposable != null) {
            disposable.dispose();
            return true;
        }
        return false;
    }

    public void destroy() {
        this.initialized = false;
        super.destroy();
        this.sendSocket.doOnNext(ZMQ.Socket::close).block();
        this.publisherScheduler.dispose();
        HashSet<MessageHandler> handlersCopy = new HashSet<MessageHandler>(this.subscribers.keySet());
        handlersCopy.forEach(this::unsubscribe);
        this.subscribeSocket.doOnNext(ZMQ.Socket::close).block();
        this.subscriberScheduler.dispose();
        if (this.subscriberDataDisposable != null) {
            this.subscriberDataDisposable.dispose();
        }
    }
}

