/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.loadgen;

import java.io.File;
import java.lang.invoke.CallSite;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;
import net.minidev.json.parser.JSONParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.storm.Config;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.BoltStats;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.loadgen.InputStream;
import org.apache.storm.loadgen.LoadCompConf;
import org.apache.storm.loadgen.NormalDistStats;
import org.apache.storm.loadgen.OutputStream;
import org.apache.storm.loadgen.TopologyLoadConf;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line tool that connects to Nimbus, captures the structure and observed
 * load statistics of running topologies, and writes each one out as a
 * {@code <topologyName>.yaml} file (see {@link #main(String[])}).
 */
public class CaptureLoad {
    private static final Logger LOG = LoggerFactory.getLogger(CaptureLoad.class);
    /** Directory that {@link #main(String[])} writes to when no {@code -o} option is given. */
    public static final String DEFAULT_OUT_DIR = "./loadgen/";

    /**
     * Collect one bolt statistic for a single input stream across a set of executors.
     *
     * <p>For every executor that has stats set, {@code func} selects a
     * window -> (stream -> value) map from its {@link BoltStats}; the value recorded
     * for {@code id} in each window (when present) is appended to the result.
     *
     * @param summaries the executors to inspect, may be {@code null}
     * @param id the input stream whose values are wanted
     * @param func selects which statistic map to read from the bolt stats
     * @return the non-null values found, possibly empty, never {@code null}
     */
    private static List<Double> extractBoltValues(List<ExecutorSummary> summaries, GlobalStreamId id, Function<BoltStats, Map<String, Map<GlobalStreamId, Double>>> func) {
        List<Double> values = new ArrayList<>();
        if (summaries == null) {
            return values;
        }
        for (ExecutorSummary executor : summaries) {
            if (executor == null || !executor.is_set_stats()) {
                continue;
            }
            Map<String, Map<GlobalStreamId, Double>> byWindow =
                func.apply(executor.get_stats().get_specific().get_bolt());
            if (byWindow == null) {
                continue;
            }
            for (Map<GlobalStreamId, Double> perStream : byWindow.values()) {
                Double value = perStream.get(id);
                if (value != null) {
                    values.add(value);
                }
            }
        }
        return values;
    }

    /**
     * Capture the structure and measured load of one running topology.
     *
     * <p>The method queries Nimbus for the topology info, page info, user topology and
     * configuration, then builds:
     * <ul>
     *   <li>a reduced topology conf holding the {@link TopologyLoadConf#IMPORTANT_CONF_KEYS}
     *       that are set, plus {@code topology.workers};</li>
     *   <li>one {@link LoadCompConf} per bolt and spout, with parallelism, CPU/memory
     *       resources and output streams (including measured emit rates);</li>
     *   <li>one {@link InputStream} per bolt input, with process and execute time stats.</li>
     * </ul>
     *
     * @param client the Nimbus client to query
     * @param topologySummary identifies the topology to capture
     * @return the captured load configuration
     * @throws Exception on any error talking to Nimbus
     */
    static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary topologySummary) throws Exception {
        String topologyName = topologySummary.get_name();
        LOG.info("Capturing {}...", topologyName);
        String topologyId = topologySummary.get_id();
        TopologyInfo info = client.getTopologyInfo(topologyId);
        TopologyPageInfo tpinfo = client.getTopologyPageInfo(topologyId, ":all-time", false);
        StormTopology topo = client.getUserTopology(topologyId);

        Map<String, Object> savedTopoConf = new HashMap<>();
        // The topology conf comes back as a JSON document; it is treated as a JSON object here.
        @SuppressWarnings("unchecked")
        Map<String, Object> topoConf = (Map<String, Object>) JSONValue.parse(client.getTopologyConf(topologyId));
        for (String key : TopologyLoadConf.IMPORTANT_CONF_KEYS) {
            Object o = topoConf.get(key);
            if (o != null) {
                savedTopoConf.put(key, o);
                LOG.info("with config {}: {}", key, o);
            }
        }

        // Record the larger of the reported worker count and the configured one.
        int numWorkers = tpinfo.get_num_workers();
        Object configuredWorkers = savedTopoConf.get("topology.workers");
        if (configuredWorkers != null) {
            numWorkers = Math.max(numWorkers, ((Number) configuredWorkers).intValue());
        }
        savedTopoConf.put("topology.workers", numWorkers);

        Map<String, LoadCompConf.Builder> boltBuilders = new HashMap<>();
        Map<String, LoadCompConf.Builder> spoutBuilders = new HashMap<>();
        List<InputStream.Builder> inputStreams = new ArrayList<>();
        Map<GlobalStreamId, OutputStream.Builder> outStreams = new HashMap<>();

        if (topo.get_bolts() != null) {
            for (Map.Entry<String, Bolt> boltEntry : topo.get_bolts().entrySet()) {
                String boltComp = boltEntry.getKey();
                LOG.info("Found bolt {}...", boltComp);
                ComponentCommon common = boltEntry.getValue().get_common();
                Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
                if (inputs != null) {
                    for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
                        GlobalStreamId id = input.getKey();
                        LOG.info("with input {}...", id);
                        inputStreams.add(new InputStream.Builder()
                            .withId(id.get_streamId())
                            .withFromComponent(id.get_componentId())
                            .withToComponent(boltComp)
                            .withGroupingType(input.getValue()));
                    }
                }
                Map<String, ?> outputs = common.get_streams();
                if (outputs != null) {
                    for (String name : outputs.keySet()) {
                        GlobalStreamId id = new GlobalStreamId(boltComp, name);
                        LOG.info("and output {}...", id);
                        outStreams.put(id, new OutputStream.Builder().withId(name));
                    }
                }
                boltBuilders.put(boltComp,
                    new LoadCompConf.Builder().withParallelism(common.get_parallelism_hint()).withId(boltComp));
            }
            applyResources(boltBuilders, getBoltsResources(topo, topoConf));
        }

        if (topo.get_spouts() != null) {
            for (Map.Entry<String, SpoutSpec> spoutEntry : topo.get_spouts().entrySet()) {
                String spoutComp = spoutEntry.getKey();
                LOG.info("Found Spout {}...", spoutComp);
                ComponentCommon common = spoutEntry.getValue().get_common();
                Map<String, ?> outputs = common.get_streams();
                if (outputs != null) {
                    for (String name : outputs.keySet()) {
                        GlobalStreamId id = new GlobalStreamId(spoutComp, name);
                        LOG.info("with output {}...", id);
                        outStreams.put(id, new OutputStream.Builder().withId(name));
                    }
                }
                spoutBuilders.put(spoutComp,
                    new LoadCompConf.Builder().withParallelism(common.get_parallelism_hint()).withId(spoutComp));
            }
            applyResources(spoutBuilders, getSpoutsResources(topo, topoConf));
        }

        // Group the executor summaries by the component they belong to.
        Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
        for (ExecutorSummary executor : info.get_executors()) {
            byComponent.computeIfAbsent(executor.get_component_id(), k -> new ArrayList<>()).add(executor);
        }

        // Finish the input streams: attach process/execute time stats of the consuming bolt.
        List<InputStream> streams = new ArrayList<>(inputStreams.size());
        for (InputStream.Builder builder : inputStreams) {
            GlobalStreamId streamId = new GlobalStreamId(builder.getFromComponent(), builder.getId());
            List<ExecutorSummary> summaries = byComponent.get(builder.getToComponent());
            builder.withProcessTime(new NormalDistStats(
                extractBoltValues(summaries, streamId, BoltStats::get_process_ms_avg)));
            builder.withExecTime(new NormalDistStats(
                extractBoltValues(summaries, streamId, BoltStats::get_execute_ms_avg)));
            streams.add(builder.build());
        }

        // Worker uptimes are used as a fallback when an executor reports a non-positive uptime.
        // NOTE(review): the map is keyed by supervisor id + port but looked up below by
        // executor host + port; confirm those identifiers actually coincide.
        Map<String, Integer> workerToUptime = new HashMap<>();
        List<WorkerSummary> workers = tpinfo.get_workers();
        if (workers != null) {
            for (WorkerSummary ws : workers) {
                workerToUptime.put(ws.get_supervisor_id() + ":" + ws.get_port(), ws.get_uptime_secs());
            }
        }
        LOG.debug("WORKER TO UPTIME {}", workerToUptime);

        // Finish the output streams: compute emit rates and attach them to their component.
        for (Map.Entry<GlobalStreamId, OutputStream.Builder> entry : outStreams.entrySet()) {
            OutputStream.Builder builder = entry.getValue();
            GlobalStreamId id = entry.getKey();
            List<Double> emittedRate = new ArrayList<>();
            List<ExecutorSummary> summaries = byComponent.get(id.get_componentId());
            if (summaries != null) {
                for (ExecutorSummary summary : summaries) {
                    if (!summary.is_set_stats()) {
                        continue;
                    }
                    int uptime = summary.get_uptime_secs();
                    LOG.debug("UPTIME {}", uptime);
                    if (uptime <= 0) {
                        String key = summary.get_host() + ":" + summary.get_port();
                        uptime = workerToUptime.getOrDefault(key, 1);
                        LOG.debug("Getting uptime for worker {}, {}", key, uptime);
                    }
                    for (Map.Entry<String, Map<String, Long>> statEntry : summary.get_stats().get_emitted().entrySet()) {
                        long windowSecs = uptime;
                        try {
                            windowSecs = Long.parseLong(statEntry.getKey());
                        } catch (NumberFormatException ignored) {
                            // Non-numeric window names (e.g. ":all-time") cover the whole uptime.
                        }
                        // Never divide by zero: a zero uptime would otherwise yield Infinity/NaN rates.
                        long timeSecs = Math.max(1L, Math.min(windowSecs, (long) uptime));
                        Long count = statEntry.getValue().get(id.get_streamId());
                        if (count != null) {
                            double rate = count.doubleValue() / (double) timeSecs;
                            LOG.debug("{} emitted {} for {} secs or {} tuples/sec", id, count, timeSecs, rate);
                            emittedRate.add(rate);
                        }
                    }
                }
            }
            builder.withRate(new NormalDistStats(emittedRate));

            // Every output stream was registered from a bolt or a spout above.
            LoadCompConf.Builder comp = boltBuilders.get(id.get_componentId());
            if (comp == null) {
                comp = spoutBuilders.get(id.get_componentId());
            }
            comp.withStream(builder.build());
        }

        List<LoadCompConf> spouts = spoutBuilders.values().stream().map(b -> b.build()).collect(Collectors.toList());
        List<LoadCompConf> bolts = boltBuilders.values().stream().map(b -> b.build()).collect(Collectors.toList());
        return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
    }

    /**
     * Copy the CPU and on-heap memory figures from {@code resourcesByComponent} onto the
     * builder registered under the same component id; unknown components are skipped.
     */
    private static void applyResources(Map<String, LoadCompConf.Builder> builders,
                                       Map<String, Map<String, Double>> resourcesByComponent) {
        for (Map.Entry<String, Map<String, Double>> entry : resourcesByComponent.entrySet()) {
            LoadCompConf.Builder builder = builders.get(entry.getKey());
            Map<String, Double> resources = entry.getValue();
            if (builder == null || resources == null) {
                continue;
            }
            Double cpu = resources.get("topology.component.cpu.pcore.percent");
            if (cpu != null) {
                builder.withCpuLoad(cpu);
            }
            Double mem = resources.get("topology.component.resources.onheap.memory.mb");
            if (mem != null) {
                builder.withMemoryLoad(mem);
            }
        }
    }

    /**
     * Command-line entry point.
     *
     * <p>Parses the {@code -a/--anonymize}, {@code -o/--output-dir} and {@code -h/--help}
     * options, captures every topology reported by Nimbus (or only those named as
     * positional arguments) and writes each one to {@code <name>.yaml} under the output
     * directory. Unless help is printed, the JVM is terminated via {@link System#exit(int)}
     * with status 0 on success and -1 if any error occurred.
     *
     * @param args command-line arguments
     * @throws Exception declared for compatibility; capture errors are logged instead
     */
    public static void main(String[] args) throws Exception {
        Options cliOptions = new Options();
        cliOptions.addOption(Option.builder("a")
            .longOpt("anonymize")
            .desc("Strip out any possibly identifiable information")
            .build());
        cliOptions.addOption(Option.builder("o")
            .longOpt("output-dir")
            .argName("<file>")
            .hasArg()
            .desc("Where to write (defaults to ./loadgen/)")
            .build());
        cliOptions.addOption(Option.builder("h")
            .longOpt("help")
            .desc("Print a help message")
            .build());

        CommandLine cmd = null;
        boolean parseFailed = false;
        try {
            cmd = new DefaultParser().parse(cliOptions, args);
        } catch (ParseException e) {
            System.err.println("ERROR " + e.getMessage());
            parseFailed = true;
        }
        if (parseFailed || cmd.hasOption('h')) {
            new HelpFormatter().printHelp("CaptureLoad [options] [topologyName]*", cliOptions);
            return;
        }

        Config conf = new Config();
        String outputDir = cmd.hasOption('o') ? cmd.getOptionValue('o') : DEFAULT_OUT_DIR;
        File baseOut = new File(outputDir);
        LOG.info("Will save captured topologies to {}", baseOut);
        baseOut.mkdirs();

        // Stays at -1 unless every requested topology was captured and written.
        int exitStatus = -1;
        try (NimbusClient nimbus = NimbusClient.Builder.withConf(conf).build()) {
            Nimbus.Iface client = nimbus.getClient();
            List<String> wantedNames = cmd.getArgList();
            boolean anonymize = cmd.hasOption('a');
            for (TopologySummary summary : client.getTopologySummaries()) {
                boolean wanted = wantedNames.isEmpty() || wantedNames.contains(summary.get_name());
                if (!wanted) {
                    continue;
                }
                TopologyLoadConf captured = captureTopology(client, summary);
                if (anonymize) {
                    captured = captured.anonymize();
                }
                captured.writeTo(new File(baseOut, captured.name + ".yaml"));
            }
            exitStatus = 0;
        } catch (Exception e) {
            LOG.error("Error trying to capture topologies...", e);
        } finally {
            System.exit(exitStatus);
        }
    }

    /**
     * Extract the resource settings of every bolt in a topology.
     *
     * <p>Each bolt's JSON component conf is parsed with {@link #parseResources(String)} and
     * then passed through {@link #checkInitialization(Map, String, Map)} together with the
     * topology conf. A bolt whose conf cannot be parsed gets an empty resource map rather
     * than a {@code null} entry.
     *
     * @param topology the topology to inspect
     * @param topologyConf the topology-level configuration
     * @return bolt id -> (resource name -> value), empty if the topology has no bolts
     */
    static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) {
        Map<String, Map<String, Double>> boltResources = new HashMap<>();
        Map<String, Bolt> bolts = topology.get_bolts();
        if (bolts == null) {
            return boltResources;
        }
        for (Map.Entry<String, Bolt> bolt : bolts.entrySet()) {
            Map<String, Double> topologyResources = parseResources(bolt.getValue().get_common().get_json_conf());
            if (topologyResources == null) {
                // parseResources signals malformed JSON with null; treat it as "no resources set".
                topologyResources = new HashMap<>();
            }
            // Pass the component id (not the whole Bolt struct rendered as a string) for logging.
            checkInitialization(topologyResources, bolt.getKey(), topologyConf);
            boltResources.put(bolt.getKey(), topologyResources);
        }
        return boltResources;
    }

    /**
     * Extract the resource settings of every spout in a topology.
     *
     * <p>Each spout's JSON component conf is parsed with {@link #parseResources(String)} and
     * then passed through {@link #checkInitialization(Map, String, Map)} together with the
     * topology conf. A spout whose conf cannot be parsed gets an empty resource map rather
     * than a {@code null} entry.
     *
     * @param topology the topology to inspect
     * @param topologyConf the topology-level configuration
     * @return spout id -> (resource name -> value), empty if the topology has no spouts
     */
    static Map<String, Map<String, Double>> getSpoutsResources(StormTopology topology, Map<String, Object> topologyConf) {
        Map<String, Map<String, Double>> spoutResources = new HashMap<>();
        Map<String, SpoutSpec> spouts = topology.get_spouts();
        if (spouts == null) {
            return spoutResources;
        }
        for (Map.Entry<String, SpoutSpec> spout : spouts.entrySet()) {
            Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
            if (topologyResources == null) {
                // parseResources signals malformed JSON with null; treat it as "no resources set".
                topologyResources = new HashMap<>();
            }
            // Pass the component id (not the whole SpoutSpec struct rendered as a string) for logging.
            checkInitialization(topologyResources, spout.getKey(), topologyConf);
            spoutResources.put(spout.getKey(), topologyResources);
        }
        return spoutResources;
    }

    /**
     * Parse the resource-related settings out of a component's JSON configuration.
     *
     * <p>Only the on-heap memory, off-heap memory and CPU keys are read; each one that is
     * present in the JSON object is converted with {@link ObjectReader#getDouble} and
     * stored under the same key.
     *
     * @param input the JSON text, may be {@code null}
     * @return the resources found (empty when {@code input} is {@code null} or has none of
     *     the keys), or {@code null} if {@code input} is not parseable JSON
     */
    static Map<String, Double> parseResources(String input) {
        final String[] resourceKeys = {
            "topology.component.resources.onheap.memory.mb",
            "topology.component.resources.offheap.memory.mb",
            "topology.component.cpu.pcore.percent",
        };
        Map<String, Double> parsed = new HashMap<>();
        JSONParser jsonParser = new JSONParser();
        LOG.debug("Input to parseResources {}", input);
        if (input == null) {
            return parsed;
        }
        try {
            JSONObject json = (JSONObject) jsonParser.parse(input);
            for (String key : resourceKeys) {
                if (json.containsKey(key)) {
                    parsed.put(key, ObjectReader.getDouble(json.get(key), null));
                }
            }
            LOG.debug("Topology Resources {}", parsed);
        } catch (net.minidev.json.parser.ParseException e) {
            LOG.error("Failed to parse component resources is:" + e.toString(), e);
            return null;
        }
        return parsed;
    }

    /**
     * Overlay topology-level resource settings onto a component's resource map.
     *
     * <p>For every resource name already present in {@code topologyResources}, the value
     * configured under the same name in {@code topologyConf} (if any) replaces the
     * component's value. When at least one value was replaced, a debug message listing
     * the replacements is logged.
     *
     * @param topologyResources the component's resources, updated in place; a {@code null}
     *     or empty map is a no-op
     * @param componentId identifies the component in the log message
     * @param topologyConf the topology-level configuration
     */
    public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map<String, Object> topologyConf) {
        if (topologyResources == null || topologyResources.isEmpty()) {
            // Nothing to overlay; also covers the null that parseResources returns on bad JSON.
            return;
        }
        StringBuilder msgBuilder = new StringBuilder();
        // Iterate over a snapshot because the helper writes back into topologyResources.
        for (String resourceName : new ArrayList<>(topologyResources.keySet())) {
            msgBuilder.append(checkInitResource(topologyResources, topologyConf, resourceName));
        }
        if (msgBuilder.length() > 0) {
            String resourceDefaults = msgBuilder.toString();
            LOG.debug("Unable to extract resource requirement for Component {} \n Resources : {}", componentId, resourceDefaults);
        }
    }

    /**
     * Replace one component resource with the value configured at topology level.
     *
     * <p>The replacement happens only when {@code resourceName} is already a key of
     * {@code topologyResources} and {@code topologyConf} holds a numeric value for it. Any
     * {@link Number} is accepted (a JSON-parsed conf may hold {@code Integer} or
     * {@code Long} as well as {@code Double}); non-numeric or missing values are ignored.
     *
     * @param topologyResources the component's resources, updated in place
     * @param topologyConf the topology-level configuration
     * @param resourceName the resource key to check
     * @return a message describing the replacement, or an empty string if nothing changed
     */
    private static String checkInitResource(Map<String, Double> topologyResources, Map<String, Object> topologyConf, String resourceName) {
        if (!topologyResources.containsKey(resourceName)) {
            return "";
        }
        Object configured = topologyConf.get(resourceName);
        if (!(configured instanceof Number)) {
            return "";
        }
        Double resourceValue = ((Number) configured).doubleValue();
        topologyResources.put(resourceName, resourceValue);
        // The message uses the last dotted segment (dot included); fall back to the full name.
        int lastDot = resourceName.lastIndexOf('.');
        String shortName = lastDot >= 0 ? resourceName.substring(lastDot) : resourceName;
        return shortName + " has been set to " + resourceValue;
    }
}

