/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.scheduler.resource;

import com.codahale.metrics.Meter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SingleTopologyCluster;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RasNodes;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.User;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
import org.apache.storm.scheduler.utils.IConfigLoader;
import org.apache.storm.scheduler.utils.SchedulerConfigCache;
import org.apache.storm.shade.com.google.common.collect.ImmutableList;
import org.apache.storm.utils.DisallowedStrategyException;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceAwareScheduler
implements IScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareScheduler.class);
    // Configuration map handed to prepare(); also passed to scheduling strategies and used as the
    // last fallback source for the user resource pools.
    private Map<String, Object> conf;
    // Orders the topologies for each scheduling round; instantiated reflectively in prepare().
    private ISchedulingPriorityStrategy schedulingPriorityStrategy;
    // Optional loader for the user resource pools; loadConfig() tolerates it being null.
    private IConfigLoader configLoader;
    // Upper bound on schedule/evict iterations per topology (defaults to 5 in prepare()).
    private int maxSchedulingAttempts;
    // Time limit, in seconds, for one strategy run on one topology (defaults to 60 in prepare()).
    private int schedulingTimeoutSeconds;
    // Single-thread pool that runs the strategy so the caller can wait on it with a timeout.
    private ExecutorService backgroundScheduling;
    // Topology id -> ids of the topologies evicted on its behalf during the most recent schedule() call.
    private Map<String, Set<String>> evictedTopologiesMap;
    // Marked each time a strategy run exceeds schedulingTimeoutSeconds.
    private Meter schedulingTimeoutMeter;
    // Marked each time an exception escapes a scheduling attempt.
    private Meter internalErrorMeter;
    // Caches the result of loadConfig() (user -> resource name -> amount); refreshed at the start of schedule().
    private SchedulerConfigCache<Map<String, Map<String, Double>>> schedulerConfigCache;

    /**
     * Records a scheduling failure for a topology when there is no exception to attach.
     *
     * <p>Delegates to the five-argument overload with a {@code null} cause.
     *
     * @param u the user that submitted the topology
     * @param c the cluster on which the failure status is recorded
     * @param td the topology that could not be scheduled
     * @param message the status text to record and log
     */
    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
        final Throwable noCause = null;
        markFailedTopology(u, c, td, message, noCause);
    }

    /**
     * Records a scheduling failure for a topology: sets the status on the cluster, logs the failure
     * at error level, and tells the submitting user that the topology was not scheduled.
     *
     * @param u the user that submitted the topology
     * @param c the cluster on which the failure status is recorded
     * @param td the topology that could not be scheduled
     * @param message the status text to record; it is logged prefixed with the topology id
     * @param t the cause to log alongside the message, or {@code null} when there is none
     */
    private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) {
        c.setStatus(td, message);
        final String logLine = td.getId() + " " + message;
        if (t == null) {
            LOG.error(logLine);
        } else {
            LOG.error(logLine, t);
        }
        u.markTopoUnsuccess(td);
    }

    /**
     * Initializes the scheduler from the supplied configuration.
     *
     * <p>Registers the timeout and internal-error meters, instantiates the priority strategy named by
     * {@code resource.aware.scheduler.priority.strategy}, creates the config loader, reads the attempt
     * and timeout limits (defaulting to 5 attempts and 60 seconds), creates the single-thread executor
     * used to run strategies, and primes the user resource pool cache.
     *
     * @param conf the scheduler configuration; retained for later use
     * @param metricsRegistry the registry on which the scheduler's meters are registered
     */
    @Override
    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
        this.conf = conf;
        this.schedulingTimeoutMeter = metricsRegistry.registerMeter("nimbus:num-scheduling-timeouts");
        this.internalErrorMeter = metricsRegistry.registerMeter("nimbus:scheduler-internal-errors");
        this.schedulingPriorityStrategy =
            (ISchedulingPriorityStrategy) ReflectionUtils.newInstance((String) conf.get("resource.aware.scheduler.priority.strategy"));
        this.configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
        this.maxSchedulingAttempts =
            ObjectReader.getInt(conf.get("resource.aware.scheduler.max.topology.scheduling.attempts"), 5);
        this.schedulingTimeoutSeconds = ObjectReader.getInt(conf.get("scheduling.timeout.seconds.per.topology"), 60);
        this.backgroundScheduling = Executors.newFixedThreadPool(1);
        this.evictedTopologiesMap = new HashMap<>();
        // Diamond lets the compiler infer the field's full type argument; a raw Map argument is not assignable to it.
        this.schedulerConfigCache = new SchedulerConfigCache<>(conf, this::loadConfig);
        this.schedulerConfigCache.prepare();
    }

    /**
     * Releases the scheduler's resources by shutting down the background scheduling executor.
     *
     * <p>Safe to call even when {@link #prepare} never got as far as creating the executor.
     */
    @Override
    public void cleanup() {
        LOG.info("Cleanup ResourceAwareScheduler scheduler");
        // prepare() may have failed before the executor was created; do not add an NPE on top of that failure.
        if (this.backgroundScheduling != null) {
            this.backgroundScheduling.shutdown();
        }
    }

    /**
     * Returns the cached user resource pools.
     *
     * @return a read-only view of the cached map of user name to (resource name to amount)
     */
    @Override
    public Map<String, Map<String, Double>> config() {
        final Map<String, Map<String, Double>> cachedPools = schedulerConfigCache.get();
        return Collections.unmodifiableMap(cachedPools);
    }

    /**
     * Runs one scheduling round over every topology known to the cluster.
     *
     * <p>Refreshes the cached user pools, asks the priority strategy for the topology order, and then
     * walks that order: topologies that need no scheduling get the status "Fully Scheduled" if they
     * have none, all others are handed to {@code scheduleTopology}. The evictions recorded during
     * this round replace the previously stored eviction map.
     *
     * @param topologies not read by this implementation; the topologies are taken from {@code cluster}
     * @param cluster the cluster state to schedule against and update
     */
    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        schedulerConfigCache.refresh();
        final Map<String, User> usersByName = getUsers(cluster);
        final List<TopologyDetails> prioritized =
            new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, usersByName));
        if (LOG.isDebugEnabled()) {
            final List<String> orderedIds = prioritized.stream().map(TopologyDetails::getId).collect(Collectors.toList());
            LOG.debug("Ordered list of topologies is: {}", orderedIds);
        }
        final Map<String, Set<String>> evictionsThisRound = new HashMap<>();
        for (TopologyDetails td : prioritized) {
            if (cluster.needsSchedulingRas(td)) {
                final User submitter = usersByName.get(td.getTopologySubmitter());
                scheduleTopology(td, cluster, submitter, prioritized, evictionsThisRound);
            } else {
                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
            }
        }
        evictedTopologiesMap = evictionsThisRound;
    }

    /**
     * Tries to schedule one topology, evicting lower priority topologies when resources are short.
     *
     * <p>All trial placements and evictions are applied to a copy of {@code cluster}. The real cluster
     * is only changed when a placement succeeds (via {@code updateFrom}) or to record a failure status.
     * Each attempt runs the topology's strategy on the background executor with a time limit. When the
     * strategy reports not-enough-resources, topologies that come after {@code td} in
     * {@code orderedTopologies} are evicted from the working copy, last first, and the attempt is
     * repeated, up to {@code maxSchedulingAttempts} times.
     *
     * @param td the topology to schedule
     * @param cluster the real cluster state; updated on success or failure
     * @param topologySubmitter the user that submitted {@code td}
     * @param orderedTopologies all topologies in scheduling order
     * @param tmpEvictedTopologiesMap receives, under {@code td}'s id, the ids of topologies evicted for it
     */
    private void scheduleTopology(TopologyDetails td, Cluster cluster, User topologySubmitter, List<TopologyDetails> orderedTopologies, Map<String, Set<String>> tmpEvictedTopologiesMap) {
        Cluster workingState = new Cluster(cluster);
        RasNodes nodes = new RasNodes(workingState);
        String strategyConf = (String) td.getConf().get("topology.scheduler.strategy");
        final IStrategy rasStrategy;
        try {
            // A missing strategy config fails with a NullPointerException here, which the
            // RuntimeException handler below turns into a failure status for the topology.
            String strategy = strategyConf;
            if (strategy.startsWith("backtype.storm")) {
                // Map the legacy package name onto the current one.
                strategy = strategy.replace("backtype.storm", "org.apache.storm");
                LOG.debug("Replaced backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
            }
            rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance(strategy, this.conf);
            rasStrategy.prepare(this.conf);
        } catch (DisallowedStrategyException e) {
            markFailedTopology(topologySubmitter, cluster, td, "Unsuccessful in scheduling - " + e.getAttemptedClass() + " is not an allowed strategy. Please make sure your topology.scheduler.strategy config is one of the allowed strategies: " + e.getAllowedStrategies(), e);
            return;
        } catch (RuntimeException e) {
            markFailedTopology(topologySubmitter, cluster, td, "Unsuccessful in scheduling - failed to create instance of topology strategy " + strategyConf + ". Please check logs for details", e);
            return;
        }

        // These flags only drive a warning, so an absent value is treated as false rather than
        // failing the whole scheduling round on a null unboxing.
        boolean oneExecutorPerWorker = Boolean.TRUE.equals(td.getConf().get("topology.ras.one.executor.per.worker"));
        boolean oneComponentPerWorker = Boolean.TRUE.equals(td.getConf().get("topology.ras.one.component.per.worker"));
        if (oneExecutorPerWorker && oneComponentPerWorker) {
            LOG.warn("Conflicting options: {} and {} are both set! Ignoring {} option.",
                "topology.ras.one.executor.per.worker", "topology.ras.one.component.per.worker", "topology.ras.one.component.per.worker");
        }

        TopologySchedulingResources topologySchedulingResources = new TopologySchedulingResources(workingState, td);
        for (int attempt = 0; attempt < this.maxSchedulingAttempts; attempt++) {
            SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
            try {
                SchedulingResult result;
                topologySchedulingResources.resetRemaining();
                if (topologySchedulingResources.canSchedule()) {
                    Future<SchedulingResult> schedulingFuture = this.backgroundScheduling.submit(() -> rasStrategy.schedule(toSchedule, td));
                    try {
                        result = schedulingFuture.get(this.schedulingTimeoutSeconds, TimeUnit.SECONDS);
                    } catch (TimeoutException te) {
                        markFailedTopology(topologySubmitter, cluster, td, "Scheduling took too long for " + td.getId() + " using strategy " + rasStrategy.getClass().getName() + " timeout after " + this.schedulingTimeoutSeconds + " seconds using config scheduling.timeout.seconds.per.topology.");
                        this.schedulingTimeoutMeter.mark();
                        schedulingFuture.cancel(true);
                        return;
                    } catch (InterruptedException ie) {
                        // Stop the abandoned background run and keep the interrupt visible to the caller;
                        // the outer handler then reports the failure for this topology.
                        schedulingFuture.cancel(true);
                        Thread.currentThread().interrupt();
                        throw ie;
                    }
                } else {
                    // The quick resource estimate already rules this topology out; skip the strategy run.
                    result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "");
                }
                LOG.debug("scheduling result: {}", result);
                if (result == null) {
                    markFailedTopology(topologySubmitter, cluster, td, "Internal scheduler error");
                    return;
                }
                if (result.isSuccess()) {
                    cluster.updateFrom(toSchedule);
                    cluster.setStatus(td.getId(), "Running - " + result.getMessage());
                    return;
                }
                if (result.getStatus() != SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                    topologySubmitter.markTopoUnsuccess(td, cluster, result.toString());
                    return;
                }

                LOG.debug("Not enough resources to schedule {}", td.getName());
                // Walk the order backwards so the topologies at the end of the ordering are evicted first.
                ImmutableList<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse();
                LOG.debug("Attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
                int tdIndex = reversedList.indexOf(td);
                topologySchedulingResources.setRemainingRequiredResources(toSchedule, td);
                Set<String> tmpEvictedTopos = new HashSet<>();
                for (int index = 0; index < tdIndex; index++) {
                    TopologyDetails topologyEvict = reversedList.get(index);
                    SchedulerAssignment evictAssignment = workingState.getAssignmentById(topologyEvict.getId());
                    if (evictAssignment == null || evictAssignment.getSlots().isEmpty()) {
                        continue;
                    }
                    topologySchedulingResources.adjustResourcesForEvictedTopology(toSchedule, topologyEvict);
                    tmpEvictedTopos.add(topologyEvict.getId());
                    Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
                    nodes.freeSlots(workersToEvict);
                    if (topologySchedulingResources.canSchedule()) {
                        break;
                    }
                }
                if (tmpEvictedTopos.isEmpty()) {
                    StringBuilder message = new StringBuilder();
                    message.append("Not enough resources to schedule after evicting lower priority topologies. ");
                    message.append(topologySchedulingResources.getRemainingRequiredResourcesMessage());
                    message.append(result.getErrorMessage());
                    markFailedTopology(topologySubmitter, cluster, td, message.toString());
                    return;
                }
                LOG.warn("Evicted Topologies {} when scheduling topology: {}", tmpEvictedTopos, td.getId());
                tmpEvictedTopologiesMap.computeIfAbsent(td.getId(), k -> new HashSet<>()).addAll(tmpEvictedTopos);
            } catch (Exception ex) {
                this.internalErrorMeter.mark();
                markFailedTopology(topologySubmitter, cluster, td, "Internal Error - Exception thrown when scheduling. Please check logs for details", ex);
                return;
            }
        }
        markFailedTopology(topologySubmitter, cluster, td, "Failed to make enough resources for " + td.getId() + " by evicting lower priority topologies within " + this.maxSchedulingAttempts + " attempts. " + topologySchedulingResources.getRemainingRequiredResourcesMessage());
    }

    /**
     * Returns the evictions recorded by the most recent {@code schedule} call.
     *
     * @return a read-only view mapping a topology id to the ids of the topologies evicted for it
     */
    public Map<String, Set<String>> getEvictedTopologiesMap() {
        final Map<String, Set<String>> latestEvictions = evictedTopologiesMap;
        return Collections.unmodifiableMap(latestEvictions);
    }

    /**
     * Builds one {@code User} per distinct topology submitter found in the cluster.
     *
     * <p>Each user is created with the resource pool configured under its name, which may be absent.
     * Topologies whose submitter is {@code null} or empty are logged and contribute no user.
     *
     * @param cluster the cluster whose topologies are inspected
     * @return a map of submitter name to user
     */
    private Map<String, User> getUsers(Cluster cluster) {
        final Map<String, User> usersByName = new HashMap<>();
        final Map<String, Map<String, Double>> userResourcePools = config();
        LOG.debug("userResourcePools: {}", userResourcePools);
        for (TopologyDetails td : cluster.getTopologies()) {
            final String submitter = td.getTopologySubmitter();
            if (submitter != null && !submitter.isEmpty()) {
                // Only the first topology of a submitter creates the user; later ones reuse it.
                usersByName.computeIfAbsent(submitter, name -> new User(name, userResourcePools.get(name)));
            } else {
                LOG.error("Cannot determine user for topology {}.  Will skip scheduling this topology", td.getName());
            }
        }
        return usersByName;
    }

    /**
     * Converts raw user resource pools with arbitrary numeric values into pools of doubles.
     *
     * <p>A {@code null} input yields an empty map. A user whose pool is {@code null} (for example a
     * key with no value in the configuration file) is kept with an empty pool instead of failing the
     * whole load. A {@code null} amount inside a pool still raises a {@code NullPointerException}.
     *
     * @param raw user name to (resource name to amount), or {@code null}
     * @return a new mutable map of user name to (resource name to amount as double); never {@code null}
     */
    private Map<String, Map<String, Double>> convertToDouble(Map<String, Map<String, Number>> raw) {
        Map<String, Map<String, Double>> ret = new HashMap<>();
        if (raw == null) {
            return ret;
        }
        for (Map.Entry<String, Map<String, Number>> userPoolEntry : raw.entrySet()) {
            Map<String, Double> pool = new HashMap<>();
            Map<String, Number> rawPool = userPoolEntry.getValue();
            if (rawPool != null) {
                for (Map.Entry<String, Number> resourceEntry : rawPool.entrySet()) {
                    pool.put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
                }
            }
            ret.put(userPoolEntry.getKey(), pool);
        }
        return ret;
    }

    /**
     * Loads the user resource pools, trying three sources in order and using the first that has a value.
     *
     * <ol>
     *   <li>the config loader, when one is configured;</li>
     *   <li>the {@code user-resource-pools.yaml} file;</li>
     *   <li>the configuration passed to {@code prepare}.</li>
     * </ol>
     *
     * @return user name to (resource name to amount); empty when no source defines any pools
     */
    @SuppressWarnings("unchecked")
    private Map<String, Map<String, Double>> loadConfig() {
        if (this.configLoader != null) {
            final Map<?, ?> loaded = this.configLoader.load("resource.aware.scheduler.user.pools");
            if (loaded != null) {
                return convertToDouble((Map<String, Map<String, Number>>) loaded);
            }
            LOG.warn("Config loader returned null. Will try to read from user-resource-pools.yaml");
        }

        final Map<?, ?> fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
        final Map<String, Map<String, Number>> filePools =
            (Map<String, Map<String, Number>>) fromFile.get("resource.aware.scheduler.user.pools");
        if (filePools != null) {
            return convertToDouble(filePools);
        }
        LOG.warn("Reading from user-resource-pools.yaml returned null. This could because the file is not available. Will load configs from storm configuration");

        // Last resort: the pools embedded in the scheduler configuration (may be absent, giving an empty result).
        final Map<String, Map<String, Number>> confPools =
            (Map<String, Map<String, Number>>) this.conf.get("resource.aware.scheduler.user.pools");
        return convertToDouble(confPools);
    }

    private class TopologySchedulingResources {
        boolean remainingResourcesAreSet = false;
        NormalizedResourceOffer clusterAvailableResources;
        NormalizedResourceRequest topologyRequiredResources;
        NormalizedResourceRequest topologyScheduledResources;
        double clusterAvailableMemory;
        double topologyRequiredNonSharedMemory;
        double topologySharedMemoryLowerBound;
        NormalizedResourceOffer remainingRequiredTopologyResources;
        double remainingRequiredTopologyMemory;
        double topologyScheduledMemory;

        TopologySchedulingResources(Cluster cluster, TopologyDetails td) {
            this.clusterAvailableResources = cluster.getNonBlacklistedClusterAvailableResources(Collections.emptyList());
            this.clusterAvailableMemory = this.clusterAvailableResources.getTotalMemoryMb();
            this.topologyRequiredResources = td.getApproximateTotalResources();
            this.topologyRequiredNonSharedMemory = td.getRequestedNonSharedOffHeap() + td.getRequestedNonSharedOnHeap();
            this.topologySharedMemoryLowerBound = td.getRequestedSharedOffHeap() + td.getRequestedSharedOnHeap();
            this.setScheduledTopologyResources(cluster, td);
        }

        void setScheduledTopologyResources(Cluster cluster, TopologyDetails td) {
            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
            if (assignment != null) {
                this.topologyScheduledResources = td.getApproximateResources(assignment.getExecutors());
                this.topologyScheduledMemory = this.computeScheduledTopologyMemory(cluster, td);
            } else {
                this.topologyScheduledResources = new NormalizedResourceRequest();
                this.topologyScheduledMemory = 0.0;
            }
        }

        boolean canSchedule() {
            return this.canScheduleAvailable() && this.canScheduleRemainingRequired();
        }

        boolean canScheduleAvailable() {
            NormalizedResourceOffer availableResources = new NormalizedResourceOffer(this.clusterAvailableResources);
            availableResources.add(this.topologyScheduledResources);
            boolean insufficientResources = availableResources.remove(this.topologyRequiredResources);
            if (insufficientResources) {
                return false;
            }
            double availableMemory = this.clusterAvailableMemory + this.topologyScheduledMemory;
            double totalRequiredTopologyMemory = this.topologyRequiredNonSharedMemory + this.topologySharedMemoryLowerBound;
            return availableMemory >= totalRequiredTopologyMemory;
        }

        boolean canScheduleRemainingRequired() {
            if (!this.remainingResourcesAreSet) {
                return true;
            }
            return !this.remainingRequiredTopologyResources.areAnyOverZero() && !(this.remainingRequiredTopologyMemory > 0.0);
        }

        void setRemainingRequiredResources(Cluster cluster, TopologyDetails td) {
            this.remainingResourcesAreSet = true;
            this.setScheduledTopologyResources(cluster, td);
            this.remainingRequiredTopologyResources = new NormalizedResourceOffer();
            this.remainingRequiredTopologyResources.add(this.topologyRequiredResources);
            this.remainingRequiredTopologyResources.remove(this.topologyScheduledResources);
            this.remainingRequiredTopologyMemory = this.topologyRequiredNonSharedMemory + this.topologySharedMemoryLowerBound - this.topologyScheduledMemory;
        }

        void adjustResourcesForEvictedTopology(Cluster cluster, TopologyDetails evict) {
            SchedulerAssignment assignment = cluster.getAssignmentById(evict.getId());
            if (assignment != null) {
                NormalizedResourceRequest evictResources = evict.getApproximateResources(assignment.getExecutors());
                double topologyScheduledMemory = this.computeScheduledTopologyMemory(cluster, evict);
                this.clusterAvailableResources.add(evictResources);
                this.clusterAvailableMemory += topologyScheduledMemory;
                this.remainingRequiredTopologyResources.remove(evictResources);
                this.remainingRequiredTopologyMemory -= topologyScheduledMemory;
            }
        }

        void resetRemaining() {
            this.remainingResourcesAreSet = false;
            this.remainingRequiredTopologyMemory = 0.0;
        }

        private double getMemoryUsed(SchedulerAssignment assignment) {
            return assignment.getScheduledResources().values().stream().mapToDouble(wr -> wr.get_mem_on_heap() + wr.get_mem_off_heap()).sum();
        }

        private double computeScheduledTopologyMemory(Cluster cluster, TopologyDetails td) {
            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
            double scheduledTopologyMemory = 0.0;
            if (assignment != null) {
                for (double mem : assignment.getNodeIdToTotalSharedOffHeapNodeMemory().values()) {
                    scheduledTopologyMemory += mem;
                }
                scheduledTopologyMemory += this.getMemoryUsed(assignment);
            }
            return scheduledTopologyMemory;
        }

        String getRemainingRequiredResourcesMessage() {
            StringBuilder message = new StringBuilder();
            NormalizedResourceOffer clusterRemainingAvailableResources = new NormalizedResourceOffer();
            clusterRemainingAvailableResources.add(this.clusterAvailableResources);
            clusterRemainingAvailableResources.remove(this.topologyScheduledResources);
            double memoryNeeded = this.remainingRequiredTopologyMemory;
            double cpuNeeded = this.remainingRequiredTopologyResources.getTotalCpu();
            if (memoryNeeded > 0.0) {
                message.append("Additional Memory Required: ").append(memoryNeeded).append(" MB ");
                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalMemoryMb()).append(" MB). ");
            }
            if (cpuNeeded > 0.0) {
                message.append("Additional CPU Required: ").append(cpuNeeded).append("% CPU ");
                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalCpu()).append(" % CPU).");
            }
            if (this.remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
                message.append(" Additional Topology Required Resources: ");
                message.append(this.remainingRequiredTopologyResources.getNormalizedResources().toString());
                message.append(" Cluster Available Resources: ");
                message.append(clusterRemainingAvailableResources.getNormalizedResources().toString());
                message.append(".  ");
            }
            return message.toString();
        }
    }
}

