/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.Completable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.clustered.ClusteredHandlerHolder;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.eventbus.impl.clustered.InboundConnection;
import io.vertx.core.eventbus.impl.clustered.NodeSelector;
import io.vertx.core.eventbus.impl.clustered.OutboundConnection;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.internal.CloseFuture;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.net.impl.NetServerInternal;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Event bus that extends {@link EventBusImpl} with clustering: handler registrations are
 * recorded with a {@link ClusterManager}, target nodes are chosen through a
 * {@link NodeSelector}, inbound messages arrive on a net server created in
 * {@link #start(Promise)}, and outbound messages are written through one
 * {@link OutboundConnection} per remote node id, opened with a shared {@link NetClient}.
 */
public final class ClusteredEventBus
extends EventBusImpl {
    private static final Logger log = LoggerFactory.getLogger(ClusteredEventBus.class);
    private final EventBusOptions options;
    private final ClusterManager clusterManager;
    private final NodeSelector nodeSelector;
    /** Source of the sequence numbers handed to every {@link ClusteredHandlerHolder}. */
    private final AtomicLong handlerSequence = new AtomicLong(0L);
    private final NetClient client;
    /** Outbound connections keyed by remote node id. */
    private final ConcurrentMap<String, OutboundConnection> outboundConnections = new ConcurrentHashMap<>();
    /** Context on which the cluster server is bound. */
    private final ContextInternal context;
    private NodeInfo nodeInfo;
    private String nodeId;
    private NetServerInternal server;

    /**
     * Creates the clustered event bus. The net client used for outbound connections is built
     * from the event bus options with the hostname verification algorithm set to the empty string.
     *
     * @param vertx          the owning Vert.x instance
     * @param options        Vert.x options; only {@link VertxOptions#getEventBusOptions()} is read
     * @param clusterManager cluster manager used for node info and registrations
     * @param nodeSelector   selector used to pick target nodes for send/publish
     */
    public ClusteredEventBus(VertxInternal vertx, VertxOptions options, ClusterManager clusterManager, NodeSelector nodeSelector) {
        super(vertx);
        EventBusOptions eventBusOptions = options.getEventBusOptions();
        // The client is created before the context, as in the original construction order.
        this.client = this.createNetClient(vertx, new NetClientOptions(eventBusOptions.toJson()).setHostnameVerificationAlgorithm(""));
        this.options = eventBusOptions;
        this.clusterManager = clusterManager;
        this.nodeSelector = nodeSelector;
        this.context = vertx.contextBuilder().withClassLoader(Thread.currentThread().getContextClassLoader()).withCloseFuture(new CloseFuture()).build();
    }

    /** @return the codec manager inherited from the base event bus */
    CodecManager codecManager() {
        return this.codecManager;
    }

    /** @return the event bus metrics inherited from the base event bus */
    EventBusMetrics<?> metrics() {
        return this.metrics;
    }

    /** @return the owning Vert.x instance */
    VertxInternal vertx() {
        return this.vertx;
    }

    /** @return the event bus options this bus was created with */
    EventBusOptions options() {
        return this.options;
    }

    /**
     * Scans the local network interfaces and returns the host address of the first address
     * that is neither a wildcard address, nor a multicast address, nor an IPv6 address.
     *
     * @return the textual host address, or {@code null} when the interfaces cannot be
     *         enumerated or no address qualifies
     */
    public static String defaultAddress() {
        Enumeration<NetworkInterface> interfaces;
        try {
            interfaces = NetworkInterface.getNetworkInterfaces();
        }
        catch (SocketException e) {
            // Best effort: the caller treats null as "no default address available".
            return null;
        }
        while (interfaces.hasMoreElements()) {
            Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress candidate = addresses.nextElement();
                boolean usable = !candidate.isAnyLocalAddress()
                    && !candidate.isMulticastAddress()
                    && !(candidate instanceof Inet6Address);
                if (usable) {
                    return candidate.getHostAddress();
                }
            }
        }
        return null;
    }

    /**
     * Builds the net client used for outbound cluster connections, attaching net client
     * metrics when the Vert.x instance has a metrics SPI.
     */
    private NetClient createNetClient(VertxInternal vertx, NetClientOptions clientOptions) {
        NetClientBuilder builder = new NetClientBuilder(vertx, clientOptions);
        VertxMetrics metricsSPI = vertx.metrics();
        if (metricsSPI != null) {
            builder.metrics(metricsSPI.createNetClientMetrics(clientOptions));
        }
        return builder.build();
    }

    /** Derives the net server options from the JSON form of the event bus options. */
    private NetServerOptions getServerOptions() {
        return new NetServerOptions(this.options.toJson());
    }

    /**
     * Starts the clustered bus: creates the net server, wires every accepted socket to an
     * {@link InboundConnection} that delivers messages locally, binds the server on the
     * cluster host/port, publishes this node's {@link NodeInfo} to the cluster manager and,
     * on success, marks the bus as started and notifies the node selector.
     *
     * @param promise completed with the outcome of the listen + node-info publication chain
     */
    @Override
    public void start(Promise<Void> promise) {
        NetServerOptions serverOptions = this.getServerOptions();
        this.server = this.vertx.createNetServer(serverOptions);
        this.server.connectHandler(socket -> {
            InboundConnection inboundConnection = new InboundConnection(this, (NetSocket)socket);
            inboundConnection.handler(this::deliverMessageLocally);
            socket.handler((Handler)inboundConnection);
        });
        int port = this.getClusterPort();
        String host = this.getClusterHost();
        this.server.listen(this.context, SocketAddress.inetSocketAddress(port, host)).flatMap(s -> {
            // The public port falls back to the port the server actually bound to.
            int publicPort = this.getClusterPublicPort(this.server.actualPort());
            String publicHost = this.getClusterPublicHost(host);
            this.nodeInfo = new NodeInfo(publicHost, publicPort, this.options.getClusterNodeMetadata());
            this.nodeId = this.clusterManager.getNodeId();
            Promise<Void> setPromise = Promise.promise();
            this.clusterManager.setNodeInfo(this.nodeInfo, setPromise);
            return setPromise.future();
        }).andThen(ar -> {
            if (ar.succeeded()) {
                this.started = true;
                this.nodeSelector.eventBusStarted();
            }
        }).onComplete(promise);
    }

    /**
     * Closes the base event bus, then the net client and, when it was created, the net server.
     * The client and server closes are chained with {@code eventually}, so they are attempted
     * whatever the outcome of the preceding step.
     *
     * @param promise completed with the outcome of the close chain
     */
    @Override
    public void close(Promise<Void> promise) {
        Promise<Void> parentClose = Promise.promise();
        super.close(parentClose);
        Future<Void> ret = parentClose.future().eventually(() -> this.client.close());
        if (this.server != null) {
            ret = ret.eventually(() -> this.server.close());
        }
        ret.onComplete(promise);
    }

    /**
     * Creates a {@link ClusteredMessage} stamped with this node's id.
     *
     * @throws NullPointerException if {@code address} is {@code null}
     */
    @Override
    public MessageImpl createMessage(boolean send, boolean local, String address, MultiMap headers, Object body, String codecName) {
        Objects.requireNonNull(address, "no null address accepted");
        MessageCodec codec = this.codecManager.lookupCodec(body, codecName, local);
        return new ClusteredMessage(this.nodeId, address, headers, body, codec, send, this);
    }

    /**
     * Records a newly registered local handler with the cluster manager.
     *
     * @throws NullPointerException if {@code promise} is {@code null}
     */
    @Override
    protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Completable<Void> promise) {
        RegistrationInfo registrationInfo = new RegistrationInfo(this.nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
        this.clusterManager.addRegistration(handlerHolder.getHandler().address(), registrationInfo, Objects.requireNonNull(promise));
    }

    /** Creates a {@link ClusteredHandlerHolder} carrying the next handler sequence number. */
    @Override
    protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> registration, boolean localOnly, ContextInternal context) {
        return new ClusteredHandlerHolder<T>(registration, localOnly, context, this.handlerSequence.getAndIncrement());
    }

    /** Removes the registration of an unregistered local handler from the cluster manager. */
    @Override
    protected <T> void onLocalUnregistration(HandlerHolder<T> handlerHolder, Completable<Void> completionHandler) {
        RegistrationInfo registrationInfo = new RegistrationInfo(this.nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
        this.clusterManager.removeRegistration(handlerHolder.getHandler().address(), registrationInfo, completionHandler);
    }

    /**
     * Routes an outgoing message, in this order of precedence: replies go to the node recorded
     * on the message, local-only deliveries stay on this node, point-to-point sends go to the
     * node chosen by the selector, and publishes go to every node chosen by the selector.
     */
    @Override
    protected <T> void sendOrPub(ContextInternal ctx, MessageImpl<?, T> message, DeliveryOptions options, Promise<Void> writePromise) {
        String repliedTo = ((ClusteredMessage)message).getRepliedTo();
        if (repliedTo != null) {
            this.clusteredSendReply(message, writePromise, repliedTo);
        } else if (options.isLocalOnly()) {
            this.sendLocally(message, writePromise);
        } else if (message.isSend()) {
            this.nodeSelector.selectForSend(message.address(), (selected, failure) -> {
                if (failure == null) {
                    this.sendToNode((String)selected, message, writePromise);
                } else {
                    this.sendOrPublishFailed(writePromise, failure);
                }
            });
        } else {
            this.nodeSelector.selectForPublish(message.address(), (selected, failure) -> {
                if (failure == null) {
                    this.sendToNodes((Iterable<String>)selected, message, writePromise);
                } else {
                    this.sendOrPublishFailed(writePromise, failure);
                }
            });
        }
    }

    /**
     * Fails the write promise after a node selection failure. The failure is logged only when
     * debug logging is enabled, at debug level so the level matches the guard.
     */
    private void sendOrPublishFailed(Promise<Void> promise, Throwable cause) {
        if (log.isDebugEnabled()) {
            log.debug("Failed to send message", cause);
        }
        promise.tryFail(cause);
    }

    /** Generates a reply address of the form {@code __vertx.reply.<random UUID>}. */
    @Override
    protected String generateReplyAddress() {
        return "__vertx.reply." + UUID.randomUUID().toString();
    }

    /** A message is local when it did not arrive from the wire. */
    @Override
    protected boolean isMessageLocal(MessageImpl msg) {
        ClusteredMessage clusteredMessage = (ClusteredMessage)msg;
        return !clusteredMessage.isFromWire();
    }

    /**
     * Picks the handler for a message. Local messages take the sequence's next handler;
     * messages that are not local take the first handler that is not local-only.
     *
     * @return the chosen handler, or {@code null} when a non-local message finds no eligible handler
     */
    @Override
    protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> handlers, boolean messageLocal) {
        if (messageLocal) {
            return handlers.next();
        }
        Iterator<HandlerHolder> iterator = handlers.iterator(false);
        while (iterator.hasNext()) {
            HandlerHolder candidate = iterator.next();
            if (!candidate.isLocalOnly()) {
                return candidate;
            }
        }
        return null;
    }

    /** @return the port configured in the event bus options */
    private int getClusterPort() {
        return this.options.getPort();
    }

    /**
     * Resolves the host to bind to: the configured host, else the cluster manager's host,
     * else {@link #defaultAddress()}.
     *
     * @return the host, possibly {@code null} when none of the sources yields a value
     */
    private String getClusterHost() {
        String host = this.options.getHost();
        if (host != null) {
            return host;
        }
        host = this.clusterManager.clusterHost();
        if (host != null) {
            return host;
        }
        return ClusteredEventBus.defaultAddress();
    }

    /** @return the configured public port when it is positive, otherwise {@code actualPort} */
    private int getClusterPublicPort(int actualPort) {
        int publicPort = this.options.getClusterPublicPort();
        return publicPort > 0 ? publicPort : actualPort;
    }

    /**
     * Resolves the host advertised to other nodes: the configured public host, else the
     * configured host, else the cluster manager's public host, else the given bind host.
     */
    private String getClusterPublicHost(String host) {
        String publicHost = this.options.getClusterPublicHost();
        if (publicHost != null) {
            return publicHost;
        }
        publicHost = this.options.getHost();
        if (publicHost != null) {
            return publicHost;
        }
        publicHost = this.clusterManager.clusterPublicHost();
        if (publicHost != null) {
            return publicHost;
        }
        return host;
    }

    /** Sends remotely when {@code nodeId} names another node; otherwise delivers locally. */
    private <T> void sendToNode(String nodeId, MessageImpl<?, T> message, Promise<Void> writePromise) {
        if (nodeId != null && !nodeId.equals(this.nodeId)) {
            this.sendRemote(nodeId, message, writePromise);
        } else {
            this.sendLocally(message, writePromise);
        }
    }

    /**
     * Sends the message to every given node; when there is no target at all (null or empty
     * selection) the message is delivered locally instead.
     */
    private <T> void sendToNodes(Iterable<String> nodeIds, MessageImpl<?, T> message, Promise<Void> writePromise) {
        // NOTE(review): the same writePromise is handed to every target; presumably the
        // receivers complete it with try* semantics - confirm.
        boolean dispatched = false;
        if (nodeIds != null) {
            for (String nid : nodeIds) {
                dispatched = true;
                this.sendToNode(nid, message, writePromise);
            }
        }
        if (!dispatched) {
            this.sendLocally(message, writePromise);
        }
    }

    /** Sends a reply to {@code replyDest}, locally when that is this node. */
    private <T> void clusteredSendReply(MessageImpl<?, T> message, Promise<Void> writePromise, String replyDest) {
        if (!replyDest.equals(this.nodeId)) {
            this.sendRemote(replyDest, message, writePromise);
        } else {
            this.sendLocally(message, writePromise);
        }
    }

    /** Writes the message on the outbound connection for the given remote node. */
    private void sendRemote(String remoteNodeId, MessageImpl<?, ?> message, Promise<Void> writePromise) {
        OutboundConnection outboundConnection = this.getOutboundConnection(remoteNodeId);
        outboundConnection.writeMessage(message, writePromise);
    }

    /**
     * Returns the registered outbound connection for the node, creating and registering one
     * when absent. Only the caller whose {@code putIfAbsent} wins starts the connect.
     */
    private OutboundConnection getOutboundConnection(String remoteNodeId) {
        OutboundConnection conn = this.outboundConnections.get(remoteNodeId);
        if (conn == null) {
            conn = new OutboundConnection(this, remoteNodeId);
            OutboundConnection prev = this.outboundConnections.putIfAbsent(remoteNodeId, conn);
            if (prev != null) {
                // Another caller registered a connection first; use theirs.
                conn = prev;
            } else {
                this.connect(conn);
            }
        }
        return conn;
    }

    /**
     * Looks up the remote node's info and opens a socket to it. On success the socket is
     * attached to the connection and a close handler unregisters the connection. On failure
     * the connection is unregistered and closed with the failure cause.
     */
    private void connect(OutboundConnection conn) {
        Promise<NodeInfo> promise = Promise.promise();
        this.clusterManager.getNodeInfo(conn.remoteNodeId(), promise);
        promise.future().flatMap(info -> this.client.connect(info.port(), info.host())).onComplete(ar -> {
            if (ar.succeeded()) {
                NetSocket connection = (NetSocket)ar.result();
                connection.handler((Handler)conn);
                connection.closeHandler(v -> {
                    // Remove only this exact mapping; a newer connection for the same node must stay.
                    if (this.outboundConnections.remove(conn.remoteNodeId(), conn) && log.isDebugEnabled()) {
                        log.debug("Cluster connection closed for server " + conn.remoteNodeId());
                    }
                    conn.handleClose(NetSocketInternal.CLOSED_EXCEPTION);
                });
                conn.connected(connection);
            } else {
                log.warn("Connecting to server " + conn.remoteNodeId() + " failed", ar.cause());
                // No socket exists, so the close handler above was never installed: unregister
                // here, otherwise later sends would keep reusing this closed connection and
                // no new connect would ever be attempted for the node.
                this.outboundConnections.remove(conn.remoteNodeId(), conn);
                conn.handleClose(ar.cause());
            }
        });
    }
}

