/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.metadata;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.discovery.client.ServiceDescriptor;
import com.facebook.airlift.discovery.client.ServiceSelector;
import com.facebook.airlift.discovery.client.ServiceType;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.drift.client.DriftClient;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.metadata.ForNodeManager;
import com.facebook.presto.metadata.HttpRemoteNodeState;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.RemoteNodeState;
import com.facebook.presto.metadata.ThriftRemoteNodeState;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.server.thrift.ThriftServerInfoClient;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.statusservice.NodeStatusService;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.weakref.jmx.Managed;

/**
 * {@link InternalNodeManager} that derives cluster membership from a discovery
 * {@link ServiceSelector}.
 *
 * <p>Each refresh re-reads the announced services, drops those the {@link FailureDetector}
 * reports as failed (and, for non-coordinators, those the optional {@link NodeStatusService}
 * does not allow), and classifies the remainder as active, inactive or shutting down. A
 * single-threaded poller started by {@link #startPollingNodeStates()} keeps one
 * {@link RemoteNodeState} per live node and triggers a refresh every five seconds.
 *
 * <p>The membership indexes are guarded by the instance monitor. Listener callbacks are handed
 * to a separate executor, so they are never invoked by the thread that holds that monitor.
 */
@ThreadSafe
public final class DiscoveryNodeManager
implements InternalNodeManager {
    private static final Logger log = Logger.get(DiscoveryNodeManager.class);
    private static final Splitter CONNECTOR_ID_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();

    private final ServiceSelector serviceSelector;
    private final FailureDetector failureDetector;
    private final Optional<NodeStatusService> nodeStatusService;
    private final NodeVersion expectedNodeVersion;
    // Keyed by node identifier; pruned and repopulated by pollWorkers().
    private final ConcurrentHashMap<String, RemoteNodeState> nodeStates = new ConcurrentHashMap<>();
    private final HttpClient httpClient;
    private final DriftClient<ThriftServerInfoClient> driftClient;
    private final ScheduledExecutorService nodeStateUpdateExecutor;
    private final ExecutorService nodeStateEventExecutor;
    private final boolean httpsRequired;
    private final InternalNode currentNode;
    private final InternalCommunicationConfig.CommunicationProtocol protocol;
    private final boolean isMemoizeDeadNodesEnabled;

    @GuardedBy("this")
    private SetMultimap<ConnectorId, InternalNode> activeNodesByConnectorId;
    @GuardedBy("this")
    private SetMultimap<ConnectorId, InternalNode> nodesByConnectorId;
    @GuardedBy("this")
    private SetMultimap<String, ConnectorId> connectorIdsByNodeId;
    @GuardedBy("this")
    private Map<String, InternalNode> nodes;
    @GuardedBy("this")
    private AllNodes allNodes;
    @GuardedBy("this")
    private Set<InternalNode> coordinators;
    @GuardedBy("this")
    private Set<InternalNode> resourceManagers;
    @GuardedBy("this")
    private final List<Consumer<AllNodes>> listeners = new ArrayList<>();

    /**
     * Creates the manager, resolves the descriptor of the current node and performs an initial
     * synchronous refresh so that {@link #getAllNodes()} is usable immediately.
     *
     * @throws NullPointerException if any argument is null
     * @throws IllegalStateException if the current node is not announced by {@code serviceSelector},
     *         or is announced with a version other than {@code expectedNodeVersion}
     */
    @Inject
    public DiscoveryNodeManager(@ServiceType(value="presto") ServiceSelector serviceSelector, NodeInfo nodeInfo, FailureDetector failureDetector, Optional<NodeStatusService> nodeStatusService, NodeVersion expectedNodeVersion, @ForNodeManager HttpClient httpClient, @ForNodeManager DriftClient<ThriftServerInfoClient> driftClient, InternalCommunicationConfig internalCommunicationConfig) {
        this.serviceSelector = Objects.requireNonNull(serviceSelector, "serviceSelector is null");
        this.failureDetector = Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.nodeStatusService = Objects.requireNonNull(nodeStatusService, "nodeStatusService is null");
        this.expectedNodeVersion = Objects.requireNonNull(expectedNodeVersion, "expectedNodeVersion is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.driftClient = Objects.requireNonNull(driftClient, "driftClient is null");
        Objects.requireNonNull(nodeInfo, "nodeInfo is null");
        Objects.requireNonNull(internalCommunicationConfig, "internalCommunicationConfig is null");

        this.nodeStateUpdateExecutor = Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed("node-state-poller-%s"));
        this.nodeStateEventExecutor = Executors.newCachedThreadPool(Threads.threadsNamed("node-state-events-%s"));
        this.httpsRequired = internalCommunicationConfig.isHttpsRequired();
        this.currentNode = findCurrentNode(serviceSelector.selectAllServices(), nodeInfo.getNodeId(), expectedNodeVersion, this.httpsRequired);
        this.protocol = internalCommunicationConfig.getServerInfoCommunicationProtocol();
        this.isMemoizeDeadNodesEnabled = internalCommunicationConfig.isMemoizeDeadNodesEnabled();

        refreshNodesInternal();
    }

    /**
     * Finds the announced service whose node identifier equals {@code currentNodeId}.
     * Services without a usable URI or without a version are skipped.
     *
     * @throws IllegalStateException if no such service exists or its version differs from
     *         {@code expectedNodeVersion}
     */
    private static InternalNode findCurrentNode(List<ServiceDescriptor> allServices, String currentNodeId, NodeVersion expectedNodeVersion, boolean httpsRequired) {
        for (ServiceDescriptor service : allServices) {
            URI uri = getHttpUri(service, httpsRequired);
            OptionalInt thriftPort = getThriftServerPort(service);
            NodeVersion nodeVersion = getNodeVersion(service);
            if (uri == null || nodeVersion == null) {
                continue;
            }

            InternalNode node = new InternalNode(service.getNodeId(), uri, thriftPort, nodeVersion, isCoordinator(service), isResourceManager(service), InternalNode.NodeStatus.ALIVE);
            if (node.getNodeIdentifier().equals(currentNodeId)) {
                Preconditions.checkState(node.getNodeVersion().equals(expectedNodeVersion), "INVARIANT: current node version (%s) should be equal to %s", node.getNodeVersion(), expectedNodeVersion);
                return node;
            }
        }
        throw new IllegalStateException("INVARIANT: current node not returned from service selector");
    }

    /**
     * Schedules the background poll (5 second initial delay, 5 seconds between runs) and then
     * performs one poll synchronously on the calling thread.
     */
    @PostConstruct
    public void startPollingNodeStates() {
        nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> {
            try {
                pollWorkers();
            }
            catch (Exception e) {
                // Swallow so that one failed poll does not cancel the periodic task.
                log.error(e, "Error polling state of nodes");
            }
        }, 5L, 5L, TimeUnit.SECONDS);
        pollWorkers();
    }

    /**
     * Synchronizes {@link #nodeStates} with the currently alive (active or shutting down) nodes,
     * asks every tracked state to refresh asynchronously, and rebuilds the membership indexes.
     */
    private void pollWorkers() {
        AllNodes currentNodes = getAllNodes();
        Set<InternalNode> aliveNodes = ImmutableSet.<InternalNode>builder()
                .addAll(currentNodes.getActiveNodes())
                .addAll(currentNodes.getShuttingDownNodes())
                .build();
        Set<String> aliveNodeIds = aliveNodes.stream()
                .map(InternalNode::getNodeIdentifier)
                .collect(ImmutableSet.toImmutableSet());

        // Materialize the difference first: it is a live view over the key set being modified.
        Set<String> deadNodeIds = Sets.difference(nodeStates.keySet(), aliveNodeIds).immutableCopy();
        nodeStates.keySet().removeAll(deadNodeIds);

        for (InternalNode node : aliveNodes) {
            switch (protocol) {
                case HTTP:
                    nodeStates.putIfAbsent(node.getNodeIdentifier(), new HttpRemoteNodeState(httpClient, HttpUriBuilder.uriBuilderFrom(node.getInternalUri()).appendPath("/v1/info/state").build()));
                    break;
                case THRIFT:
                    // Without a thrift port the node cannot be polled yet; skip it for now.
                    if (node.getThriftPort().isPresent()) {
                        // putIfAbsent, like the HTTP branch: replacing the entry on every poll
                        // would make the refresh below always read a brand-new state object.
                        // NOTE(review): assumes RemoteNodeState retains its last fetched state
                        // between polls -- confirm against ThriftRemoteNodeState.
                        nodeStates.putIfAbsent(node.getNodeIdentifier(), new ThriftRemoteNodeState(driftClient, HttpUriBuilder.uriBuilderFrom(node.getInternalUri()).scheme("thrift").port(node.getThriftPort().getAsInt()).build()));
                    }
                    break;
            }
        }

        nodeStates.values().forEach(RemoteNodeState::asyncRefresh);
        refreshNodesInternal();
    }

    /**
     * Stops the background poller and the listener-notification executor. After this call node
     * change listeners are no longer notified.
     */
    @PreDestroy
    public synchronized void stop() {
        nodeStateUpdateExecutor.shutdownNow();
        nodeStateEventExecutor.shutdownNow();
    }

    /**
     * Rebuilds the membership indexes from the service selector on the calling thread.
     */
    @Override
    public void refreshNodes() {
        refreshNodesInternal();
    }

    /**
     * Recomputes every membership index from the currently announced services and, when the
     * resulting {@link AllNodes} differs from the previous one, publishes it to the listeners.
     */
    private synchronized void refreshNodesInternal() {
        // Read the failed set once per refresh rather than once per announced service.
        Set<ServiceDescriptor> failedServices = failureDetector.getFailed();
        Set<ServiceDescriptor> services = serviceSelector.selectAllServices().stream()
                .filter(service -> !failedServices.contains(service))
                .filter(service -> !nodeStatusService.isPresent()
                        || nodeStatusService.get().isAllowed(service.getLocation())
                        || isCoordinator(service))
                .collect(ImmutableSet.toImmutableSet());

        Comparator<InternalNode> byNodeIdentifier = Comparator.comparing(InternalNode::getNodeIdentifier);
        ImmutableSortedSet.Builder<InternalNode> activeNodesBuilder = ImmutableSortedSet.orderedBy(byNodeIdentifier);
        ImmutableSet.Builder<InternalNode> inactiveNodesBuilder = ImmutableSet.builder();
        ImmutableSet.Builder<InternalNode> shuttingDownNodesBuilder = ImmutableSet.builder();
        ImmutableSet.Builder<InternalNode> coordinatorsBuilder = ImmutableSet.builder();
        ImmutableSet.Builder<InternalNode> resourceManagersBuilder = ImmutableSet.builder();
        ImmutableSetMultimap.Builder<ConnectorId, InternalNode> byConnectorIdBuilder = ImmutableSetMultimap.builder();
        byConnectorIdBuilder.orderValuesBy(byNodeIdentifier);

        Map<String, InternalNode> nodesById = new HashMap<>();
        SetMultimap<String, ConnectorId> connectorIdsByNode = HashMultimap.create();
        if (isMemoizeDeadNodesEnabled) {
            // Carry over what earlier refreshes saw so vanished nodes can be reported as DEAD.
            if (this.nodes != null) {
                nodesById.putAll(this.nodes);
            }
            if (this.connectorIdsByNodeId != null) {
                connectorIdsByNode.putAll(this.connectorIdsByNodeId);
            }
        }

        for (ServiceDescriptor service : services) {
            URI uri = getHttpUri(service, httpsRequired);
            OptionalInt thriftPort = getThriftServerPort(service);
            NodeVersion nodeVersion = getNodeVersion(service);
            boolean coordinator = isCoordinator(service);
            boolean resourceManager = isResourceManager(service);
            if (uri == null || nodeVersion == null) {
                continue;
            }

            InternalNode node = new InternalNode(service.getNodeId(), uri, thriftPort, nodeVersion, coordinator, resourceManager, InternalNode.NodeStatus.ALIVE);
            NodeState nodeState = getNodeState(node);
            switch (nodeState) {
                case ACTIVE:
                    activeNodesBuilder.add(node);
                    if (coordinator) {
                        coordinatorsBuilder.add(node);
                    }
                    if (resourceManager) {
                        resourceManagersBuilder.add(node);
                    }
                    nodesById.put(node.getNodeIdentifier(), node);

                    String connectorIds = service.getProperties().get("connectorIds");
                    if (connectorIds != null) {
                        for (String id : CONNECTOR_ID_SPLITTER.split(connectorIds.toLowerCase(Locale.ENGLISH))) {
                            ConnectorId connectorId = new ConnectorId(id);
                            byConnectorIdBuilder.put(connectorId, node);
                            connectorIdsByNode.put(node.getNodeIdentifier(), connectorId);
                        }
                    }
                    // Every active node is also registered under the "system" connector.
                    byConnectorIdBuilder.put(new ConnectorId("system"), node);
                    break;
                case INACTIVE:
                    inactiveNodesBuilder.add(node);
                    break;
                case SHUTTING_DOWN:
                    shuttingDownNodesBuilder.add(node);
                    break;
                default:
                    log.error("Unknown state %s for node %s", nodeState, node);
            }
        }

        ImmutableSortedSet<InternalNode> activeNodes = activeNodesBuilder.build();
        ImmutableSet<InternalNode> shuttingDownNodes = shuttingDownNodesBuilder.build();
        ImmutableSet<InternalNode> coordinatorNodes = coordinatorsBuilder.build();
        ImmutableSet<InternalNode> resourceManagerNodes = resourceManagersBuilder.build();

        if (this.allNodes != null) {
            Set<InternalNode> missingNodes = Sets.difference(this.allNodes.getActiveNodes(), Sets.union(activeNodes, shuttingDownNodes));
            for (InternalNode missingNode : missingNodes) {
                log.info("Previously active node is missing: %s (last seen at %s)", missingNode.getNodeIdentifier(), missingNode.getHost());
            }
        }

        // Snapshot before dead nodes are appended: this index holds active nodes only.
        this.activeNodesByConnectorId = byConnectorIdBuilder.build();

        if (isMemoizeDeadNodesEnabled) {
            Set<String> activeNodeIds = activeNodes.stream()
                    .map(InternalNode::getNodeIdentifier)
                    .collect(ImmutableSet.toImmutableSet());
            for (String nodeId : Sets.difference(nodesById.keySet(), activeNodeIds)) {
                InternalNode remembered = nodesById.get(nodeId);
                InternalNode deadNode = new InternalNode(remembered.getNodeIdentifier(), remembered.getInternalUri(), remembered.getThriftPort(), remembered.getNodeVersion(), remembered.isCoordinator(), remembered.isResourceManager(), InternalNode.NodeStatus.DEAD);
                for (ConnectorId connectorId : connectorIdsByNode.get(nodeId)) {
                    byConnectorIdBuilder.put(connectorId, deadNode);
                }
            }
        }

        this.nodes = ImmutableMap.copyOf(nodesById);
        this.nodesByConnectorId = byConnectorIdBuilder.build();
        this.connectorIdsByNodeId = ImmutableSetMultimap.copyOf(connectorIdsByNode);

        AllNodes updatedNodes = new AllNodes(activeNodes, inactiveNodesBuilder.build(), shuttingDownNodes, coordinatorNodes, resourceManagerNodes);
        if (!updatedNodes.equals(this.allNodes)) {
            this.allNodes = updatedNodes;
            this.coordinators = coordinatorNodes;
            this.resourceManagers = resourceManagerNodes;

            // Copy so the callback iterates a stable list after the monitor is released.
            List<Consumer<AllNodes>> listenersSnapshot = ImmutableList.copyOf(listeners);
            submitListenerNotification(() -> listenersSnapshot.forEach(listener -> listener.accept(updatedNodes)));
        }
    }

    /**
     * Hands {@code notification} to the listener executor unless {@link #stop()} has already shut
     * it down. Callers must hold the instance monitor, which {@code stop()} also acquires, so the
     * shutdown check cannot race with the submission.
     */
    @GuardedBy("this")
    private void submitListenerNotification(Runnable notification) {
        if (!nodeStateEventExecutor.isShutdown()) {
            nodeStateEventExecutor.submit(notification);
        }
    }

    /**
     * Classifies {@code node}: INACTIVE when its version differs from the expected one, otherwise
     * SHUTTING_DOWN when its tracked remote state says so, otherwise ACTIVE.
     */
    private NodeState getNodeState(InternalNode node) {
        if (!expectedNodeVersion.equals(node.getNodeVersion())) {
            return NodeState.INACTIVE;
        }
        return isNodeShuttingDown(node.getNodeIdentifier()) ? NodeState.SHUTTING_DOWN : NodeState.ACTIVE;
    }

    /**
     * Returns true when a remote state is tracked for {@code nodeId} and it currently reports
     * {@link NodeState#SHUTTING_DOWN}.
     */
    private boolean isNodeShuttingDown(String nodeId) {
        // Single lookup: pollWorkers() may remove the entry concurrently, so a separate
        // containsKey()/get() pair could observe the key and then dereference null.
        RemoteNodeState remoteNodeState = nodeStates.get(nodeId);
        if (remoteNodeState == null) {
            return false;
        }
        Optional<NodeState> reportedState = remoteNodeState.getNodeState();
        return reportedState.isPresent() && reportedState.get() == NodeState.SHUTTING_DOWN;
    }

    /**
     * Returns the membership snapshot produced by the most recent refresh that changed it.
     */
    @Override
    public synchronized AllNodes getAllNodes() {
        return allNodes;
    }

    /** Number of nodes in the active set of the current snapshot. */
    @Managed
    public int getActiveNodeCount() {
        return getAllNodes().getActiveNodes().size();
    }

    /** Number of nodes in the inactive set of the current snapshot. */
    @Managed
    public int getInactiveNodeCount() {
        return getAllNodes().getInactiveNodes().size();
    }

    /** Number of nodes in the shutting-down set of the current snapshot. */
    @Managed
    public int getShuttingDownNodeCount() {
        return getAllNodes().getShuttingDownNodes().size();
    }

    /**
     * Returns the nodes of the current snapshot that are in {@code state}.
     *
     * @throws IllegalArgumentException if {@code state} is not ACTIVE, INACTIVE or SHUTTING_DOWN
     */
    @Override
    public Set<InternalNode> getNodes(NodeState state) {
        switch (state) {
            case ACTIVE:
                return getAllNodes().getActiveNodes();
            case INACTIVE:
                return getAllNodes().getInactiveNodes();
            case SHUTTING_DOWN:
                return getAllNodes().getShuttingDownNodes();
            default:
                throw new IllegalArgumentException("Unknown node state " + state);
        }
    }

    /**
     * Returns the active nodes that announced {@code connectorId} as of the last refresh.
     */
    @Override
    public synchronized Set<InternalNode> getActiveConnectorNodes(ConnectorId connectorId) {
        return activeNodesByConnectorId.get(connectorId);
    }

    /**
     * Returns the nodes indexed under {@code connectorId} as of the last refresh: the active
     * ones plus, when dead-node memoization is enabled, remembered nodes marked DEAD.
     */
    @Override
    public synchronized Set<InternalNode> getAllConnectorNodes(ConnectorId connectorId) {
        return nodesByConnectorId.get(connectorId);
    }

    /** Returns the descriptor of this node as resolved at construction time. */
    @Override
    public InternalNode getCurrentNode() {
        return currentNode;
    }

    /** Returns the active coordinator nodes of the current snapshot. */
    @Override
    public synchronized Set<InternalNode> getCoordinators() {
        return coordinators;
    }

    /** Returns the active resource manager nodes of the current snapshot. */
    @Override
    public synchronized Set<InternalNode> getResourceManagers() {
        return resourceManagers;
    }

    /**
     * Registers {@code listener} and asynchronously delivers the current snapshot to it.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    @Override
    public synchronized void addNodeChangeListener(Consumer<AllNodes> listener) {
        Objects.requireNonNull(listener, "listener is null");
        listeners.add(listener);
        AllNodes snapshot = allNodes;
        submitListenerNotification(() -> listener.accept(snapshot));
    }

    /**
     * Unregisters {@code listener}; does nothing if it was not registered.
     *
     * @throws NullPointerException if {@code listener} is null
     */
    @Override
    public synchronized void removeNodeChangeListener(Consumer<AllNodes> listener) {
        listeners.remove(Objects.requireNonNull(listener, "listener is null"));
    }

    /**
     * Reads the "https" (when {@code httpsRequired}) or "http" property of {@code descriptor}.
     *
     * @return the parsed URI, or null when the property is absent or not a valid URI
     */
    private static URI getHttpUri(ServiceDescriptor descriptor, boolean httpsRequired) {
        String url = descriptor.getProperties().get(httpsRequired ? "https" : "http");
        if (url == null) {
            return null;
        }
        try {
            return new URI(url);
        }
        catch (URISyntaxException ignored) {
            // Best effort: callers skip services without a usable URI.
            return null;
        }
    }

    /**
     * Reads the "thriftServerPort" property of {@code descriptor}.
     *
     * @return the port, or empty when the property is absent or not an integer
     */
    private static OptionalInt getThriftServerPort(ServiceDescriptor descriptor) {
        String port = descriptor.getProperties().get("thriftServerPort");
        if (port == null) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(port));
        }
        catch (NumberFormatException ignored) {
            // Best effort: an unparseable port is treated the same as a missing one.
            return OptionalInt.empty();
        }
    }

    /**
     * Reads the "node_version" property of {@code descriptor}; returns null when it is absent.
     */
    private static NodeVersion getNodeVersion(ServiceDescriptor descriptor) {
        String nodeVersion = descriptor.getProperties().get("node_version");
        return nodeVersion == null ? null : new NodeVersion(nodeVersion);
    }

    /** Returns true when the "coordinator" property of {@code service} parses as true. */
    private static boolean isCoordinator(ServiceDescriptor service) {
        return Boolean.parseBoolean(service.getProperties().get("coordinator"));
    }

    /** Returns true when the "resource_manager" property of {@code service} parses as true. */
    private static boolean isResourceManager(ServiceDescriptor service) {
        return Boolean.parseBoolean(service.getProperties().get("resource_manager"));
    }
}

