/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.execution.NodeSchedulerConfig;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.ResettableRandomizedIterator;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.util.ImmutableCollectors;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import io.airlift.log.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import org.weakref.jmx.Managed;

public class NodeScheduler {
    private static final Logger log = Logger.get(NodeScheduler.class);
    private final NodeManager nodeManager;
    private final AtomicLong scheduleLocal = new AtomicLong();
    private final AtomicLong scheduleRandom = new AtomicLong();
    private final int minCandidates;
    private final boolean includeCoordinator;
    private final int maxSplitsPerNode;
    private final int maxSplitsPerNodePerTaskWhenFull;
    private final NodeTaskMap nodeTaskMap;
    private final boolean doubleScheduling;

    @Inject
    public NodeScheduler(NodeManager nodeManager, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap) {
        this.nodeManager = nodeManager;
        this.minCandidates = config.getMinCandidates();
        this.includeCoordinator = config.isIncludeCoordinator();
        this.doubleScheduling = config.isMultipleTasksPerNodeEnabled();
        this.maxSplitsPerNode = config.getMaxSplitsPerNode();
        this.maxSplitsPerNodePerTaskWhenFull = config.getMaxPendingSplitsPerNodePerTask();
        this.nodeTaskMap = Objects.requireNonNull(nodeTaskMap, "nodeTaskMap is null");
        Preconditions.checkArgument((this.maxSplitsPerNode > this.maxSplitsPerNodePerTaskWhenFull ? 1 : 0) != 0, (Object)"maxSplitsPerNode must be > maxSplitsPerNodePerTaskWhenFull");
    }

    @Managed
    public long getScheduleLocal() {
        return this.scheduleLocal.get();
    }

    @Managed
    public long getScheduleRandom() {
        return this.scheduleRandom.get();
    }

    @Managed
    public void reset() {
        this.scheduleLocal.set(0L);
        this.scheduleRandom.set(0L);
    }

    public NodeSelector createNodeSelector(String dataSourceName) {
        Supplier nodeMap = Suppliers.memoizeWithExpiration(() -> {
            ImmutableSetMultimap.Builder byHostAndPort = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder byHost = ImmutableSetMultimap.builder();
            Set nodes = dataSourceName != null ? this.nodeManager.getActiveDatasourceNodes(dataSourceName) : this.nodeManager.getActiveNodes();
            for (Node node : nodes) {
                try {
                    byHostAndPort.put((Object)node.getHostAndPort(), (Object)node);
                    InetAddress host = InetAddress.getByName(node.getHttpUri().getHost());
                    byHost.put((Object)host, (Object)node);
                }
                catch (UnknownHostException unknownHostException) {}
            }
            Set coordinatorNodeIds = (Set)this.nodeManager.getCoordinators().stream().map(Node::getNodeIdentifier).collect(ImmutableCollectors.toImmutableSet());
            return new NodeMap((SetMultimap<HostAddress, Node>)byHostAndPort.build(), (SetMultimap<InetAddress, Node>)byHost.build(), coordinatorNodeIds);
        }, (long)5L, (TimeUnit)TimeUnit.SECONDS);
        return new NodeSelector((Supplier<NodeMap>)nodeMap);
    }

    private static class NodeMap {
        private final SetMultimap<HostAddress, Node> nodesByHostAndPort;
        private final SetMultimap<InetAddress, Node> nodesByHost;
        private final Set<String> coordinatorNodeIds;

        public NodeMap(SetMultimap<HostAddress, Node> nodesByHostAndPort, SetMultimap<InetAddress, Node> nodesByHost, Set<String> coordinatorNodeIds) {
            this.nodesByHostAndPort = nodesByHostAndPort;
            this.nodesByHost = nodesByHost;
            this.coordinatorNodeIds = coordinatorNodeIds;
        }

        private SetMultimap<HostAddress, Node> getNodesByHostAndPort() {
            return this.nodesByHostAndPort;
        }

        public SetMultimap<InetAddress, Node> getNodesByHost() {
            return this.nodesByHost;
        }

        public Set<String> getCoordinatorNodeIds() {
            return this.coordinatorNodeIds;
        }
    }

    public class NodeSelector {
        private final AtomicReference<Supplier<NodeMap>> nodeMap;

        public NodeSelector(Supplier<NodeMap> nodeMap) {
            this.nodeMap = new AtomicReference<Supplier<NodeMap>>(nodeMap);
        }

        public void lockDownNodes() {
            this.nodeMap.set((Supplier<NodeMap>)Suppliers.ofInstance((Object)this.nodeMap.get().get()));
        }

        public List<Node> allNodes() {
            return ImmutableList.copyOf((Collection)((NodeMap)this.nodeMap.get().get()).getNodesByHostAndPort().values());
        }

        public Node selectCurrentNode() {
            return NodeScheduler.this.nodeManager.getCurrentNode();
        }

        public List<Node> selectRandomNodes(int limit) {
            return this.selectNodes(limit, this.randomizedNodes());
        }

        private List<Node> selectNodes(int limit, Iterator<Node> candidates) {
            Preconditions.checkArgument((limit > 0 ? 1 : 0) != 0, (Object)"limit must be at least 1");
            ArrayList<Node> selected = new ArrayList<Node>(limit);
            while (selected.size() < limit && candidates.hasNext()) {
                selected.add(candidates.next());
            }
            if (NodeScheduler.this.doubleScheduling && !selected.isEmpty()) {
                int uniqueNodes = selected.size();
                int i = 0;
                while (selected.size() < limit) {
                    if (i >= uniqueNodes) {
                        i = 0;
                    }
                    selected.add((Node)selected.get(i));
                    ++i;
                }
            }
            return selected;
        }

        public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks) {
            HashMultimap assignment = HashMultimap.create();
            HashMap<Node, Integer> assignmentCount = new HashMap<Node, Integer>();
            for (Node node : ((NodeMap)this.nodeMap.get().get()).getNodesByHostAndPort().values()) {
                assignmentCount.put(node, 0);
            }
            HashMap<Node, Integer> splitCountByNode = new HashMap<Node, Integer>();
            HashMap<String, Integer> queuedSplitCountByNode = new HashMap<String, Integer>();
            for (RemoteTask task : existingTasks) {
                String nodeId = task.getNodeId();
                queuedSplitCountByNode.put(nodeId, queuedSplitCountByNode.getOrDefault(nodeId, 0) + task.getQueuedPartitionedSplitCount());
            }
            ResettableRandomizedIterator<Node> randomCandidates = this.randomizedNodes();
            for (Split split : splits) {
                randomCandidates.reset();
                NodeMap nodeMap = (NodeMap)this.nodeMap.get().get();
                List<Node> candidateNodes = !split.isRemotelyAccessible() ? this.selectNodesBasedOnHint(nodeMap, split.getAddresses()) : this.selectNodes(NodeScheduler.this.minCandidates, randomCandidates);
                if (candidateNodes.isEmpty()) {
                    log.debug("No nodes available to schedule %s. Available nodes %s", new Object[]{split, nodeMap.getNodesByHost().keys()});
                    throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.NO_NODES_AVAILABLE, "No nodes available to run query");
                }
                for (Node node : candidateNodes) {
                    if (splitCountByNode.containsKey(node)) continue;
                    splitCountByNode.put(node, NodeScheduler.this.nodeTaskMap.getPartitionedSplitsOnNode(node));
                }
                Node chosenNode = null;
                int min = Integer.MAX_VALUE;
                for (Node node : candidateNodes) {
                    int totalSplitCount = assignmentCount.getOrDefault(node, 0) + (Integer)splitCountByNode.get(node);
                    if (totalSplitCount >= min || totalSplitCount >= NodeScheduler.this.maxSplitsPerNode) continue;
                    chosenNode = node;
                    min = totalSplitCount;
                }
                if (chosenNode == null) {
                    for (Node node : candidateNodes) {
                        int assignedSplitCount = assignmentCount.getOrDefault(node, 0);
                        int queuedSplitCount = queuedSplitCountByNode.getOrDefault(node.getNodeIdentifier(), 0);
                        int totalSplitCount = queuedSplitCount + assignedSplitCount;
                        if (totalSplitCount >= min || totalSplitCount >= NodeScheduler.this.maxSplitsPerNodePerTaskWhenFull) continue;
                        chosenNode = node;
                        min = totalSplitCount;
                    }
                }
                if (chosenNode == null) continue;
                assignment.put(chosenNode, (Object)split);
                assignmentCount.put(chosenNode, assignmentCount.getOrDefault(chosenNode, 0) + 1);
            }
            return assignment;
        }

        private ResettableRandomizedIterator<Node> randomizedNodes() {
            NodeMap nodeMap = (NodeMap)this.nodeMap.get().get();
            ImmutableList nodes = nodeMap.getNodesByHostAndPort().values().stream().filter(node -> NodeScheduler.this.includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier())).collect(ImmutableCollectors.toImmutableList());
            return new ResettableRandomizedIterator<Node>((Collection<Node>)nodes);
        }

        private List<Node> selectNodesBasedOnHint(NodeMap nodeMap, List<HostAddress> addresses) {
            InetAddress address;
            LinkedHashSet chosen = new LinkedHashSet(NodeScheduler.this.minCandidates);
            Set<String> coordinatorIds = nodeMap.getCoordinatorNodeIds();
            for (HostAddress hint : addresses) {
                nodeMap.getNodesByHostAndPort().get((Object)hint).stream().filter(node -> NodeScheduler.this.includeCoordinator || !coordinatorIds.contains(node.getNodeIdentifier())).filter(chosen::add).forEach(node -> NodeScheduler.this.scheduleLocal.incrementAndGet());
                try {
                    address = hint.toInetAddress();
                }
                catch (UnknownHostException e) {
                    continue;
                }
                if (hint.hasPort()) continue;
                nodeMap.getNodesByHost().get((Object)address).stream().filter(node -> NodeScheduler.this.includeCoordinator || !coordinatorIds.contains(node.getNodeIdentifier())).filter(chosen::add).forEach(node -> NodeScheduler.this.scheduleLocal.incrementAndGet());
            }
            if (chosen.isEmpty() && !NodeScheduler.this.includeCoordinator) {
                for (HostAddress hint : addresses) {
                    nodeMap.getNodesByHostAndPort().get((Object)hint).stream().forEach(chosen::add);
                    try {
                        address = hint.toInetAddress();
                    }
                    catch (UnknownHostException e) {
                        continue;
                    }
                    if (hint.hasPort()) continue;
                    nodeMap.getNodesByHost().get((Object)address).stream().forEach(chosen::add);
                }
            }
            return ImmutableList.copyOf(chosen);
        }
    }
}

