/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution.scheduler;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.scheduler.BucketNodeMap;
import com.facebook.presto.execution.scheduler.ConsistentHashingNodeProvider;
import com.facebook.presto.execution.scheduler.NetworkLocation;
import com.facebook.presto.execution.scheduler.NetworkLocationCache;
import com.facebook.presto.execution.scheduler.NetworkTopology;
import com.facebook.presto.execution.scheduler.NodeAssignmentStats;
import com.facebook.presto.execution.scheduler.NodeMap;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.execution.scheduler.NodeSelectionHashStrategy;
import com.facebook.presto.execution.scheduler.ResettableRandomizedIterator;
import com.facebook.presto.execution.scheduler.SplitPlacementResult;
import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelectionStats;
import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelector;
import com.facebook.presto.execution.scheduler.nodeSelection.SimpleNodeSelector;
import com.facebook.presto.execution.scheduler.nodeSelection.SimpleTtlNodeSelector;
import com.facebook.presto.execution.scheduler.nodeSelection.SimpleTtlNodeSelectorConfig;
import com.facebook.presto.execution.scheduler.nodeSelection.TopologyAwareNodeSelector;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SplitContext;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.ttl.nodettlfetchermanagers.NodeTtlFetcherManager;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

/**
 * Creates {@link NodeSelector} instances for split scheduling and exposes static helpers for
 * picking nodes and placing splits on them.
 */
public class NodeScheduler {
    /** Node map refresh interval used by the injected constructor. */
    private static final Duration DEFAULT_NODE_MAP_REFRESH_INTERVAL = new Duration(5, TimeUnit.SECONDS);

    private final NetworkLocationCache networkLocationCache;
    private final List<CounterStat> topologicalSplitCounters;
    private final List<String> networkLocationSegmentNames;
    private final InternalNodeManager nodeManager;
    private final NodeSelectionStats nodeSelectionStats;
    private final int minCandidates;
    private final boolean includeCoordinator;
    private final long maxSplitsWeightPerNode;
    private final long maxPendingSplitsWeightPerTask;
    private final NodeTaskMap nodeTaskMap;
    private final boolean useNetworkTopology;
    private final Duration nodeMapRefreshInterval;
    private final NodeTtlFetcherManager nodeTtlFetcherManager;
    private final QueryManager queryManager;
    private final SimpleTtlNodeSelectorConfig simpleTtlNodeSelectorConfig;
    private final NodeSelectionHashStrategy nodeSelectionHashStrategy;
    private final int minVirtualNodeCount;
    private final int maxPreferredNodes;

    /**
     * Injection entry point. Wraps {@code networkTopology} in a new {@link NetworkLocationCache}
     * and uses a five second node map refresh interval.
     */
    @Inject
    public NodeScheduler(NetworkTopology networkTopology, InternalNodeManager nodeManager, NodeSelectionStats nodeSelectionStats, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap, NodeTtlFetcherManager nodeTtlFetcherManager, QueryManager queryManager, SimpleTtlNodeSelectorConfig simpleTtlNodeSelectorConfig) {
        this(new NetworkLocationCache(networkTopology), networkTopology, nodeManager, nodeSelectionStats, config, nodeTaskMap, DEFAULT_NODE_MAP_REFRESH_INTERVAL, nodeTtlFetcherManager, queryManager, simpleTtlNodeSelectorConfig);
    }

    /**
     * Full constructor.
     *
     * @param networkTopology only consulted (for its segment names) when the configured
     *        network topology is not {@code "legacy"}
     * @param nodeMapRefreshInterval how long a node map built for a selector is cached; a value
     *        of zero milliseconds or less disables caching
     * @throws NullPointerException if a required collaborator is null
     * @throws IllegalArgumentException if {@code maxSplitsPerNode} is smaller than
     *         {@code maxPendingSplitsPerTask} in {@code config}
     */
    public NodeScheduler(NetworkLocationCache networkLocationCache, NetworkTopology networkTopology, InternalNodeManager nodeManager, NodeSelectionStats nodeSelectionStats, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap, Duration nodeMapRefreshInterval, NodeTtlFetcherManager nodeTtlFetcherManager, QueryManager queryManager, SimpleTtlNodeSelectorConfig simpleTtlNodeSelectorConfig) {
        this.networkLocationCache = Objects.requireNonNull(networkLocationCache, "networkLocationCache is null");
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.nodeSelectionStats = Objects.requireNonNull(nodeSelectionStats, "nodeSelectionStats is null");
        Objects.requireNonNull(config, "config is null");
        this.minCandidates = config.getMinCandidates();
        this.includeCoordinator = config.isIncludeCoordinator();

        int maxSplitsPerNode = config.getMaxSplitsPerNode();
        int maxPendingSplitsPerTask = config.getMaxPendingSplitsPerTask();
        Preconditions.checkArgument(maxSplitsPerNode >= maxPendingSplitsPerTask, "maxSplitsPerNode must be >= maxPendingSplitsPerTask");
        // Limits are tracked as raw split weights rather than split counts.
        this.maxSplitsWeightPerNode = SplitWeight.rawValueForStandardSplitCount(maxSplitsPerNode);
        this.maxPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(maxPendingSplitsPerTask);

        this.nodeTaskMap = Objects.requireNonNull(nodeTaskMap, "nodeTaskMap is null");
        this.useNetworkTopology = !config.getNetworkTopology().equals("legacy");

        ImmutableList.Builder<CounterStat> counters = ImmutableList.builder();
        if (this.useNetworkTopology) {
            this.networkLocationSegmentNames = ImmutableList.copyOf(networkTopology.getLocationSegmentNames());
            // One counter per segment name plus one extra, reported under the key "all".
            for (int i = 0; i <= this.networkLocationSegmentNames.size(); i++) {
                counters.add(new CounterStat());
            }
        } else {
            this.networkLocationSegmentNames = ImmutableList.of();
        }
        this.topologicalSplitCounters = counters.build();

        this.nodeMapRefreshInterval = Objects.requireNonNull(nodeMapRefreshInterval, "nodeMapRefreshInterval is null");
        this.nodeTtlFetcherManager = Objects.requireNonNull(nodeTtlFetcherManager, "nodeTtlFetcherManager is null");
        this.queryManager = Objects.requireNonNull(queryManager, "queryManager is null");
        this.simpleTtlNodeSelectorConfig = Objects.requireNonNull(simpleTtlNodeSelectorConfig, "simpleTtlNodeSelectorConfig is null");
        this.nodeSelectionHashStrategy = config.getNodeSelectionHashStrategy();
        this.minVirtualNodeCount = config.getMinVirtualNodeCount();
        this.maxPreferredNodes = config.getMaxPreferredNodes();
    }

    /** Stops the underlying {@link NetworkLocationCache}. */
    @PreDestroy
    public void stop() {
        networkLocationCache.stop();
    }

    /**
     * Returns the topological split counters keyed by name: the first counter is keyed
     * {@code "all"}, each following counter by the corresponding network location segment name.
     * The map is empty when the legacy network topology is configured.
     */
    public Map<String, CounterStat> getTopologicalSplitCounters() {
        ImmutableMap.Builder<String, CounterStat> counters = ImmutableMap.builder();
        for (int i = 0; i < topologicalSplitCounters.size(); i++) {
            String name = (i == 0) ? "all" : networkLocationSegmentNames.get(i - 1);
            counters.put(name, topologicalSplitCounters.get(i));
        }
        return counters.build();
    }

    /** Creates a selector with no task-per-stage limit and no extra node filter. */
    public NodeSelector createNodeSelector(Session session, ConnectorId connectorId) {
        return createNodeSelector(session, connectorId, Integer.MAX_VALUE, Optional.empty());
    }

    /** Creates a selector with no task-per-stage limit and the given optional node filter. */
    public NodeSelector createNodeSelector(Session session, ConnectorId connectorId, Optional<Predicate<Node>> filterNodePredicate) {
        return createNodeSelector(session, connectorId, Integer.MAX_VALUE, filterNodePredicate);
    }

    /** Creates a selector with the given task-per-stage limit and no extra node filter. */
    public NodeSelector createNodeSelector(Session session, ConnectorId connectorId, int maxTasksPerStage) {
        return createNodeSelector(session, connectorId, maxTasksPerStage, Optional.empty());
    }

    /**
     * Creates a node selector backed by a node map for {@code connectorId}.
     *
     * <p>The node map is cached for the configured refresh interval when that interval is
     * positive, and rebuilt on every access otherwise. A {@link TopologyAwareNodeSelector} is
     * returned when a non-legacy network topology is configured (in which case
     * {@code maxTasksPerStage} and the resource-aware strategy are not used); otherwise a
     * {@link SimpleNodeSelector}, wrapped in a {@link SimpleTtlNodeSelector} when the session's
     * resource-aware scheduling strategy is {@code TTL}.
     *
     * @param connectorId connector whose nodes are considered; may be null, in which case all
     *        active nodes are used
     * @throws NullPointerException if {@code session} is null
     */
    public NodeSelector createNodeSelector(Session session, ConnectorId connectorId, int maxTasksPerStage, Optional<Predicate<Node>> filterNodePredicate) {
        Supplier<NodeMap> uncachedNodeMap = createNodeMapSupplier(connectorId, filterNodePredicate);
        long refreshMillis = nodeMapRefreshInterval.toMillis();
        Supplier<NodeMap> nodeMap = refreshMillis > 0
                ? Suppliers.memoizeWithExpiration(uncachedNodeMap, refreshMillis, TimeUnit.MILLISECONDS)
                : uncachedNodeMap;

        int maxUnacknowledgedSplitsPerTask = SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask(Objects.requireNonNull(session, "session is null"));
        NodeSchedulerConfig.ResourceAwareSchedulingStrategy resourceAwareSchedulingStrategy = SystemSessionProperties.getResourceAwareSchedulingStrategy(session);

        if (useNetworkTopology) {
            return new TopologyAwareNodeSelector(nodeManager, nodeSelectionStats, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, topologicalSplitCounters, networkLocationSegmentNames, networkLocationCache, maxPreferredNodes);
        }

        SimpleNodeSelector simpleNodeSelector = new SimpleNodeSelector(nodeManager, nodeSelectionStats, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, maxTasksPerStage, maxPreferredNodes);
        if (resourceAwareSchedulingStrategy == NodeSchedulerConfig.ResourceAwareSchedulingStrategy.TTL) {
            return new SimpleTtlNodeSelector(simpleNodeSelector, simpleTtlNodeSelectorConfig, nodeTaskMap, nodeMap, minCandidates, includeCoordinator, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxTasksPerStage, nodeTtlFetcherManager, queryManager, session);
        }
        return simpleNodeSelector;
    }

    /**
     * Returns a supplier that builds a fresh {@link NodeMap} from the node manager on each call.
     *
     * <p>Resource manager nodes are always excluded; {@code nodeFilterPredicate}, when present,
     * restricts the nodes further. With a null {@code connectorId} the cluster's active nodes
     * serve as both the active and the "all" node list.
     */
    private Supplier<NodeMap> createNodeMapSupplier(ConnectorId connectorId, Optional<Predicate<Node>> nodeFilterPredicate) {
        return () -> {
            ImmutableMap.Builder<String, InternalNode> activeNodesByNodeId = ImmutableMap.builder();
            ImmutableSetMultimap.Builder<NetworkLocation, InternalNode> activeWorkersByNetworkPath = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<HostAddress, InternalNode> allNodesByHostAndPort = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<InetAddress, InternalNode> allNodesByHost = ImmutableSetMultimap.builder();

            Predicate<Node> resourceManagerFilterPredicate = candidate -> !candidate.isResourceManager();
            Predicate<Node> finalNodeFilterPredicate = resourceManagerFilterPredicate.and(nodeFilterPredicate.orElse(candidate -> true));

            List<InternalNode> activeNodes;
            List<InternalNode> allNodes;
            if (connectorId != null) {
                activeNodes = nodeManager.getActiveConnectorNodes(connectorId).stream().filter(finalNodeFilterPredicate).collect(ImmutableList.toImmutableList());
                allNodes = nodeManager.getAllConnectorNodes(connectorId).stream().filter(finalNodeFilterPredicate).collect(ImmutableList.toImmutableList());
            } else {
                activeNodes = nodeManager.getNodes(NodeState.ACTIVE).stream().filter(finalNodeFilterPredicate).collect(ImmutableList.toImmutableList());
                allNodes = activeNodes;
            }

            Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream().map(InternalNode::getNodeIdentifier).collect(ImmutableSet.toImmutableSet());

            Optional<ConsistentHashingNodeProvider> consistentHashingNodeProvider = Optional.empty();
            if (nodeSelectionHashStrategy == NodeSelectionHashStrategy.CONSISTENT_HASHING) {
                // Per-node weight chosen so that weight * activeNodes.size() >= minVirtualNodeCount.
                int weight = (int) Math.ceil(1.0 * (double) minVirtualNodeCount / (double) activeNodes.size());
                consistentHashingNodeProvider = Optional.of(ConsistentHashingNodeProvider.create(activeNodes, weight));
            }

            for (InternalNode node : allNodes) {
                if (node.getNodeStatus() == InternalNode.NodeStatus.ALIVE) {
                    activeNodesByNodeId.put(node.getNodeIdentifier(), node);
                    if (useNetworkTopology && (includeCoordinator || !coordinatorNodeIds.contains(node.getNodeIdentifier()))) {
                        NetworkLocation location = networkLocationCache.get(node.getHostAndPort());
                        // Register the node under every prefix of its location, from the empty
                        // prefix up to the full location.
                        for (int i = 0; i <= location.getSegments().size(); i++) {
                            activeWorkersByNetworkPath.put(location.subLocation(0, i), node);
                        }
                    }
                }

                allNodesByHostAndPort.put(node.getHostAndPort(), node);
                try {
                    InetAddress host = InetAddress.getByName(node.getInternalUri().getHost());
                    allNodesByHost.put(host, node);
                }
                catch (UnknownHostException ignored) {
                    // Best effort: a node whose host name cannot be resolved is simply left out
                    // of the by-host index; it remains reachable through the host-and-port index.
                }
            }

            return new NodeMap(activeNodesByNodeId.build(), activeWorkersByNetworkPath.build(), coordinatorNodeIds, activeNodes, allNodes, allNodesByHost.build(), allNodesByHostAndPort.build(), consistentHashingNodeProvider);
        };
    }

    /**
     * Takes up to {@code limit} nodes from {@code candidates}, in iteration order.
     *
     * @throws IllegalArgumentException if {@code limit} is not positive
     */
    public static List<InternalNode> selectNodes(int limit, ResettableRandomizedIterator<InternalNode> candidates) {
        Preconditions.checkArgument(limit > 0, "limit must be at least 1");
        ImmutableList.Builder<InternalNode> selectedNodes = ImmutableList.builderWithExpectedSize(Math.min(limit, candidates.size()));
        for (int selectedCount = 0; selectedCount < limit && candidates.hasNext(); selectedCount++) {
            selectedNodes.add(candidates.next());
        }
        return selectedNodes.build();
    }

    /**
     * Returns a randomized iterator over the active nodes of {@code nodeMap}, leaving out
     * {@code excludedNodes} and, unless {@code includeCoordinator} is set, coordinator nodes.
     */
    public static ResettableRandomizedIterator<InternalNode> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator, Set<InternalNode> excludedNodes) {
        List<InternalNode> nodes = nodeMap.getActiveNodes().stream()
                .filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
                .filter(node -> !excludedNodes.contains(node))
                .collect(ImmutableList.toImmutableList());
        return new ResettableRandomizedIterator<>(nodes);
    }

    /**
     * Returns the nodes that match the given host addresses, in encounter order without
     * duplicates.
     *
     * <p>An address with a port matches nodes by host-and-port only; an address without a port
     * additionally matches every node on the resolved host. Coordinators are skipped unless
     * {@code includeCoordinator} is set. If that leaves nothing, the lookup is repeated with
     * coordinators allowed.
     */
    public static List<InternalNode> selectExactNodes(NodeMap nodeMap, List<HostAddress> hosts, boolean includeCoordinator) {
        Set<InternalNode> chosen = new LinkedHashSet<>();
        Set<String> coordinatorIds = nodeMap.getCoordinatorNodeIds();

        addNodesForHosts(nodeMap, hosts, node -> includeCoordinator || !coordinatorIds.contains(node.getNodeIdentifier()), chosen);

        // Nothing matched while coordinators were excluded: fall back to any node on these hosts.
        if (chosen.isEmpty() && !includeCoordinator) {
            addNodesForHosts(nodeMap, hosts, node -> true, chosen);
        }
        return ImmutableList.copyOf(chosen);
    }

    /** Adds to {@code chosen} every node of {@code nodeMap} on one of {@code hosts} that passes {@code nodeFilter}. */
    private static void addNodesForHosts(NodeMap nodeMap, List<HostAddress> hosts, Predicate<InternalNode> nodeFilter, Set<InternalNode> chosen) {
        for (HostAddress host : hosts) {
            nodeMap.getAllNodesByHostAndPort().get(host).stream().filter(nodeFilter).forEach(chosen::add);

            // Only an address without a port is also matched against the by-host index.
            if (host.hasPort()) {
                continue;
            }
            InetAddress address;
            try {
                address = host.toInetAddress();
            }
            catch (UnknownHostException ignored) {
                // Skip hosts that do not resolve.
                continue;
            }
            nodeMap.getAllNodesByHost().get(address).stream().filter(nodeFilter).forEach(chosen::add);
        }
    }

    /**
     * Places each split on the node that {@code bucketNodeMap} assigns to it.
     *
     * <p>A split is assigned when its node passes the unacknowledged-split and weight limits;
     * otherwise the node is recorded as blocked and the split is left unassigned. Splits that
     * {@code bucketNodeMap} reports as cacheable are rebuilt with {@code new SplitContext(true)}
     * and counted as preferred selections; the rest are counted as non-preferred.
     *
     * @return the assignments together with a future obtained from
     *         {@link #toWhenHasSplitQueueSpaceFuture(Set, List, long)} for the blocked nodes
     * @throws PrestoException with {@code NO_NODES_AVAILABLE} if a split has no assigned node
     */
    public static SplitPlacementResult selectDistributionNodes(NodeMap nodeMap, NodeTaskMap nodeTaskMap, long maxSplitsWeightPerNode, long maxPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, Set<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap, NodeSelectionStats nodeSelectionStats) {
        Multimap<InternalNode, Split> assignments = HashMultimap.create();
        NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);
        Set<InternalNode> blockedNodes = new HashSet<>();

        for (Split split : splits) {
            InternalNode node = bucketNodeMap.getAssignedNode(split).orElseThrow(() ->
                    new PrestoException(StandardErrorCode.NO_NODES_AVAILABLE, String.format("No assignment for split in bucketNodeMap. Split Info: %s", split.getConnectorSplit().getInfoMap())));
            boolean isCacheable = bucketNodeMap.isSplitCacheable(split);
            SplitWeight splitWeight = split.getSplitWeight();

            if (!canAssignSplitToDistributionNode(assignmentStats, node, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splitWeight)) {
                blockedNodes.add(node);
                continue;
            }

            Split splitToAssign = split;
            if (isCacheable) {
                splitToAssign = new Split(split.getConnectorId(), split.getTransactionHandle(), split.getConnectorSplit(), split.getLifespan(), new SplitContext(true));
                nodeSelectionStats.incrementBucketedPreferredNodeSelectedCount();
            } else {
                nodeSelectionStats.incrementBucketedNonPreferredNodeSelectedCount();
            }
            assignments.put(node, splitToAssign);
            assignmentStats.addAssignedSplit(node, splitWeight);
        }

        ListenableFuture<?> blocked = toWhenHasSplitQueueSpaceFuture(blockedNodes, existingTasks, calculateLowWatermark(maxPendingSplitsWeightPerTask));
        return new SplitPlacementResult(blocked, ImmutableMultimap.copyOf(assignments));
    }

    /**
     * A node can take the split when its unacknowledged split count for the stage is below the
     * limit and either its total weight or its queued weight for the stage has room for it.
     */
    private static boolean canAssignSplitToDistributionNode(NodeAssignmentStats assignmentStats, InternalNode node, long maxSplitsWeightPerNode, long maxPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, SplitWeight splitWeight) {
        if (assignmentStats.getUnacknowledgedSplitCountForStage(node) >= maxUnacknowledgedSplitsPerTask) {
            return false;
        }
        return canAssignSplitBasedOnWeight(assignmentStats.getTotalSplitsWeight(node), maxSplitsWeightPerNode, splitWeight)
                || canAssignSplitBasedOnWeight(assignmentStats.getQueuedSplitsWeightForStage(node), maxPendingSplitsWeightPerTask, splitWeight);
    }

    /**
     * Returns true when adding the split's raw weight to {@code currentWeight} stays within
     * {@code weightLimit}, or when nothing is assigned yet ({@code currentWeight == 0}) and the
     * limit is positive, so that an empty target always accepts at least one split.
     *
     * @throws ArithmeticException if the sum of the weights overflows a {@code long}
     */
    public static boolean canAssignSplitBasedOnWeight(long currentWeight, long weightLimit, SplitWeight splitWeight) {
        return Math.addExact(currentWeight, splitWeight.getRawValue()) <= weightLimit || (currentWeight == 0L && weightLimit > 0L);
    }

    /** Returns half of {@code maxPendingSplitsWeightPerTask}, rounded up. */
    public static long calculateLowWatermark(long maxPendingSplitsWeightPerTask) {
        return (long) Math.ceil((double) maxPendingSplitsWeightPerTask * 0.5);
    }

    /**
     * Returns a future that completes when any task running on one of {@code blockedNodes}
     * reports split queue space for {@code weightSpaceThreshold}.
     *
     * <p>Returns an already completed future when there are no blocked nodes or none of them has
     * a task in {@code existingTasks}. If several tasks share a node id, only the last one in
     * {@code existingTasks} is consulted.
     */
    public static ListenableFuture<?> toWhenHasSplitQueueSpaceFuture(Set<InternalNode> blockedNodes, List<RemoteTask> existingTasks, long weightSpaceThreshold) {
        if (blockedNodes.isEmpty()) {
            return Futures.immediateFuture(null);
        }

        Map<String, RemoteTask> nodeToTaskMap = new HashMap<>();
        for (RemoteTask task : existingTasks) {
            nodeToTaskMap.put(task.getNodeId(), task);
        }

        List<ListenableFuture<?>> blockedFutures = blockedNodes.stream()
                .map(InternalNode::getNodeIdentifier)
                .map(nodeToTaskMap::get)
                .filter(Objects::nonNull)
                .map(remoteTask -> remoteTask.whenSplitQueueHasSpace(weightSpaceThreshold))
                .collect(ImmutableList.toImmutableList());
        if (blockedFutures.isEmpty()) {
            return Futures.immediateFuture(null);
        }
        return MoreFutures.whenAnyCompleteCancelOthers(blockedFutures);
    }

    /**
     * Returns a future that completes when any of {@code existingTasks} reports split queue
     * space for {@code weightSpaceThreshold}, or an already completed future when there are no
     * tasks.
     */
    public static ListenableFuture<?> toWhenHasSplitQueueSpaceFuture(List<RemoteTask> existingTasks, long weightSpaceThreshold) {
        if (existingTasks.isEmpty()) {
            return Futures.immediateFuture(null);
        }
        List<ListenableFuture<?>> stateChangeFutures = existingTasks.stream()
                .map(remoteTask -> remoteTask.whenSplitQueueHasSpace(weightSpaceThreshold))
                .collect(ImmutableList.toImmutableList());
        return MoreFutures.whenAnyCompleteCancelOthers(stateChangeFutures);
    }
}

