package org.apache.storm.daemon.nimbus;

import com.codahale.metrics.CachedGauge;
import com.codahale.metrics.DerivativeGauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.BindException;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.storm.DaemonConfig;
import org.apache.storm.StormTimer;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.KeySequenceNumber;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.container.oci.OciUtils;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.BeginDownloadResult;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.BoltAggregateStats;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.CommonAggregateStats;
import org.apache.storm.generated.ComponentAggregateStats;
import org.apache.storm.generated.ComponentPageInfo;
import org.apache.storm.generated.ComponentType;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GetInfoOptions;
import org.apache.storm.generated.IllegalStateException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.LSTopoHistory;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.LogLevel;
import org.apache.storm.generated.LogLevelAction;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.NumErrorsChoice;
import org.apache.storm.generated.OwnerResourceSummary;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.SpecificAggregateStats;
import org.apache.storm.generated.SpoutAggregateStats;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.SupervisorAssignments;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.generated.SupervisorPageInfo;
import org.apache.storm.generated.SupervisorSummary;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.generated.SupervisorWorkerHeartbeats;
import org.apache.storm.generated.TopologyActionOptions;
import org.apache.storm.generated.TopologyHistoryInfo;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologyStatus;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.generated.WorkerMetricPoint;
import org.apache.storm.generated.WorkerMetrics;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.logging.ThriftAccessLogger;
import org.apache.storm.metric.ClusterMetricsConsumerExecutor;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.metric.api.DataPoint;
import org.apache.storm.metric.api.IClusterMetricsConsumer;
import org.apache.storm.metricstore.AggLevel;
import org.apache.storm.metricstore.MetricStore;
import org.apache.storm.metricstore.MetricStoreConfig;
import org.apache.storm.nimbus.AssignmentDistributionService;
import org.apache.storm.nimbus.DefaultTopologyValidator;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
import org.apache.storm.nimbus.ITopologyValidator;
import org.apache.storm.nimbus.IWorkerHeartbeatsRecoveryStrategy;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.nimbus.WorkerHeartbeatsRecoveryStrategyFactory;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.DefaultScheduler;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.SupervisorResources;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.ICredentialsRenewer;
import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.security.auth.IPrincipalToLocal;
import org.apache.storm.security.auth.NimbusPrincipal;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.storm.security.auth.ThriftServer;
import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.base.Strings;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.shade.com.google.common.collect.MapDifference;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.BufferInputStream;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.SimpleVersion;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.TimeCacheMap;
import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.storm.utils.WrappedAlreadyAliveException;
import org.apache.storm.utils.WrappedAuthorizationException;
import org.apache.storm.utils.WrappedIllegalStateException;
import org.apache.storm.utils.WrappedInvalidTopologyException;
import org.apache.storm.utils.WrappedNotAliveException;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.zookeeper.AclEnforcement;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.apache.storm.zookeeper.Zookeeper;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/nimbus/Nimbus.class */
public class Nimbus implements Nimbus.Iface, Shutdownable, DaemonCommon {

    // Static constants declared without initializers; their values are assigned outside this view.
    @VisibleForTesting
    public static final List<ACL> ZK_ACLS;
    public static final SimpleVersion MIN_VERSION_SUPPORT_RPC_HEARTBEAT;
    private static final Logger LOG;
    // Meter fields. The names match Nimbus.Iface operations; presumably each one is marked when the
    // matching call is served — confirm at the call sites.
    private final Meter submitTopologyWithOptsCalls;
    private final Meter submitTopologyCalls;
    private final Meter killTopologyWithOptsCalls;
    private final Meter killTopologyCalls;
    private final Meter rebalanceCalls;
    private final Meter activateCalls;
    private final Meter deactivateCalls;
    private final Meter debugCalls;
    private final Meter setWorkerProfilerCalls;
    private final Meter getComponentPendingProfileActionsCalls;
    private final Meter setLogConfigCalls;
    private final Meter uploadNewCredentialsCalls;
    private final Meter beginFileUploadCalls;
    private final Meter uploadChunkCalls;
    private final Meter finishFileUploadCalls;
    private final Meter downloadChunkCalls;
    private final Meter getNimbusConfCalls;
    private final Meter getLogConfigCalls;
    private final Meter getTopologyConfCalls;
    private final Meter getTopologyCalls;
    private final Meter getUserTopologyCalls;
    private final Meter getClusterInfoCalls;
    private final Meter getTopologySummariesCalls;
    private final Meter getTopologySummaryCalls;
    private final Meter getTopologySummaryByNameCalls;
    private final Meter getLeaderCalls;
    private final Meter isTopologyNameAllowedCalls;
    private final Meter getTopologyInfoWithOptsCalls;
    private final Meter getTopologyInfoCalls;
    private final Meter getTopologyInfoByNameCalls;
    private final Meter getTopologyInfoByNameWithOptsCalls;
    private final Meter getTopologyPageInfoCalls;
    private final Meter getSupervisorPageInfoCalls;
    private final Meter getComponentPageInfoCalls;
    private final Meter getOwnerResourceSummariesCalls;
    private final Meter shutdownCalls;
    private final Meter processWorkerMetricsCalls;
    // Meters named after failures rather than calls — confirm where they are marked.
    private final Meter mkAssignmentsErrors;
    private final Meter sendAssignmentExceptions;
    // Timers and histograms. Judging by the names they track upload/scheduling durations and
    // per-scheduling-round executor/slot deltas — confirm at the update sites.
    private final Timer fileUploadDuration;
    private final Timer schedulingDuration;
    private final Histogram numAddedExecPerScheduling;
    private final Histogram numAddedSlotPerScheduling;
    private final Histogram numRemovedExecPerScheduling;
    private final Histogram numRemovedSlotPerScheduling;
    private final Histogram numNetExecIncreasePerScheduling;
    private final Histogram numNetSlotIncreasePerScheduling;
    // More static constants, including the named topology state transitions and the
    // status -> action -> transition lookup table (TOPO_STATE_TRANSITIONS).
    private static final String STORM_VERSION;
    public static final Subject NIMBUS_SUBJECT;
    private static final TopologyStateTransition NOOP_TRANSITION;
    private static final TopologyStateTransition INACTIVE_TRANSITION;
    private static final TopologyStateTransition ACTIVE_TRANSITION;
    private static final TopologyStateTransition REMOVE_TRANSITION;
    private static final TopologyStateTransition DO_REBALANCE_TRANSITION;
    private static final TopologyStateTransition KILL_TRANSITION;
    private static final TopologyStateTransition REBALANCE_TRANSITION;
    private static final TopologyStateTransition GAIN_LEADERSHIP_WHEN_KILLED_TRANSITION;
    private static final TopologyStateTransition GAIN_LEADERSHIP_WHEN_REBALANCING_TRANSITION;
    private static final Map<TopologyStatus, Map<TopologyActions, TopologyStateTransition>> TOPO_STATE_TRANSITIONS;
    private static final List<String> EMPTY_STRING_LIST;
    private static final Set<String> EMPTY_STRING_SET;
    private static final RotatingMap<String, Long> topologyCleanupDetected;
    // NOTE(review): non-final static, i.e. mutable state shared by every Nimbus instance in the JVM.
    private static long topologyCleanupRotationTime;
    // Per-instance state.
    private final Map<String, Object> conf;
    private final NavigableMap<SimpleVersion, List<String>> supervisorClasspaths;
    private final NimbusInfo nimbusHostPortInfo;
    private final INimbus inimbus;
    private final IAuthorizer impersonationAuthorizationHandler;
    private final AtomicLong submittedCount;
    private final IStormClusterState stormClusterState;
    // Plain Object fields; by name these are monitor objects — confirm at the synchronized sites.
    private final Object submitLock;
    private final Object schedLock;
    private final Object credUpdateLock;
    private final HeartbeatCache heartbeatsCache;
    private final AtomicBoolean heartbeatsReadyFlag;
    private final IWorkerHeartbeatsRecoveryStrategy heartbeatsRecoveryStrategy;
    private final TimeCacheMap<String, BufferInputStream> downloaders;
    private final TimeCacheMap<String, WritableByteChannel> uploaders;
    private final BlobStore blobStore;
    private final TopoCache topoCache;
    private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
    private final TimeCacheMap<String, OutputStream> blobUploaders;
    private final TimeCacheMap<String, Iterator<String>> blobListers;
    private final Utils.UptimeComputer uptime;
    private final ITopologyValidator validator;
    private final StormTimer timer;
    private final IScheduler scheduler;
    private final IScheduler underlyingScheduler;
    private final AtomicReference<Long> schedulingStartTimeNs;
    private final AtomicLong longestSchedulingTime;
    private final ILeaderElector leaderElector;
    private final AssignmentDistributionService assignmentsDistributer;
    // Maps held in AtomicReferences; the String keys are presumably topology ids (node ids for
    // nodeIdToResources) — confirm at the update sites.
    private final AtomicReference<Map<String, String>> idToSchedStatus;
    private final AtomicReference<Map<String, SupervisorResources>> nodeIdToResources;
    private final AtomicReference<Map<String, TopologyResources>> idToResources;
    private final AtomicReference<Map<String, Map<WorkerSlot, WorkerResources>>> idToWorkerResources;
    private final Collection<ICredentialsRenewer> credRenewers;
    private final Object topologyHistoryLock;
    private final LocalState topologyHistoryState;
    private final Collection<INimbusCredentialPlugin> nimbusAutocredPlugins;
    private final ITopologyActionNotifierPlugin nimbusTopologyActionNotifier;
    // NOTE(review): "Exceutors" is a misspelling of "Executors"; renaming would touch every use site.
    private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors;
    private final IGroupMappingServiceProvider groupMapper;
    private final IPrincipalToLocal principalToLocal;
    private final StormMetricsRegistry metricsRegistry;
    private final ResourceMetrics resourceMetrics;
    private final ClusterSummaryMetricSet clusterMetricSet;
    // metricsStore, authorizationHandler, idToExecutors, workerTokenManager and wasLeader are not
    // final, so they can be reassigned after construction.
    private MetricStore metricsStore;
    private IAuthorizer authorizationHandler;
    private final CuratorFramework zkClient;
    private AtomicReference<Map<String, Set<List<Integer>>>> idToExecutors;
    private WorkerTokenManager workerTokenManager;
    private boolean wasLeader;
    // Compiler-generated flag consulted by assert statements (marked synthetic by the decompiler).
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Compiler-generated lookup tables backing {@code switch} statements over the
     * {@link NumErrorsChoice}, {@link LogLevelAction} and {@link TopologyInitialStatus} enums
     * (decompiled from {@code Nimbus$1.class}).
     *
     * <p>Each table is indexed by {@code ordinal()} and holds the 1-based case number assigned to
     * that constant; constants without an entry keep the array default of 0. A
     * {@link NoSuchFieldError} while resolving a constant is deliberately ignored, so that one
     * entry is simply left at 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$storm$generated$TopologyInitialStatus;
        static final /* synthetic */ int[] $SwitchMap$org$apache$storm$generated$LogLevelAction;
        static final /* synthetic */ int[] $SwitchMap$org$apache$storm$generated$NumErrorsChoice;

        static {
            // Tables are filled in the same order as before: NumErrorsChoice, LogLevelAction,
            // TopologyInitialStatus.
            final int[] numErrorsCases = new int[NumErrorsChoice.values().length];
            try {
                numErrorsCases[NumErrorsChoice.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave its slot at 0
            }
            try {
                numErrorsCases[NumErrorsChoice.ONE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave its slot at 0
            }
            try {
                numErrorsCases[NumErrorsChoice.ALL.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave its slot at 0
            }
            $SwitchMap$org$apache$storm$generated$NumErrorsChoice = numErrorsCases;

            final int[] logLevelActionCases = new int[LogLevelAction.values().length];
            try {
                logLevelActionCases[LogLevelAction.UPDATE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave its slot at 0
            }
            try {
                logLevelActionCases[LogLevelAction.REMOVE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave its slot at 0
            }
            $SwitchMap$org$apache$storm$generated$LogLevelAction = logLevelActionCases;

            final int[] initialStatusCases = new int[TopologyInitialStatus.values().length];
            try {
                initialStatusCases[TopologyInitialStatus.INACTIVE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave its slot at 0
            }
            try {
                initialStatusCases[TopologyInitialStatus.ACTIVE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing at runtime: leave its slot at 0
            }
            $SwitchMap$org$apache$storm$generated$TopologyInitialStatus = initialStatusCases;
        }
    }

    /**
     * Map operator that returns a copy of its input with one extra binding
     * (decompiled from {@code Nimbus$Assoc.class}).
     *
     * <p>The input map is never modified: {@link #apply(Map)} copies it into a fresh
     * {@link HashMap} and then puts {@code key -> value} into the copy, replacing any binding the
     * input already had for {@code key}.
     *
     * @param <K> key type of the maps being transformed
     * @param <V> value type of the maps being transformed
     */
    public static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
        private final K key;
        private final V value;

        /**
         * @param k key to bind in the copied map
         * @param v value to bind {@code k} to
         */
        Assoc(K k, V v) {
            this.key = k;
            this.value = v;
        }

        /**
         * Returns a new {@link HashMap} holding every entry of {@code map} plus the configured
         * binding.
         *
         * @param map map to copy; left untouched
         * @return the updated copy
         * @throws NullPointerException if {@code map} is {@code null} (from the copy constructor)
         */
        @Override
        public Map<K, V> apply(Map<K, V> map) {
            // Typed copy instead of a raw HashMap, so the result is checked by the compiler.
            Map<K, V> updated = new HashMap<>(map);
            updated.put(this.key, this.value);
            return updated;
        }
    }

    /**
     * Holds the cluster-summary gauges and histograms and registers them with the metrics registry
     * only while {@link Nimbus#isLeader()} reports true
     * (decompiled from {@code Nimbus$ClusterSummaryMetricSet.class}).
     *
     * <p>All gauges derive from one {@link CachedGauge} that calls
     * {@code Nimbus.this.getClusterInfoImpl()} at most once per {@link #CACHING_WINDOW} seconds;
     * every reload also feeds the per-nimbus, per-supervisor and per-topology histograms.
     */
    public class ClusterSummaryMetricSet implements Runnable {
        /** Cache lifetime of the cluster summary, in seconds. */
        private static final int CACHING_WINDOW = 5;
        private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics(null);
        // Creates a histogram over a sliding window of CACHING_WINDOW / 2 (= 2) seconds and records
        // it in clusterSummaryMetrics under the given name. Must be declared before the histogram
        // fields below, which use it in their initializers.
        private final Function<String, Histogram> registerHistogram = name -> {
            Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
            this.clusterSummaryMetrics.put(name, histogram);
            return histogram;
        };
        /** Last state passed to the registry: true once registered, false once removed. */
        private volatile boolean active = false;
        private final Histogram nimbusUptime = this.registerHistogram.apply("nimbuses:uptime-secs");
        private final Histogram supervisorsUptime = this.registerHistogram.apply("supervisors:uptime-secs");
        private final Histogram supervisorsNumWorkers = this.registerHistogram.apply("supervisors:num-workers");
        private final Histogram supervisorsNumUsedWorkers = this.registerHistogram.apply("supervisors:num-used-workers");
        private final Histogram supervisorsUsedMem = this.registerHistogram.apply("supervisors:used-mem");
        private final Histogram supervisorsUsedCpu = this.registerHistogram.apply("supervisors:used-cpu");
        private final Histogram supervisorsFragmentedMem = this.registerHistogram.apply("supervisors:fragmented-mem");
        private final Histogram supervisorsFragmentedCpu = this.registerHistogram.apply("supervisors:fragmented-cpu");
        private final Histogram topologiesNumTasks = this.registerHistogram.apply("topologies:num-tasks");
        private final Histogram topologiesNumExecutors = this.registerHistogram.apply("topologies:num-executors");
        private final Histogram topologiesNumWorker = this.registerHistogram.apply("topologies:num-workers");
        private final Histogram topologiesUptime = this.registerHistogram.apply("topologies:uptime-secs");
        private final Histogram topologiesReplicationCount = this.registerHistogram.apply("topologies:replication-count");
        private final Histogram topologiesRequestedMemOnHeap = this.registerHistogram.apply("topologies:requested-mem-on-heap");
        private final Histogram topologiesRequestedMemOffHeap = this.registerHistogram.apply("topologies:requested-mem-off-heap");
        private final Histogram topologiesRequestedCpu = this.registerHistogram.apply("topologies:requested-cpu");
        private final Histogram topologiesAssignedMemOnHeap = this.registerHistogram.apply("topologies:assigned-mem-on-heap");
        private final Histogram topologiesAssignedMemOffHeap = this.registerHistogram.apply("topologies:assigned-mem-off-heap");
        private final Histogram topologiesAssignedCpu = this.registerHistogram.apply("topologies:assigned-cpu");
        private final StormMetricsRegistry metricsRegistry;

        /**
         * Builds the cached cluster-summary gauge and the gauges derived from it.
         *
         * @param stormMetricsRegistry registry the metrics are registered with / removed from
         * @throws AssertionError when assertions are enabled and {@link ClusterSummary} no longer
         *     has exactly the {@code supervisors}, {@code topologies} and {@code nimbuses} fields
         */
        ClusterSummaryMetricSet(StormMetricsRegistry stormMetricsRegistry) {
            this.metricsRegistry = stormMetricsRegistry;
            // Guard: the metrics below assume ClusterSummary has exactly these three fields.
            assert ClusterSummary._Fields.values().length == 3
                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
            CachedGauge<ClusterSummary> cachedGauge = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
                /**
                 * Reloads the cluster summary and pushes its values into the histograms.
                 * Any failure is logged and rethrown wrapped in a RuntimeException.
                 */
                @Override
                protected ClusterSummary loadValue() {
                    try {
                        ClusterSummary clusterInfoImpl = Nimbus.this.getClusterInfoImpl();
                        Nimbus.LOG.debug("The new summary is {}", clusterInfoImpl);
                        ClusterSummaryMetricSet.this.updateHistogram(clusterInfoImpl);
                        return clusterInfoImpl;
                    } catch (Exception e) {
                        Nimbus.LOG.warn("Get cluster info exception.", e);
                        throw new RuntimeException(e);
                    }
                }
            };
            this.clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedGauge) {
                @Override
                public Long transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses().stream().filter(NimbusSummary::is_isLeader).count();
                }
            });
            this.clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedGauge) {
                @Override
                public Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses_size();
                }
            });
            this.clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedGauge) {
                @Override
                public Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors_size();
                }
            });
            this.clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedGauge) {
                @Override
                public Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_topologies_size();
                }
            });
            this.clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedGauge) {
                @Override
                public Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_workers).sum();
                }
            });
            this.clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedGauge) {
                @Override
                public Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_used_workers).sum();
                }
            });
            // The two "non-negative" totals clamp each supervisor's value at 0 before summing.
            this.clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedGauge) {
                @Override
                public Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0.0d))
                        .sum();
                }
            });
            this.clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedGauge) {
                @Override
                public Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0.0d))
                        .sum();
                }
            });
        }

        /**
         * Adds one sample per nimbus, supervisor and topology in {@code clusterSummary} to the
         * matching histograms. Floating-point values are rounded with {@link Math#round(double)}.
         *
         * @param clusterSummary freshly loaded cluster summary
         */
        public void updateHistogram(ClusterSummary clusterSummary) {
            for (NimbusSummary nimbusSummary : clusterSummary.get_nimbuses()) {
                this.nimbusUptime.update(nimbusSummary.get_uptime_secs());
            }
            for (SupervisorSummary supervisorSummary : clusterSummary.get_supervisors()) {
                this.supervisorsUptime.update(supervisorSummary.get_uptime_secs());
                this.supervisorsNumWorkers.update(supervisorSummary.get_num_workers());
                this.supervisorsNumUsedWorkers.update(supervisorSummary.get_num_used_workers());
                this.supervisorsUsedMem.update(Math.round(supervisorSummary.get_used_mem()));
                this.supervisorsUsedCpu.update(Math.round(supervisorSummary.get_used_cpu()));
                this.supervisorsFragmentedMem.update(Math.round(supervisorSummary.get_fragmented_mem()));
                this.supervisorsFragmentedCpu.update(Math.round(supervisorSummary.get_fragmented_cpu()));
            }
            for (TopologySummary topologySummary : clusterSummary.get_topologies()) {
                this.topologiesNumTasks.update(topologySummary.get_num_tasks());
                this.topologiesNumExecutors.update(topologySummary.get_num_executors());
                this.topologiesNumWorker.update(topologySummary.get_num_workers());
                this.topologiesUptime.update(topologySummary.get_uptime_secs());
                this.topologiesReplicationCount.update(topologySummary.get_replication_count());
                this.topologiesRequestedMemOnHeap.update(Math.round(topologySummary.get_requested_memonheap()));
                this.topologiesRequestedMemOffHeap.update(Math.round(topologySummary.get_requested_memoffheap()));
                this.topologiesRequestedCpu.update(Math.round(topologySummary.get_requested_cpu()));
                this.topologiesAssignedMemOnHeap.update(Math.round(topologySummary.get_assigned_memonheap()));
                this.topologiesAssignedMemOffHeap.update(Math.round(topologySummary.get_assigned_memoffheap()));
                this.topologiesAssignedCpu.update(Math.round(topologySummary.get_assigned_cpu()));
            }
        }

        /**
         * Registers the metric set when switching to active and removes it when switching to
         * inactive; does nothing when the requested state equals the current one.
         *
         * @param z desired state
         */
        void setActive(boolean z) {
            if (this.active != z) {
                // The flag is flipped before the registry call, exactly as in the original.
                this.active = z;
                if (z) {
                    this.metricsRegistry.registerAll(this.clusterSummaryMetrics);
                } else {
                    this.metricsRegistry.removeAll(this.clusterSummaryMetrics);
                }
            }
        }

        /**
         * Syncs the registration state with the current leadership status.
         *
         * @throws RuntimeException wrapping any exception thrown while checking leadership or
         *     updating the registry
         */
        @Override
        public void run() {
            try {
                setActive(Nimbus.this.isLeader());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/daemon/nimbus/Nimbus$ClusterSummaryMetrics.class */
    public static class ClusterSummaryMetrics implements MetricSet {
        /** Prefix under which every metric of this set is named. */
        private static final String SUMMARY = "summary";

        /** Backing store, keyed by the prefixed metric name. */
        private final Map<String, Metric> metrics = new HashMap<>();

        private ClusterSummaryMetrics() {
        }

        /* synthetic */ ClusterSummaryMetrics(AnonymousClass1 anonymousClass1) {
            this();
        }

        /**
         * Stores a metric under {@code "summary"} joined with the given name by
         * {@link MetricRegistry#name}.
         *
         * @param str    the un-prefixed metric name
         * @param metric the metric to store
         * @return the metric previously stored under the same prefixed name, or {@code null}
         */
        public Metric put(String str, Metric metric) {
            String qualifiedName = MetricRegistry.name(SUMMARY, str);
            return this.metrics.put(qualifiedName, metric);
        }

        /**
         * Returns the live backing map of this metric set (not a copy).
         */
        public Map<String, Metric> getMetrics() {
            return this.metrics;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/daemon/nimbus/Nimbus$CommonTopoInfo.class */
    /**
     * Plain mutable holder bundling several pieces of per-topology information. All fields are
     * public and unset (null / 0) after construction; whoever creates the instance fills them in.
     */
    public static class CommonTopoInfo {
        // Topology identity and definition.
        public Map<String, Object> topoConf;
        public String topoName;
        public StormTopology topology;
        public Map<Integer, String> taskToComponent;

        // State and runtime data associated with the topology.
        public StormBase base;
        public int launchTimeSecs;
        public Assignment assignment;
        public Map<List<Integer>, Map<String, Object>> beats;
        public HashSet<String> allComponents;

        /* synthetic */ CommonTopoInfo(AnonymousClass1 anonymousClass1) {
            this();
        }

        private CommonTopoInfo() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/daemon/nimbus/Nimbus$Dissoc.class */
    /**
     * Operator that yields a copy of a map with one fixed key removed. The input map is never
     * modified; the result is always a fresh {@link HashMap}.
     */
    public static final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
        /** The key dropped from every map this operator is applied to. */
        private final K key;

        Dissoc(K k) {
            this.key = k;
        }

        /**
         * @param map the source map, which is left untouched
         * @return a new map holding every entry of {@code map} except the one for the key
         */
        @Override // java.util.function.Function
        public Map<K, V> apply(Map<K, V> map) {
            Map<K, V> withoutKey = new HashMap<>(map);
            withoutKey.remove(this.key);
            return withoutKey;
        }
    }

    /**
     * Minimal {@link INimbus} implementation: it offers one slot per port listed in each
     * supervisor's metadata, resolves host names from the supervisor details, and does nothing for
     * {@code prepare} and {@code assignSlots}. It never forces a scheduler.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/storm/daemon/nimbus/Nimbus$StandaloneINimbus.class */
    public static class StandaloneINimbus implements INimbus {
        @Override // org.apache.storm.scheduler.INimbus
        public void prepare(Map<String, Object> map, String str) {
            // Intentionally empty.
        }

        /**
         * Builds one {@link WorkerSlot} for every element of every supervisor's metadata
         * collection, pairing it with that supervisor's id.
         */
        @Override // org.apache.storm.scheduler.INimbus
        public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> collection, Topologies topologies, Set<String> set) {
            Set<WorkerSlot> slots = new HashSet<>();
            for (SupervisorDetails supervisor : collection) {
                String supervisorId = supervisor.getId();
                // The metadata is treated as a collection of port numbers.
                for (Object port : (Collection<?>) supervisor.getMeta()) {
                    slots.add(new WorkerSlot(supervisorId, (Number) port));
                }
            }
            return slots;
        }

        @Override // org.apache.storm.scheduler.INimbus
        public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> map) {
            // Intentionally empty.
        }

        /**
         * @return the host of the supervisor registered under {@code str}, or {@code null} when
         *     the map has no such supervisor
         */
        @Override // org.apache.storm.scheduler.INimbus
        public String getHostName(Map<String, SupervisorDetails> map, String str) {
            SupervisorDetails supervisor = map.get(str);
            return supervisor == null ? null : supervisor.getHost();
        }

        @Override // org.apache.storm.scheduler.INimbus
        public IScheduler getForcedScheduler() {
            return null;
        }
    }

    /**
     * Picks the ACL list for Nimbus.
     *
     * @param map the configuration handed to {@code Utils.isZkAuthenticationConfiguredStormServer}
     * @return {@code ZK_ACLS} when that check passes, otherwise {@code null}
     */
    public static List<ACL> getNimbusAcls(Map<String, Object> map) {
        return Utils.isZkAuthenticationConfiguredStormServer(map) ? ZK_ACLS : null;
    }

    /**
     * Creates a Nimbus from a configuration, an {@link INimbus} and a metrics registry.
     *
     * <p>Delegates to the wider constructor, passing {@code null} for the cluster state, Nimbus
     * info, blob store, leader elector and group mapper arguments.
     */
    public Nimbus(Map<String, Object> map, INimbus iNimbus, StormMetricsRegistry stormMetricsRegistry) throws Exception {
        this(map, iNimbus, null, null, null, null, null, stormMetricsRegistry);
    }

    /**
     * Creates a Nimbus with explicitly supplied collaborators but no topology cache.
     *
     * <p>Delegates to the full constructor, passing {@code null} for the {@code TopoCache}
     * argument.
     */
    public Nimbus(Map<String, Object> map, INimbus iNimbus, IStormClusterState iStormClusterState, NimbusInfo nimbusInfo, BlobStore blobStore, ILeaderElector iLeaderElector, IGroupMappingServiceProvider iGroupMappingServiceProvider, StormMetricsRegistry stormMetricsRegistry) throws Exception {
        this(map, iNimbus, iStormClusterState, nimbusInfo, blobStore, null, iLeaderElector, iGroupMappingServiceProvider, stormMetricsRegistry);
    }

    /**
     * Full constructor: wires up locks, metrics, caches, the scheduler, the blob store, leader
     * election and the security plugins.
     *
     * <p>The following arguments may be {@code null}, in which case a default is built from
     * {@code map}: {@code iStormClusterState}, {@code nimbusInfo}, {@code blobStore},
     * {@code topoCache}, {@code iLeaderElector} and {@code iGroupMappingServiceProvider}.
     * {@code iNimbus} may also be {@code null}; it is then neither prepared nor asked for a forced
     * scheduler.
     *
     * @param map                          the configuration
     * @param iNimbus                      the INimbus to prepare and store, or {@code null}
     * @param iStormClusterState           cluster state, or {@code null} to create one from the configuration
     * @param nimbusInfo                   host/port info, or {@code null} to derive it from the configuration
     * @param blobStore                    blob store, or {@code null} to obtain the Nimbus blob store
     * @param topoCache                    topology cache, or {@code null} to create one over the blob store
     * @param iLeaderElector               leader elector, or {@code null} to create a ZooKeeper-based one
     * @param iGroupMappingServiceProvider group mapper, or {@code null} to load the configured plugin
     * @param stormMetricsRegistry         registry with which all meters, timers and histograms are registered
     * @throws Exception if any of the initialization steps fails
     */
    public Nimbus(Map<String, Object> map, INimbus iNimbus, IStormClusterState iStormClusterState, NimbusInfo nimbusInfo, BlobStore blobStore, TopoCache topoCache, ILeaderElector iLeaderElector, IGroupMappingServiceProvider iGroupMappingServiceProvider, StormMetricsRegistry stormMetricsRegistry) throws Exception {
        // Locks and scheduling bookkeeping.
        this.submitLock = new Object();
        this.schedLock = new Object();
        this.credUpdateLock = new Object();
        this.schedulingStartTimeNs = new AtomicReference<>(null);
        this.longestSchedulingTime = new AtomicLong();
        this.wasLeader = false;
        this.conf = map;
        this.metricsRegistry = stormMetricsRegistry;
        this.resourceMetrics = new ResourceMetrics(stormMetricsRegistry);
        // One meter per call type, plus error meters, timers and per-scheduling histograms.
        this.submitTopologyWithOptsCalls = stormMetricsRegistry.registerMeter("nimbus:num-submitTopologyWithOpts-calls");
        this.submitTopologyCalls = stormMetricsRegistry.registerMeter("nimbus:num-submitTopology-calls");
        this.killTopologyWithOptsCalls = stormMetricsRegistry.registerMeter("nimbus:num-killTopologyWithOpts-calls");
        this.killTopologyCalls = stormMetricsRegistry.registerMeter("nimbus:num-killTopology-calls");
        this.rebalanceCalls = stormMetricsRegistry.registerMeter("nimbus:num-rebalance-calls");
        this.activateCalls = stormMetricsRegistry.registerMeter("nimbus:num-activate-calls");
        this.deactivateCalls = stormMetricsRegistry.registerMeter("nimbus:num-deactivate-calls");
        this.debugCalls = stormMetricsRegistry.registerMeter("nimbus:num-debug-calls");
        this.setWorkerProfilerCalls = stormMetricsRegistry.registerMeter("nimbus:num-setWorkerProfiler-calls");
        this.getComponentPendingProfileActionsCalls = stormMetricsRegistry.registerMeter("nimbus:num-getComponentPendingProfileActions-calls");
        this.setLogConfigCalls = stormMetricsRegistry.registerMeter("nimbus:num-setLogConfig-calls");
        this.uploadNewCredentialsCalls = stormMetricsRegistry.registerMeter("nimbus:num-uploadNewCredentials-calls");
        this.beginFileUploadCalls = stormMetricsRegistry.registerMeter("nimbus:num-beginFileUpload-calls");
        this.uploadChunkCalls = stormMetricsRegistry.registerMeter("nimbus:num-uploadChunk-calls");
        this.finishFileUploadCalls = stormMetricsRegistry.registerMeter("nimbus:num-finishFileUpload-calls");
        this.downloadChunkCalls = stormMetricsRegistry.registerMeter("nimbus:num-downloadChunk-calls");
        this.getNimbusConfCalls = stormMetricsRegistry.registerMeter("nimbus:num-getNimbusConf-calls");
        this.getLogConfigCalls = stormMetricsRegistry.registerMeter("nimbus:num-getLogConfig-calls");
        this.getTopologyConfCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologyConf-calls");
        this.getTopologyCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopology-calls");
        this.getUserTopologyCalls = stormMetricsRegistry.registerMeter("nimbus:num-getUserTopology-calls");
        this.getClusterInfoCalls = stormMetricsRegistry.registerMeter("nimbus:num-getClusterInfo-calls");
        this.getTopologySummariesCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologySummaries-calls");
        this.getTopologySummaryCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologySummary-calls");
        this.getTopologySummaryByNameCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologySummaryByName-calls");
        this.getLeaderCalls = stormMetricsRegistry.registerMeter("nimbus:num-getLeader-calls");
        this.isTopologyNameAllowedCalls = stormMetricsRegistry.registerMeter("nimbus:num-isTopologyNameAllowed-calls");
        this.getTopologyInfoWithOptsCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoWithOpts-calls");
        this.getTopologyInfoCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfo-calls");
        this.getTopologyInfoByNameCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoByName-calls");
        this.getTopologyInfoByNameWithOptsCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologyInfoByNameWithOpts-calls");
        this.getTopologyPageInfoCalls = stormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
        this.getSupervisorPageInfoCalls = stormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
        this.getComponentPageInfoCalls = stormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
        this.getOwnerResourceSummariesCalls = stormMetricsRegistry.registerMeter("nimbus:num-getOwnerResourceSummaries-calls");
        this.shutdownCalls = stormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
        this.processWorkerMetricsCalls = stormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
        this.mkAssignmentsErrors = stormMetricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
        this.sendAssignmentExceptions = stormMetricsRegistry.registerMeter("nimbus:num-send-assignment-exceptions");
        this.fileUploadDuration = stormMetricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
        this.schedulingDuration = stormMetricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
        this.numAddedExecPerScheduling = stormMetricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
        this.numAddedSlotPerScheduling = stormMetricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling");
        this.numRemovedExecPerScheduling = stormMetricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling");
        this.numRemovedSlotPerScheduling = stormMetricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling");
        this.numNetExecIncreasePerScheduling = stormMetricsRegistry.registerHistogram("nimbus:num-net-executors-increase-per-scheduling");
        this.numNetSlotIncreasePerScheduling = stormMetricsRegistry.registerHistogram("nimbus:num-net-slots-increase-per-scheduling");
        // The metric store is optional: a configuration failure is logged and leaves it null.
        this.metricsStore = null;
        try {
            this.metricsStore = MetricStoreConfig.configure(map, stormMetricsRegistry);
        } catch (Exception e) {
            LOG.error("Failed to initialize metric store", e);
        }
        this.nimbusHostPortInfo = nimbusInfo == null ? NimbusInfo.fromConf(map) : nimbusInfo;
        if (iNimbus != null) {
            iNimbus.prepare(map, ServerConfigUtils.masterInimbusDir(map));
        }
        this.inimbus = iNimbus;
        this.authorizationHandler = StormCommon.mkAuthorizationHandler((String) map.get(DaemonConfig.NIMBUS_AUTHORIZER), map);
        this.impersonationAuthorizationHandler = StormCommon.mkAuthorizationHandler((String) map.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), map);
        this.submittedCount = new AtomicLong(0L);
        iStormClusterState = iStormClusterState == null ? makeStormClusterState(map) : iStormClusterState;
        this.stormClusterState = iStormClusterState;
        this.heartbeatsCache = new HeartbeatCache();
        this.heartbeatsReadyFlag = new AtomicBoolean(false);
        this.heartbeatsRecoveryStrategy = WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(map);
        // Expiring caches for in-flight file and blob transfers.
        this.downloaders = fileCacheMap(map);
        this.uploaders = fileCacheMap(map);
        this.blobDownloaders = makeBlobCacheMap(map);
        this.blobUploaders = makeBlobCacheMap(map);
        this.blobListers = makeBlobListCacheMap(map);
        this.uptime = Utils.makeUptimeComputer();
        this.validator = (ITopologyValidator) ReflectionUtils.newInstance((String) map.getOrDefault(DaemonConfig.NIMBUS_TOPOLOGY_VALIDATOR, DefaultTopologyValidator.class.getName()));
        // The handler passed to the timer logs the error and exits the process with code 20.
        this.timer = new StormTimer((String) null, (thread, th) -> {
            LOG.error("Error while processing event", th);
            Utils.exitProcess(20, "Error while processing event");
        });
        this.underlyingScheduler = makeScheduler(map, iNimbus);
        this.scheduler = wrapAsBlacklistScheduler(map, this.underlyingScheduler, stormMetricsRegistry);
        this.zkClient = makeZKClient(map);
        this.idToExecutors = new AtomicReference<>(new HashMap());
        // Order matters below: the blob store and topo cache are resolved first because the
        // default leader elector is constructed from both of them.
        blobStore = blobStore == null ? ServerUtils.getNimbusBlobStore(map, this.nimbusHostPortInfo, null) : blobStore;
        this.blobStore = blobStore;
        topoCache = topoCache == null ? new TopoCache(blobStore, map) : topoCache;
        this.leaderElector = iLeaderElector == null ? Zookeeper.zkLeaderElector(map, this.zkClient, blobStore, topoCache, iStormClusterState, getNimbusAcls(map), stormMetricsRegistry) : iLeaderElector;
        this.blobStore.setLeaderElector(this.leaderElector);
        this.topoCache = topoCache;
        this.assignmentsDistributer = AssignmentDistributionService.getInstance(map, this.scheduler);
        this.idToSchedStatus = new AtomicReference<>(new HashMap());
        this.nodeIdToResources = new AtomicReference<>(new HashMap());
        this.idToResources = new AtomicReference<>(new HashMap());
        this.idToWorkerResources = new AtomicReference<>(new HashMap());
        this.credRenewers = ClientAuthUtils.getCredentialRenewers(map);
        this.topologyHistoryLock = new Object();
        this.topologyHistoryState = ServerConfigUtils.nimbusTopoHistoryState(map);
        this.nimbusAutocredPlugins = ClientAuthUtils.getNimbusAutoCredPlugins(map);
        this.nimbusTopologyActionNotifier = createTopologyActionNotifier(map);
        this.clusterConsumerExceutors = makeClusterMetricsConsumerExecutors(map);
        this.groupMapper = iGroupMappingServiceProvider == null ? ClientAuthUtils.getGroupMappingServiceProviderPlugin(map) : iGroupMappingServiceProvider;
        this.principalToLocal = ClientAuthUtils.getPrincipalToLocalPlugin(map);
        this.supervisorClasspaths = Collections.unmodifiableNavigableMap(Utils.getConfiguredClasspathVersions(map, EMPTY_STRING_LIST));
        this.clusterMetricSet = new ClusterSummaryMetricSet(stormMetricsRegistry);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a {@link StormBase} carrying only a status.
     *
     * @param topologyStatus the status to set on the new instance
     * @return a new base whose component-executor and component-debug maps are empty
     */
    public static StormBase make(TopologyStatus topologyStatus) {
        final StormBase base = new StormBase();
        base.set_status(topologyStatus);
        // Both per-component maps start out empty.
        base.set_component_executors(Collections.emptyMap());
        base.set_component_debug(Collections.emptyMap());
        return base;
    }

    /**
     * Builds an expiring cache of closeable resources keyed by string.
     *
     * <p>The expiration is read from {@code DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS},
     * defaulting to {@code StatsUtil.TEN_MIN_IN_SECONDS}. The callback given to the cache closes
     * the resource and rethrows any failure as a {@link RuntimeException}.
     */
    private static <T extends AutoCloseable> TimeCacheMap<String, T> fileCacheMap(Map<String, Object> map) {
        Object configured = map.get(DaemonConfig.NIMBUS_FILE_COPY_EXPIRATION_SECS);
        int expirationSecs = ObjectReader.getInt(configured, Integer.valueOf(StatsUtil.TEN_MIN_IN_SECONDS)).intValue();
        return new TimeCacheMap<>(expirationSecs, (key, resource) -> {
            try {
                resource.close();
            } catch (Exception closeFailure) {
                throw new RuntimeException(closeFailure);
            }
        });
    }

    /**
     * Computes the entries of {@code map2} whose value differs from the value {@code map} holds
     * for the same key (a key missing from {@code map} counts as mapped to {@code null}).
     *
     * <p>Values are compared null-safely, so a {@code null} value in {@code map2} no longer
     * triggers a {@link NullPointerException}; such an entry is reported only when {@code map}
     * maps the key to a non-null value.
     *
     * @param map  the baseline map
     * @param map2 the map whose entries are checked against the baseline
     * @return a new map holding the differing entries of {@code map2}
     */
    private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> map, Map<? extends K, ? extends V> map2) {
        Map<K, V> changed = new HashMap<>();
        for (Map.Entry<? extends K, ? extends V> entry : map2.entrySet()) {
            // Fully qualified to avoid touching the file's import block.
            if (!java.util.Objects.equals(entry.getValue(), map.get(entry.getKey()))) {
                changed.put(entry.getKey(), entry.getValue());
            }
        }
        return changed;
    }

    /**
     * Wraps the given scheduler in a {@link BlacklistScheduler} and prepares the wrapper with the
     * configuration and metrics registry.
     *
     * @return the prepared wrapper
     */
    private static IScheduler wrapAsBlacklistScheduler(Map<String, Object> map, IScheduler iScheduler, StormMetricsRegistry stormMetricsRegistry) {
        final BlacklistScheduler wrapper = new BlacklistScheduler(iScheduler);
        wrapper.prepare(map, stormMetricsRegistry);
        return wrapper;
    }

    /**
     * Chooses the scheduler to use, in order of precedence: the scheduler forced by the
     * {@link INimbus} (if any), the class named by {@code DaemonConfig.STORM_SCHEDULER}, and
     * finally a {@link DefaultScheduler}. The choice is logged at info level.
     *
     * @param map     the configuration
     * @param iNimbus the INimbus to ask for a forced scheduler; may be {@code null}
     */
    private static IScheduler makeScheduler(Map<String, Object> map, INimbus iNimbus) {
        String configuredClass = (String) map.get(DaemonConfig.STORM_SCHEDULER);
        IScheduler forced = iNimbus != null ? iNimbus.getForcedScheduler() : null;
        if (forced != null) {
            LOG.info("Using forced scheduler from INimbus {} {}", forced.getClass(), forced);
            return forced;
        }
        if (configuredClass == null) {
            LOG.info("Using default scheduler");
            return new DefaultScheduler();
        }
        LOG.info("Using custom scheduler: {}", configuredClass);
        return (IScheduler) ReflectionUtils.newInstance(configuredClass);
    }

    /**
     * Builds an expiring cache for blob streams keyed by string.
     *
     * <p>The expiration is read from {@code DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS},
     * defaulting to {@code StatsUtil.TEN_MIN_IN_SECONDS}. The callback given to the cache cancels
     * an {@link AtomicOutputStream} and closes anything else; failures are rethrown as
     * {@link RuntimeException}.
     */
    private static <T extends AutoCloseable> TimeCacheMap<String, T> makeBlobCacheMap(Map<String, Object> map) {
        Object configured = map.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS);
        int expirationSecs = ObjectReader.getInt(configured, Integer.valueOf(StatsUtil.TEN_MIN_IN_SECONDS)).intValue();
        return new TimeCacheMap<>(expirationSecs, (key, stream) -> {
            try {
                if (!(stream instanceof AtomicOutputStream)) {
                    stream.close();
                } else {
                    ((AtomicOutputStream) stream).cancel();
                }
            } catch (Exception cleanupFailure) {
                throw new RuntimeException(cleanupFailure);
            }
        });
    }

    /**
     * Builds an expiring cache of blob-key iterators, with the expiration read from
     * {@code DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS} (default
     * {@code StatsUtil.TEN_MIN_IN_SECONDS}). No callback is passed to the cache.
     */
    private static TimeCacheMap<String, Iterator<String>> makeBlobListCacheMap(Map<String, Object> map) {
        Object configured = map.get(DaemonConfig.NIMBUS_BLOBSTORE_EXPIRATION_SECS);
        int expirationSecs = ObjectReader.getInt(configured, Integer.valueOf(StatsUtil.TEN_MIN_IN_SECONDS)).intValue();
        return new TimeCacheMap<>(expirationSecs);
    }

    /**
     * Instantiates and prepares the plugin named by
     * {@code DaemonConfig.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN}.
     *
     * @return the prepared plugin, or {@code null} when no class name is configured (null or
     *     empty) or when {@code prepare} throws (the failure is logged as a warning)
     */
    private static ITopologyActionNotifierPlugin createTopologyActionNotifier(Map<String, Object> map) {
        String pluginClass = (String) map.get(DaemonConfig.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN);
        if (pluginClass == null || pluginClass.isEmpty()) {
            return null;
        }
        ITopologyActionNotifierPlugin plugin = (ITopologyActionNotifierPlugin) ReflectionUtils.newInstance(pluginClass);
        try {
            plugin.prepare(map);
            return plugin;
        } catch (Exception e) {
            LOG.warn("Ignoring exception, Could not initialize {}", pluginClass, e);
            return null;
        }
    }

    /**
     * Creates one {@link ClusterMetricsConsumerExecutor} per entry registered under
     * {@code DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER}, using each entry's
     * {@code "class"} and {@code "argument"} values.
     *
     * @return a new list of executors; empty when nothing is registered
     */
    private static List<ClusterMetricsConsumerExecutor> makeClusterMetricsConsumerExecutors(Map<String, Object> map) {
        List<ClusterMetricsConsumerExecutor> executors = new ArrayList<>();
        Collection<Map> registrations = (Collection) map.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_REGISTER);
        if (registrations == null) {
            return executors;
        }
        for (Map registration : registrations) {
            String consumerClass = (String) registration.get("class");
            executors.add(new ClusterMetricsConsumerExecutor(consumerClass, registration.get("argument")));
        }
        return executors;
    }

    /**
     * Returns the subject of the current request context, as given by
     * {@code ReqContext.context().subject()}.
     */
    private static Subject getSubject() {
        ReqContext current = ReqContext.context();
        return current.subject();
    }

    /**
     * Reads a topology configuration from the cache on behalf of the current request's subject.
     *
     * @param str       the topology id
     * @param topoCache the cache to read from
     */
    static Map<String, Object> readTopoConf(String str, TopoCache topoCache) throws KeyNotFoundException, AuthorizationException, IOException {
        Subject requester = getSubject();
        return topoCache.readTopoConf(str, requester);
    }

    /**
     * Lists the master blob keys belonging to a topology: the code key and the conf key, plus the
     * jar key unless the configuration indicates local mode.
     *
     * @param map the configuration consulted for local mode
     * @param str the topology id
     * @return a new list with two or three keys
     */
    static List<String> getKeyListFromId(Map<String, Object> map, String str) {
        List<String> keys = new ArrayList<>(3);
        keys.add(ConfigUtils.masterStormCodeKey(str));
        keys.add(ConfigUtils.masterStormConfKey(str));
        if (ConfigUtils.isLocalMode(map)) {
            // Local mode: no jar key is added.
            return keys;
        }
        keys.add(ConfigUtils.masterStormJarKey(str));
        return keys;
    }

    /**
     * Obtains the sequence number of a blob key by delegating to {@link KeySequenceNumber}.
     *
     * @param str              the blob key
     * @param nimbusInfo       the Nimbus info handed to {@link KeySequenceNumber}
     * @param curatorFramework the ZooKeeper client used for the lookup
     */
    public static int getVersionForKey(String str, NimbusInfo nimbusInfo, CuratorFramework curatorFramework) throws KeyNotFoundException {
        KeySequenceNumber sequence = new KeySequenceNumber(str, nimbusInfo);
        return sequence.getKeySequenceNumber(curatorFramework);
    }

    /**
     * Reads a topology from the cache on behalf of the current request's subject.
     *
     * @param str       the topology id
     * @param topoCache the cache to read from
     */
    private static StormTopology readStormTopology(String str, TopoCache topoCache) throws KeyNotFoundException, AuthorizationException, IOException {
        Subject requester = getSubject();
        return topoCache.readTopology(str, requester);
    }

    /**
     * Reads a topology configuration from the cache using {@code NIMBUS_SUBJECT} rather than the
     * current request's subject.
     *
     * @param str       the topology id
     * @param topoCache the cache to read from
     */
    private static Map<String, Object> readTopoConfAsNimbus(String str, TopoCache topoCache) throws KeyNotFoundException, AuthorizationException, IOException {
        Map<String, Object> topoConf = topoCache.readTopoConf(str, NIMBUS_SUBJECT);
        return topoConf;
    }

    /**
     * Reads a topology from the cache using {@code NIMBUS_SUBJECT} rather than the current
     * request's subject.
     *
     * @param str       the topology id
     * @param topoCache the cache to read from
     */
    private static StormTopology readStormTopologyAsNimbus(String str, TopoCache topoCache) throws KeyNotFoundException, AuthorizationException, IOException {
        StormTopology topology = topoCache.readTopology(str, NIMBUS_SUBJECT);
        return topology;
    }

    /**
     * Converts scheduler assignments into list form: for each topology, every executor (as a
     * list) is mapped to its worker slot (as a list).
     *
     * @param map  scheduler assignments keyed by topology id
     * @param list topology ids that must appear in the result; ids without an assignment are
     *             mapped to {@code null}
     * @return a new map from topology id to its executor-to-slot mapping
     */
    private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(Map<String, SchedulerAssignment> map, List<String> list) {
        Map<String, Map<List<Long>, List<Object>>> result = new HashMap<>();
        map.forEach((topoId, schedulerAssignment) -> {
            Map<List<Long>, List<Object>> execToNodePort = new HashMap<>();
            for (Map.Entry<ExecutorDetails, WorkerSlot> placement : schedulerAssignment.getExecutorToSlot().entrySet()) {
                execToNodePort.put(placement.getKey().toList(), placement.getValue().toList());
            }
            result.put(topoId, execToNodePort);
        });
        // Make sure every requested topology has a key, even when it has no assignment.
        for (String topoId : list) {
            result.putIfAbsent(topoId, null);
        }
        return result;
    }

    /**
     * Counts the slots of a scheduler assignment.
     *
     * @param schedulerAssignment the assignment, possibly {@code null}
     * @return the number of slots, or 0 for a {@code null} assignment
     */
    private static int numUsedWorkers(SchedulerAssignment schedulerAssignment) {
        return schedulerAssignment != null ? schedulerAssignment.getSlots().size() : 0;
    }

    /**
     * Extracts the scheduled resources of every scheduler assignment.
     *
     * @param map scheduler assignments keyed by topology id
     * @return a new map from topology id to that assignment's slot-to-resources mapping
     */
    private static Map<String, Map<WorkerSlot, WorkerResources>> computeTopoToNodePortToResources(Map<String, SchedulerAssignment> map) {
        Map<String, Map<WorkerSlot, WorkerResources>> resourcesByTopo = new HashMap<>();
        map.forEach((topoId, schedulerAssignment) -> resourcesByTopo.put(topoId, schedulerAssignment.getScheduledResources()));
        return resourcesByTopo;
    }

    /**
     * Compares the assignments before and after a scheduling round, logs what changed, and
     * updates the per-scheduling histograms (added/removed executors and slots, plus the net
     * increases).
     *
     * <p>A topology present on only one side counts entirely as removed or added. A topology whose
     * assignment differs is treated as "remove and add": executors that stay on the same node/port
     * (and the slots they occupy) are excluded from both counts. When anything changed, the
     * cluster fragmentation and the per-node resource usage are logged as well, with a warning for
     * every node whose available memory or CPU is negative.
     *
     * @param map  the assignments before scheduling, keyed by topology id; must not be null
     * @param map2 the assignments after scheduling, keyed by topology id; must not be null
     * @return {@code true} if the two sets of assignments differ
     */
    private boolean auditAssignmentChanges(Map<String, Assignment> map, Map<String, Assignment> map2) {
        if (!$assertionsDisabled && (map == null || map2 == null)) {
            throw new AssertionError();
        }
        boolean anyChanged = map.isEmpty() ^ map2.isEmpty();
        long removedExecs = 0;
        long removedSlots = 0;
        long addedExecs = 0;
        long addedSlots = 0;
        if (map.isEmpty()) {
            // Nothing was assigned before: everything in the new assignments is an addition.
            for (Map.Entry<String, Assignment> entry : map2.entrySet()) {
                Map<List<Long>, NodeInfo> execToSlot = entry.getValue().get_executor_node_port();
                long slotCount = new HashSet<>(execToSlot.values()).size();
                LOG.info("Assigning {} to {} slots", entry.getKey(), slotCount);
                LOG.info("Assign executors: {}", execToSlot.keySet());
                addedSlots += slotCount;
                addedExecs += execToSlot.size();
            }
        } else if (map2.isEmpty()) {
            // Nothing is assigned afterwards: everything in the old assignments is a removal.
            for (Map.Entry<String, Assignment> entry : map.entrySet()) {
                Map<List<Long>, NodeInfo> execToSlot = entry.getValue().get_executor_node_port();
                long slotCount = new HashSet<>(execToSlot.values()).size();
                LOG.info("Removing {} from {} slots", entry.getKey(), slotCount);
                LOG.info("Remove executors: {}", execToSlot.keySet());
                removedSlots += slotCount;
                removedExecs += execToSlot.size();
            }
        } else {
            MapDifference<String, Assignment> difference = Maps.difference(map, map2);
            anyChanged = !difference.areEqual();
            if (anyChanged) {
                for (Map.Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
                    Map<List<Long>, NodeInfo> execToSlot = entry.getValue().get_executor_node_port();
                    long slotCount = new HashSet<>(execToSlot.values()).size();
                    LOG.info("Removing {} from {} slots", entry.getKey(), slotCount);
                    LOG.info("Remove executors: {}", execToSlot.keySet());
                    removedSlots += slotCount;
                    removedExecs += execToSlot.size();
                }
                for (Map.Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
                    Map<List<Long>, NodeInfo> execToSlot = entry.getValue().get_executor_node_port();
                    long slotCount = new HashSet<>(execToSlot.values()).size();
                    LOG.info("Assigning {} to {} slots", entry.getKey(), slotCount);
                    LOG.info("Assign executors: {}", execToSlot.keySet());
                    addedSlots += slotCount;
                    addedExecs += execToSlot.size();
                }
                for (Map.Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) {
                    Map<List<Long>, NodeInfo> newExecToSlot = entry.getValue().rightValue().get_executor_node_port();
                    // Kept in a variable: the distinct new slots are needed again for the added-slot count.
                    Set<NodeInfo> newSlots = new HashSet<>(newExecToSlot.values());
                    LOG.info("Reassigning {} to {} slots", entry.getKey(), newSlots.size());
                    LOG.info("Reassign executors: {}", newExecToSlot.keySet());
                    Map<List<Long>, NodeInfo> oldExecToSlot = entry.getValue().leftValue().get_executor_node_port();
                    long unchangedExecs = 0;
                    Set<NodeInfo> unchangedSlots = new HashSet<>(newExecToSlot.size());
                    for (Map.Entry<List<Long>, NodeInfo> placement : newExecToSlot.entrySet()) {
                        if (placement.getValue().equals(oldExecToSlot.get(placement.getKey()))) {
                            unchangedExecs++;
                            unchangedSlots.add(placement.getValue());
                        }
                    }
                    long unchangedSlotCount = unchangedSlots.size();
                    // Treat a reassignment as a removal followed by an addition, ignoring what stayed put.
                    removedSlots += new HashSet<>(oldExecToSlot.values()).size() - unchangedSlotCount;
                    removedExecs += oldExecToSlot.size() - unchangedExecs;
                    addedSlots += newSlots.size() - unchangedSlotCount;
                    addedExecs += newExecToSlot.size() - unchangedExecs;
                }
            }
            LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
        }
        this.numAddedExecPerScheduling.update(addedExecs);
        this.numAddedSlotPerScheduling.update(addedSlots);
        this.numRemovedExecPerScheduling.update(removedExecs);
        this.numRemovedSlotPerScheduling.update(removedSlots);
        this.numNetExecIncreasePerScheduling.update(addedExecs - removedExecs);
        this.numNetSlotIncreasePerScheduling.update(addedSlots - removedSlots);
        if (anyChanged) {
            LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
            this.nodeIdToResources.get().forEach((nodeId, resources) -> {
                double availableMem = resources.getAvailableMem();
                if (availableMem < 0.0d) {
                    // Two placeholders so the over-scheduled amount actually reaches the log.
                    LOG.warn("Memory over-scheduled on {}, available memory: {}", nodeId, availableMem);
                }
                double availableCpu = resources.getAvailableCpu();
                if (availableCpu < 0.0d) {
                    LOG.warn("CPU over-scheduled on {}, available CPU: {}", nodeId, availableCpu);
                }
                LOG.info("Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used CPU: {}, Available CPU: {}, fragmented: {}", new Object[]{nodeId, resources.getTotalMem(), resources.getUsedMem(), availableMem, resources.getTotalCpu(), resources.getUsedCpu(), availableCpu, isFragmented(resources)});
            });
        }
        return anyChanged;
    }

    /**
     * Determines which executors ended up on a slot whose set of executors changed.
     *
     * <p>Both inputs are inverted into "slot to sorted executor list" form, with the old slots
     * rendered as {@code [node, first port]} lists so they are comparable with the new ones. The
     * result is the concatenation of the executor lists of every new slot whose list differs from
     * the old one for the same slot.
     *
     * @param map  the existing executor-to-node assignment, or {@code null} for none
     * @param map2 the new executor-to-[node, port] assignment, or {@code null} for none
     * @return the executors of all changed slots in the new assignment
     */
    private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>, List<Object>> map2) {
        Map<NodeInfo, List<List<Long>>> oldBySlot = map == null ? new HashMap<>() : Utils.reverseMap(map);
        Map<List<Object>, List<List<Long>>> oldNormalized = new HashMap<>();
        for (Map.Entry<NodeInfo, List<List<Long>>> slotEntry : oldBySlot.entrySet()) {
            NodeInfo slot = slotEntry.getKey();
            List<Object> nodePort = new ArrayList<>(2);
            nodePort.add(slot.get_node());
            nodePort.add(slot.get_port_iterator().next());
            List<List<Long>> executors = new ArrayList<>(slotEntry.getValue());
            // Sort by the first element of each executor so list equality ignores insertion order.
            executors.sort(Comparator.comparing(executor -> executor.get(0)));
            oldNormalized.put(nodePort, executors);
        }

        Map<List<Object>, List<List<Long>>> newBySlot = map2 == null ? new HashMap<>() : Utils.reverseMap(map2);
        Map<List<Object>, List<List<Long>>> newNormalized = new HashMap<>();
        for (Map.Entry<List<Object>, List<List<Long>>> slotEntry : newBySlot.entrySet()) {
            List<List<Long>> executors = new ArrayList<>(slotEntry.getValue());
            executors.sort(Comparator.comparing(executor -> executor.get(0)));
            newNormalized.put(slotEntry.getKey(), executors);
        }

        Map<List<Object>, List<List<Long>>> changedSlots = mapDiff(oldNormalized, newNormalized);
        List<List<Long>> changed = new ArrayList<>();
        for (List<List<Long>> executors : changedSlots.values()) {
            changed.addAll(executors);
        }
        return changed;
    }

    /**
     * Finds the slots used by the second assignment that the first assignment does not use.
     *
     * @param assignment  the earlier assignment
     * @param assignment2 the later assignment
     * @return a new set of worker slots built from each added node and its first port
     */
    private static Set<WorkerSlot> newlyAddedSlots(Assignment assignment, Assignment assignment2) {
        Set<NodeInfo> previous = new HashSet<>(assignment.get_executor_node_port().values());
        Set<NodeInfo> added = new HashSet<>(assignment2.get_executor_node_port().values());
        added.removeAll(previous);
        Set<WorkerSlot> slots = new HashSet<>();
        for (NodeInfo node : added) {
            Number port = (Number) node.get_port_iterator().next();
            slots.add(new WorkerSlot(node.get_node(), port));
        }
        return slots;
    }

    /**
     * Builds a {@link SupervisorDetails} for every supervisor known to the cluster state, copying
     * the server port, hostname, scheduler meta and resources map, and passing {@code null} for
     * the fifth constructor argument.
     *
     * @return a new map keyed by supervisor id
     */
    private static Map<String, SupervisorDetails> basicSupervisorDetailsMap(IStormClusterState iStormClusterState) {
        Map<String, SupervisorDetails> detailsById = new HashMap<>();
        for (Map.Entry<String, SupervisorInfo> entry : iStormClusterState.allSupervisorInfo().entrySet()) {
            String supervisorId = entry.getKey();
            SupervisorInfo info = entry.getValue();
            SupervisorDetails details = new SupervisorDetails(supervisorId, info.get_server_port(), info.get_hostname(), info.get_scheduler_meta(), null, info.get_resources_map());
            detailsById.put(supervisorId, details);
        }
        return detailsById;
    }

    /**
     * Reports whether the cluster state can resolve a topology id for the given string.
     *
     * @param iStormClusterState the cluster state to query
     * @param str                the value passed to {@code getTopoId}
     * @return {@code true} if {@code getTopoId(str)} yields a present value
     */
    private static boolean isTopologyActive(IStormClusterState iStormClusterState, String str) {
        return iStormClusterState.getTopoId(str).isPresent();
    }

    /**
     * Reports whether the topology is active according to {@code isTopologyActive}, or is listed
     * among the cluster state's active storms. The second lookup is skipped when the first one
     * already succeeds.
     */
    private static boolean isTopologyActiveOrActivating(IStormClusterState iStormClusterState, String str) {
        if (isTopologyActive(iStormClusterState, str)) {
            return true;
        }
        return iStormClusterState.activeStorms().contains(str);
    }

    /**
     * Looks for an active topology whose configuration references the given blob key in its
     * {@code "topology.blobstore.map"} entry.
     *
     * <p>The search is best-effort: a topology whose configuration cannot be read is skipped, and
     * the failure is logged at debug level instead of being dropped silently.
     *
     * @param iStormClusterState the cluster state providing the active topology ids
     * @param topoCache          the cache used to read each topology's configuration as Nimbus
     * @param str                the blob key to look for
     * @return the id of the first topology found using the blob, or {@code null} if none does
     */
    private static String topologyUsingThisBlob(IStormClusterState iStormClusterState, TopoCache topoCache, String str) {
        for (String topoId : iStormClusterState.activeStorms()) {
            try {
                Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
                if (topoConf == null) {
                    continue;
                }
                Map blobstoreMap = (Map) topoConf.get("topology.blobstore.map");
                if (blobstoreMap != null && blobstoreMap.containsKey(str)) {
                    return topoId;
                }
            } catch (KeyNotFoundException | AuthorizationException | IOException e) {
                // Deliberately non-fatal: keep scanning the remaining topologies.
                LOG.debug("Skipping topology {} while looking for users of blob {}: could not read its configuration", topoId, str, e);
            }
        }
        return null;
    }

    /**
     * Reads a topology configuration as Nimbus, translating a missing key into a
     * {@code WrappedNotAliveException} for that topology id.
     *
     * @param str       the topology id
     * @param topoCache the cache to read from
     * @throws NullPointerException if the key is missing and {@code str} is {@code null}
     */
    private static Map<String, Object> tryReadTopoConf(String str, TopoCache topoCache) throws NotAliveException, AuthorizationException, IOException {
        try {
            return readTopoConfAsNimbus(str, topoCache);
        } catch (KeyNotFoundException missing) {
            if (str != null) {
                throw new WrappedNotAliveException(str);
            }
            throw new NullPointerException();
        }
    }

    /**
     * Rotates the {@code topologyCleanupDetected} map and records the rotation time, but only if
     * more than {@code j} milliseconds have passed since the last recorded rotation.
     *
     * @param j the minimum interval between rotations, in milliseconds
     */
    private static void rotateTopologyCleanupMap(long j) {
        boolean due = Time.currentTimeMillis() - topologyCleanupRotationTime > j;
        if (!due) {
            return;
        }
        topologyCleanupDetected.rotate();
        topologyCleanupRotationTime = Time.currentTimeMillis();
    }

    /**
     * Returns the time recorded in {@code topologyCleanupDetected} for the given topology id. On
     * the first request for an id, the current time is recorded and returned.
     *
     * @param str the topology id
     * @return the recorded time in milliseconds
     */
    private static long getTopologyCleanupDetectedTime(String str) {
        Long recorded = (Long) topologyCleanupDetected.get(str);
        if (recorded != null) {
            return recorded.longValue();
        }
        Long firstSeen = Long.valueOf(Time.currentTimeMillis());
        topologyCleanupDetected.put(str, firstSeen);
        return firstSeen.longValue();
    }

    /**
     * Selects the topology ids whose cleanup has been pending for at least the delay configured
     * under {@code DaemonConfig.NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MS} (default 300000 ms).
     *
     * <p>Ids seen for the first time get the current time recorded as their detection time. After
     * the scan, the detection map is rotated if the delay has elapsed since its last rotation.
     *
     * @param set the candidate topology ids
     * @param map the configuration
     * @return a new set with the expired ids
     */
    static Set<String> getExpiredTopologyIds(Set<String> set, Map<String, Object> map) {
        Set<String> expired = new HashSet<>();
        long deletionDelayMs = ObjectReader.getInt(map.get(DaemonConfig.NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MS), 300000).intValue();
        for (String topoId : set) {
            long now = Time.currentTimeMillis();
            long detectedAt = getTopologyCleanupDetectedTime(topoId);
            // Clamp at zero so a detection time ahead of "now" never yields a negative age.
            long pendingFor = Math.max(0L, now - detectedAt);
            if (pendingFor < deletionDelayMs) {
                continue;
            }
            expired.add(topoId);
        }
        rotateTopologyCleanupMap(deletionDelayMs);
        return expired;
    }

    /**
     * Computes the topology ids that are due for cleanup.
     *
     * <p>Candidates are gathered from heartbeats, error records, the blob store, backpressure records
     * and private worker keys. Only those reported as expired by {@code getExpiredTopologyIds} are
     * kept, and ids of currently active topologies are removed from the result.
     *
     * @param iStormClusterState cluster state queried for the candidate and active ids
     * @param blobStore blob store queried for stored topology ids
     * @param map configuration passed on to the expiry check
     * @return the expired, non-active topology ids
     */
    @VisibleForTesting
    public static Set<String> topoIdsToClean(IStormClusterState iStormClusterState, BlobStore blobStore, Map<String, Object> map) {
        Set<String> candidates = new HashSet<>();
        candidates.addAll((Collection<String>) Utils.OR(iStormClusterState.heartbeatStorms(), EMPTY_STRING_LIST));
        candidates.addAll((Collection<String>) Utils.OR(iStormClusterState.errorTopologies(), EMPTY_STRING_LIST));
        candidates.addAll((Collection<String>) Utils.OR(blobStore.storedTopoIds(), EMPTY_STRING_SET));
        candidates.addAll((Collection<String>) Utils.OR(iStormClusterState.backpressureTopologies(), EMPTY_STRING_LIST));
        candidates.addAll((Collection<String>) Utils.OR(iStormClusterState.idsOfTopologiesWithPrivateWorkerKeys(), EMPTY_STRING_SET));

        Set<String> toClean = getExpiredTopologyIds(candidates, map);
        toClean.removeAll((Collection<String>) Utils.OR(iStormClusterState.activeStorms(), EMPTY_STRING_LIST));
        return toClean;
    }

    /**
     * Returns the upper-cased name of the topology status held by a {@code StormBase}.
     *
     * @param stormBase topology base record, may be {@code null}
     * @return the status name in upper case, or {@code null} when the record or its status is absent
     */
    private static String extractStatusStr(StormBase stormBase) {
        if (stormBase == null) {
            return null;
        }
        TopologyStatus status = stormBase.get_status();
        return status == null ? null : status.name().toUpperCase();
    }

    /**
     * Produces a deep copy of the topology in which every component's JSON configuration carries
     * a {@code topology.tasks} entry computed by {@code ServerUtils.getComponentParallelism}.
     *
     * @param map configuration handed to the parallelism computation
     * @param stormTopology topology to normalize; it is not modified
     * @return the adjusted copy
     * @throws InvalidTopologyException declared by the signature
     */
    private static StormTopology normalizeTopology(Map<String, Object> map, StormTopology stormTopology) throws InvalidTopologyException {
        StormTopology normalized = stormTopology.deepCopy();
        for (Object component : StormCommon.allComponents(normalized).values()) {
            Map componentConf = StormCommon.componentConf(component);
            int parallelism = ServerUtils.getComponentParallelism(map, component);
            componentConf.put("topology.tasks", Integer.valueOf(parallelism));
            String json = JSONValue.toJSONString(componentConf);
            StormCommon.getComponentCommon(component).set_json_conf(json);
        }
        return normalized;
    }

    /**
     * Adds every element of {@code list} to {@code set}; a {@code null} list is ignored.
     *
     * @param set accumulator that receives the decorator names
     * @param list decorator names to add, may be {@code null}
     */
    private static void addToDecorators(Set<String> set, List<String> list) {
        if (list == null) {
            return;
        }
        set.addAll(list);
    }

    /**
     * Folds a list of serializer registrations into {@code map}.
     *
     * <p>An element that is itself a {@code Map} contributes all of its entries; any other element is
     * cast to {@code String} and stored as a key with a {@code null} value. A {@code null} list is
     * ignored.
     *
     * @param map accumulator that receives the registrations
     * @param list registrations to add, may be {@code null}
     */
    private static void addToSerializers(Map<String, String> map, List<Object> list) {
        if (list == null) {
            return;
        }
        for (Object registration : list) {
            if (!(registration instanceof Map)) {
                map.put((String) registration, null);
                continue;
            }
            map.putAll((Map) registration);
        }
    }

    /**
     * Builds the normalized topology configuration from the cluster and topology configurations.
     *
     * <p>The result starts as a shallow copy of {@code map2} and is then adjusted:
     * <ul>
     *   <li>kryo decorators and registrations are collected from every component configuration and
     *       from the topology configuration (falling back to the cluster configuration);</li>
     *   <li>acker/eventlogger executor counts and max task parallelism are taken from the merged
     *       configuration;</li>
     *   <li>netty authentication is enforced when either configuration enables it;</li>
     *   <li>metrics reporters default to {@code storm.metrics.reporters}, and the system reporters
     *       from the cluster configuration are appended;</li>
     *   <li>{@code topology.worker.timeout.secs} is capped at {@code worker.max.timeout.secs}.</li>
     * </ul>
     *
     * <p>The reporter list is copied before the system reporters are appended. The list may be the
     * same instance that the cluster or submitted configuration holds (merge and copy are shallow),
     * and appending in place would grow that shared list on every call.
     *
     * @param map cluster configuration
     * @param map2 topology configuration as submitted
     * @param stormTopology topology whose component configurations are inspected
     * @return a new map holding the normalized topology configuration
     */
    private static Map<String, Object> normalizeConf(Map<String, Object> map, Map<String, Object> map2, StormTopology stormTopology) {
        Set<String> decorators = new HashSet<>();
        Map<String, String> serializers = new HashMap<>();
        for (Object component : StormCommon.allComponents(stormTopology).values()) {
            Map componentConf = StormCommon.componentConf(component);
            addToDecorators(decorators, (List<String>) componentConf.get("topology.kryo.decorators"));
            addToSerializers(serializers, (List<Object>) componentConf.get("topology.kryo.register"));
        }
        addToDecorators(decorators, (List<String>) map2.getOrDefault("topology.kryo.decorators", map.get("topology.kryo.decorators")));
        addToSerializers(serializers, (List<Object>) map2.getOrDefault("topology.kryo.register", map.get("topology.kryo.register")));

        Map<String, Object> merged = Utils.merge(map, map2);
        Map<String, Object> normalized = new HashMap<>(map2);
        normalized.put("topology.kryo.register", serializers);
        normalized.put("topology.kryo.decorators", new ArrayList<>(decorators));
        normalized.put("topology.acker.executors", merged.get("topology.acker.executors"));
        normalized.put("topology.eventlogger.executors", merged.get("topology.eventlogger.executors"));
        normalized.put("topology.max.task.parallelism", merged.get("topology.max.task.parallelism"));

        // The topology may turn authentication on but can never turn off what the cluster enforces.
        boolean enforceNettyAuth = (map2.containsKey("storm.messaging.netty.authentication")
                && ((Boolean) map2.get("storm.messaging.netty.authentication")).booleanValue())
                || ((Boolean) map.get("storm.messaging.netty.authentication")).booleanValue();
        LOG.debug("For netty authentication, topo conf is: {}, cluster conf is: {}, Enforce netty auth: {}", new Object[]{map2.get("storm.messaging.netty.authentication"), map.get("storm.messaging.netty.authentication"), Boolean.valueOf(enforceNettyAuth)});
        normalized.put("storm.messaging.netty.authentication", Boolean.valueOf(enforceNettyAuth));

        if (!merged.containsKey("topology.metrics.reporters") && merged.containsKey("storm.metrics.reporters")) {
            normalized.put("topology.metrics.reporters", merged.get("storm.metrics.reporters"));
        }
        if (map.containsKey("storm.topology.metrics.system.reporters")) {
            // Work on a private copy so the lists owned by the input configurations stay untouched.
            List<Object> reporters = new ArrayList<>();
            Object existingReporters = normalized.get("topology.metrics.reporters");
            if (existingReporters != null) {
                reporters.addAll((List<?>) existingReporters);
            }
            reporters.addAll((List<?>) map.get("storm.topology.metrics.system.reporters"));
            normalized.put("topology.metrics.reporters", reporters);
        }

        normalized.put("storm.cgroup.hierarchy.dir", map.get("storm.cgroup.hierarchy.dir"));
        normalized.put("worker.metrics", map.get("worker.metrics"));

        if (merged.containsKey("topology.worker.timeout.secs")) {
            int requestedTimeout = ObjectReader.getInt(merged.get("topology.worker.timeout.secs")).intValue();
            int maxTimeout = ObjectReader.getInt(merged.get("worker.max.timeout.secs")).intValue();
            if (requestedTimeout > maxTimeout) {
                normalized.put("topology.worker.timeout.secs", Integer.valueOf(maxTimeout));
                LOG.warn("Topology {} topology.worker.timeout.secs is too large. Reducing from {} to {}", new Object[]{(String) merged.get("storm.id"), Integer.valueOf(requestedTimeout), Integer.valueOf(maxTimeout)});
            }
        }
        return normalized;
    }

    /**
     * Deletes a blob as the Nimbus subject on a best-effort basis.
     *
     * <p>Any failure is logged and otherwise ignored. The log entry names the blob key and carries
     * the exception as its throwable, so the stack trace is recorded.
     *
     * @param blobStore blob store to delete from
     * @param str key of the blob to delete
     * @param iStormClusterState not used by this method
     */
    private static void rmBlobKey(BlobStore blobStore, String str, IStormClusterState iStormClusterState) {
        try {
            blobStore.deleteBlob(str, NIMBUS_SUBJECT);
        } catch (Exception e) {
            // Deliberately not rethrown: deletion failure must not abort the caller.
            LOG.info("Exception deleting blob {}", str, e);
        }
    }

    /**
     * Deletes regular files in the inbox directory whose last-modified time plus the given age is
     * not later than the current time.
     *
     * <p>When the directory cannot be listed ({@code File.listFiles} returns {@code null}, for
     * example because it does not exist) a warning is logged and nothing is deleted.
     *
     * @param str path of the inbox directory
     * @param i age in seconds after which a file is deleted
     */
    @VisibleForTesting
    public static void cleanInbox(String str, int i) {
        long now = Time.currentTimeMillis();
        long maxAgeMs = Time.secsToMillis(i);
        File[] expired = new File(str).listFiles(candidate -> {
            return candidate.isFile() && candidate.lastModified() + maxAgeMs <= now;
        });
        if (expired == null) {
            // listFiles signals "not a directory" or an I/O error with null rather than an exception.
            LOG.warn("Cleaning inbox ... unable to list files in: {}", str);
            return;
        }
        for (File file : expired) {
            if (file.delete()) {
                LOG.info("Cleaning inbox ... deleted: {}", file.getName());
            } else {
                LOG.error("Cleaning inbox ... error deleting: {}", file.getName());
            }
        }
    }

    /**
     * Builds an {@code ExecutorInfo} from the first two elements of the list, each narrowed to int.
     *
     * @param list list with at least two elements
     * @return the resulting {@code ExecutorInfo}
     */
    private static ExecutorInfo toExecInfo(List<Long> list) {
        int first = list.get(0).intValue();
        int second = list.get(1).intValue();
        return new ExecutorInfo(first, second);
    }

    /**
     * Validates a topology name through {@code Utils.validateTopologyName}, converting its
     * {@link IllegalArgumentException} into an {@code InvalidTopologyException} with the same message.
     *
     * @param str topology name to validate
     * @throws InvalidTopologyException when the name is rejected
     */
    private static void validateTopologyName(String str) throws InvalidTopologyException {
        try {
            Utils.validateTopologyName(str);
        } catch (IllegalArgumentException rejected) {
            String reason = rejected.getMessage();
            throw new WrappedInvalidTopologyException(reason);
        }
    }

    /**
     * Reads a topology as Nimbus, translating a missing key into "not alive".
     *
     * @param str topology id
     * @param topoCache cache the topology is read through
     * @return the topology returned by {@code readStormTopologyAsNimbus}
     * @throws NotAliveException (as {@code WrappedNotAliveException}) when the key is not found
     */
    private static StormTopology tryReadTopology(String str, TopoCache topoCache) throws NotAliveException, AuthorizationException, IOException {
        StormTopology topology;
        try {
            topology = readStormTopologyAsNimbus(str, topoCache);
        } catch (KeyNotFoundException notFound) {
            throw new WrappedNotAliveException(str);
        }
        return topology;
    }

    /**
     * Rejects a topology that asks for more workers or executors than the cluster allows.
     *
     * <p>The worker limit ({@code NIMBUS_SLOTS_PER_TOPOLOGY}) is only checked when
     * {@code ServerUtils.isRas} is false for the cluster configuration. The executor limit
     * ({@code NIMBUS_EXECUTORS_PER_TOPOLOGY}) is always checked. A limit that is not configured is
     * not enforced.
     *
     * @param map topology configuration ({@code topology.workers} is read from it, default 1)
     * @param map2 cluster configuration holding the limits
     * @param stormTopology topology whose components' start executors are summed
     * @throws InvalidTopologyException when a configured limit is exceeded
     */
    private static void validateTopologySize(Map<String, Object> map, Map<String, Object> map2, StormTopology stormTopology) throws InvalidTopologyException {
        boolean rasEnabled = ServerUtils.isRas(map2);
        if (!rasEnabled) {
            int requestedWorkers = ObjectReader.getInt(map.get("topology.workers"), 1).intValue();
            Integer allowedWorkers = ObjectReader.getInt(map2.get(DaemonConfig.NIMBUS_SLOTS_PER_TOPOLOGY), (Integer) null);
            if (allowedWorkers != null && requestedWorkers > allowedWorkers.intValue()) {
                throw new WrappedInvalidTopologyException("Failed to submit topology. Topology requests more than " + allowedWorkers + " workers.");
            }
        }

        int requestedExecutors = 0;
        for (Object component : StormCommon.allComponents(stormTopology).values()) {
            requestedExecutors += StormCommon.numStartExecutors(component);
        }
        Integer allowedExecutors = ObjectReader.getInt(map2.get(DaemonConfig.NIMBUS_EXECUTORS_PER_TOPOLOGY), (Integer) null);
        if (allowedExecutors != null && requestedExecutors > allowedExecutors.intValue()) {
            throw new WrappedInvalidTopologyException("Failed to submit topology. Topology requests more than " + allowedExecutors + " executors.");
        }
    }

    /**
     * Sets or clears the reset epoch of a log level from its reset timeout.
     *
     * <p>A positive timeout sets the epoch to the current time plus the timeout converted to
     * milliseconds; any other value unsets the epoch.
     *
     * @param logLevel log level to update in place
     */
    private static void setLoggerTimeouts(LogLevel logLevel) {
        int timeoutSecs = logLevel.get_reset_log_level_timeout_secs();
        if (timeoutSecs <= 0) {
            logLevel.unset_reset_log_level_timeout_epoch();
            return;
        }
        logLevel.set_reset_log_level_timeout_epoch(Time.currentTimeMillis() + Time.secsToMillis(timeoutSecs));
    }

    /**
     * Lists the topologies that have at least one executor assigned to the given node.
     *
     * @param map assignments keyed by topology id
     * @param str node id compared against each {@code NodeInfo}'s node
     * @return the matching topology ids, each listed once, in no particular order
     */
    @VisibleForTesting
    public static List<String> topologiesOnSupervisor(Map<String, Assignment> map, String str) {
        Set<String> topoIds = new HashSet<>();
        for (Map.Entry<String, Assignment> entry : map.entrySet()) {
            for (NodeInfo nodeInfo : entry.getValue().get_executor_node_port().values()) {
                if (str.equals(nodeInfo.get_node())) {
                    topoIds.add(entry.getKey());
                    // One hit is enough for this topology.
                    break;
                }
            }
        }
        return new ArrayList<>(topoIds);
    }

    /**
     * Creates a {@code ClusterInfo} carrying the value of {@code Time.currentTimeSecs()}.
     *
     * @return the new {@code ClusterInfo}
     */
    private static IClusterMetricsConsumer.ClusterInfo mkClusterInfo() {
        IClusterMetricsConsumer.ClusterInfo info = new IClusterMetricsConsumer.ClusterInfo(Time.currentTimeSecs());
        return info;
    }

    /**
     * Derives cluster-wide data points from a cluster summary.
     *
     * <p>The list holds, in order: {@code supervisors}, {@code topologies}, {@code slotsTotal},
     * {@code slotsUsed}, {@code slotsFree}, {@code executorsTotal} and {@code tasksTotal}.
     *
     * @param clusterSummary summary to aggregate
     * @return the data points in the order given above
     */
    private static List<DataPoint> extractClusterMetrics(ClusterSummary clusterSummary) {
        List<DataPoint> points = new ArrayList<>();
        points.add(new DataPoint("supervisors", Integer.valueOf(clusterSummary.get_supervisors_size())));
        points.add(new DataPoint("topologies", Integer.valueOf(clusterSummary.get_topologies_size())));

        int slotsTotal = 0;
        int slotsUsed = 0;
        for (SupervisorSummary supervisor : clusterSummary.get_supervisors()) {
            slotsUsed += supervisor.get_num_used_workers();
            slotsTotal += supervisor.get_num_workers();
        }
        points.add(new DataPoint("slotsTotal", Integer.valueOf(slotsTotal)));
        points.add(new DataPoint("slotsUsed", Integer.valueOf(slotsUsed)));
        points.add(new DataPoint("slotsFree", Integer.valueOf(slotsTotal - slotsUsed)));

        int executorsTotal = 0;
        int tasksTotal = 0;
        for (TopologySummary topology : clusterSummary.get_topologies()) {
            executorsTotal += topology.get_num_executors();
            tasksTotal += topology.get_num_tasks();
        }
        points.add(new DataPoint("executorsTotal", Integer.valueOf(executorsTotal)));
        points.add(new DataPoint("tasksTotal", Integer.valueOf(tasksTotal)));
        return points;
    }

    /**
     * Derives per-supervisor data points from a cluster summary.
     *
     * <p>Each supervisor contributes {@code slotsTotal}, {@code slotsUsed}, {@code totalMem},
     * {@code totalCpu}, {@code usedMem} and {@code usedCpu}, keyed by a {@code SupervisorInfo} built
     * from its host, id and {@code Time.currentTimeSecs()}.
     *
     * @param clusterSummary summary whose supervisors are reported
     * @return data points per supervisor
     */
    private static Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> extractSupervisorMetrics(ClusterSummary clusterSummary) {
        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> bySupervisor = new HashMap<>();
        for (SupervisorSummary supervisor : clusterSummary.get_supervisors()) {
            List<DataPoint> points = new ArrayList<>();
            points.add(new DataPoint("slotsTotal", Integer.valueOf(supervisor.get_num_workers())));
            points.add(new DataPoint("slotsUsed", Integer.valueOf(supervisor.get_num_used_workers())));
            points.add(new DataPoint("totalMem", supervisor.get_total_resources().get("memory.mb")));
            points.add(new DataPoint("totalCpu", supervisor.get_total_resources().get("cpu.pcore.percent")));
            points.add(new DataPoint("usedMem", Double.valueOf(supervisor.get_used_mem())));
            points.add(new DataPoint("usedCpu", Double.valueOf(supervisor.get_used_cpu())));

            IClusterMetricsConsumer.SupervisorInfo key = new IClusterMetricsConsumer.SupervisorInfo(supervisor.get_host(), supervisor.get_supervisor_id(), Time.currentTimeSecs());
            bySupervisor.put(key, points);
        }
        return bySupervisor;
    }

    /**
     * Stores a {@code NormalizedResourceRequest} built from the configuration under {@code str}
     * unless the map already holds a non-null value for that key.
     *
     * @param map resource requests keyed by component id
     * @param str key to fill in
     * @param map2 configuration passed to the {@code NormalizedResourceRequest} constructor
     */
    private static void setResourcesDefaultIfNotSet(Map<String, NormalizedResourceRequest> map, String str, Map<String, Object> map2) {
        if (map.get(str) != null) {
            return;
        }
        NormalizedResourceRequest defaults = new NormalizedResourceRequest(map2, str);
        map.put(str, defaults);
    }

    /**
     * Checks that the configured {@code nimbus.thrift.port} can be bound.
     *
     * <p>A server socket is opened on the port and closed again straight away. When binding fails
     * with a {@link BindException}, an error is logged and the JVM exits with status 0.
     *
     * @param map configuration holding {@code nimbus.thrift.port}
     * @throws IOException for any other I/O failure while opening or closing the socket
     */
    private static void validatePortAvailable(Map<String, Object> map) throws IOException {
        int port = ObjectReader.getInt(map.get("nimbus.thrift.port")).intValue();
        try (ServerSocket probe = new ServerSocket(port)) {
            // Binding succeeded; the socket is released as soon as this block ends.
        } catch (BindException e) {
            LOG.error("{} is not available. Check if another process is already listening on {}", Integer.valueOf(port), Integer.valueOf(port));
            System.exit(0);
        }
    }

    /**
     * Starts this Nimbus instance.
     *
     * <p>In order, it prepares the validator, registers this host in cluster state, joins the leader
     * lock queue, starts blob synchronisation, prepares every cluster metrics consumer executor,
     * schedules the recurring timer tasks and registers the resource and scheduling gauges.
     *
     * <p>An exception for which {@code Utils.exceptionCauseIsInstanceOf} reports an
     * {@link InterruptedException} or {@link InterruptedIOException} is rethrown. Any other exception
     * is logged and followed by {@code Utils.exitProcess(13, ...)}.
     *
     * @throws Exception for the interruption cases described above
     */
    @VisibleForTesting
    public void launchServer() throws Exception {
        try {
            IStormClusterState iStormClusterState = this.stormClusterState;
            NimbusInfo nimbusInfo = this.nimbusHostPortInfo;
            // The configuration is logged with passwords masked.
            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(this.conf));
            this.validator.prepare(this.conf);
            // Register this host in cluster state before joining the leader lock queue.
            iStormClusterState.addNimbusHost(nimbusInfo.getHost(), new NimbusSummary(nimbusInfo.getHost(), nimbusInfo.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
            this.leaderElector.addToLeaderLockQueue();
            this.blobStore.startSyncBlobs();
            Iterator<ClusterMetricsConsumerExecutor> it = this.clusterConsumerExceutors.iterator();
            while (it.hasNext()) {
                it.next().prepare();
            }
            // NOTE(review): the two numeric arguments of scheduleRecurring are presumably
            // (initial delay, period) in seconds; confirm against StormTimer.
            // Leadership watcher: when this instance is leader now but was not on the previous run,
            // apply GAIN_LEADERSHIP to every active topology and activate the cluster metric set.
            this.timer.scheduleRecurring(3, 5, () -> {
                try {
                    boolean isLeader = isLeader();
                    if (isLeader && !this.wasLeader) {
                        Iterator it2 = iStormClusterState.activeStorms().iterator();
                        while (it2.hasNext()) {
                            transition((String) it2.next(), TopologyActions.GAIN_LEADERSHIP, null);
                        }
                        this.clusterMetricSet.setActive(true);
                    }
                    this.wasLeader = isLeader;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            boolean booleanValue = ((Boolean) this.conf.getOrDefault(ServerConfigUtils.NIMBUS_DO_NOT_REASSIGN, false)).booleanValue();
            // Monitor task: make assignments unless NIMBUS_DO_NOT_REASSIGN is set; doCleanup() runs
            // either way.
            this.timer.scheduleRecurring(0, ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)).intValue(), () -> {
                if (!booleanValue) {
                    try {
                        mkAssignments();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                doCleanup();
            });
            int intValue = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_INBOX_JAR_EXPIRATION_SECS)).intValue();
            // Inbox cleaner: removes inbox files older than NIMBUS_INBOX_JAR_EXPIRATION_SECS.
            this.timer.scheduleRecurring(0, ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_CLEANUP_INBOX_FREQ_SECS)).intValue(), () -> {
                try {
                    cleanInbox(getInbox(), intValue);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            // Topology history cleanup is only scheduled when LOGVIEWER_CLEANUP_INTERVAL_SECS is set.
            Integer num = ObjectReader.getInt(this.conf.get(DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS), (Integer) null);
            if (num != null) {
                int intValue2 = ObjectReader.getInt(this.conf.get(DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS)).intValue();
                this.timer.scheduleRecurring(0, num.intValue(), () -> {
                    try {
                        cleanTopologyHistory(intValue2);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            // Credential renewal at NIMBUS_CREDENTIAL_RENEW_FREQ_SECS.
            this.timer.scheduleRecurring(0, ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_CREDENTIAL_RENEW_FREQ_SECS)).intValue(), () -> {
                try {
                    renewCredentials();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            // Blob update-time validation, at five times the localizer blob update interval.
            this.timer.scheduleRecurring(30, ServerConfigUtils.getLocalizerUpdateBlobInterval(this.conf) * 5, () -> {
                try {
                    this.blobStore.validateBlobUpdateTime();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            // Gauges summing per-supervisor resources; the "non-negative" ones clamp each
            // supervisor's available amount at zero before summing.
            this.metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> {
                return Double.valueOf(this.nodeIdToResources.get().values().parallelStream().mapToDouble(supervisorResources -> {
                    return Math.max(supervisorResources.getAvailableMem(), 0.0d);
                }).sum());
            });
            this.metricsRegistry.registerGauge("nimbus:available-cpu-non-negative", () -> {
                return Double.valueOf(this.nodeIdToResources.get().values().parallelStream().mapToDouble(supervisorResources -> {
                    return Math.max(supervisorResources.getAvailableCpu(), 0.0d);
                }).sum());
            });
            this.metricsRegistry.registerGauge("nimbus:total-memory", () -> {
                return Double.valueOf(this.nodeIdToResources.get().values().parallelStream().mapToDouble((v0) -> {
                    return v0.getTotalMem();
                }).sum());
            });
            this.metricsRegistry.registerGauge("nimbus:total-cpu", () -> {
                return Double.valueOf(this.nodeIdToResources.get().values().parallelStream().mapToDouble((v0) -> {
                    return v0.getTotalCpu();
                }).sum());
            });
            // Reports, in milliseconds, the larger of longestSchedulingTime and the time elapsed
            // since schedulingStartTimeNs; when no start time is set only longestSchedulingTime counts.
            this.metricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
                Long valueOf = Long.valueOf(Time.nanoTime());
                Long l = this.schedulingStartTimeNs.get();
                return Long.valueOf(TimeUnit.NANOSECONDS.toMillis(l == null ? this.longestSchedulingTime.get() : Math.max(valueOf.longValue() - l.longValue(), this.longestSchedulingTime.get())));
            });
            // Marks one launch on the "nimbus:num-launched" meter.
            this.metricsRegistry.registerMeter("nimbus:num-launched").mark();
            // Cluster metrics are only sent to the consumer executors while this instance is leader.
            this.timer.scheduleRecurring(0, ObjectReader.getInt(this.conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)).intValue(), () -> {
                try {
                    if (isLeader()) {
                        sendClusterMetricsToExecutors();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            // The cluster metric set itself is scheduled as a recurring timer task.
            this.timer.scheduleRecurring(5, 5, this.clusterMetricSet);
        } catch (Exception e) {
            // Interruptions are passed on to the caller; anything else ends the process.
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }
            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }

    /**
     * Validates the environment, creates and launches a Nimbus instance, and serves it over Thrift.
     *
     * <p>Validation covers distributed mode, availability of the thrift port and the image in the
     * daemon configuration. A shutdown hook stops the metrics reporters, the Nimbus instance and the
     * Thrift server. The worker token manager is initialised only when
     * {@code ClientAuthUtils.areWorkerTokensEnabledServer} says worker tokens are enabled.
     *
     * @param map daemon configuration
     * @param iNimbus the {@code INimbus} implementation handed to the Nimbus constructor
     * @return the launched Nimbus instance, once {@code serve()} has returned
     * @throws Exception when validation or start-up fails
     */
    private static Nimbus launchServer(Map<String, Object> map, INimbus iNimbus) throws Exception {
        StormCommon.validateDistributedMode(map);
        validatePortAvailable(map);
        OciUtils.validateImageInDaemonConf(map);

        StormMetricsRegistry registry = new StormMetricsRegistry();
        Nimbus server = new Nimbus(map, iNimbus, registry);
        server.launchServer();

        ThriftServer thrift = new ThriftServer(map, new Nimbus.Processor(server), ThriftConnectionType.NIMBUS);
        registry.startMetricsReporters(map);
        Utils.addShutdownHookWithDelayedForceKill(() -> {
            registry.stopMetricsReporters();
            server.shutdown();
            thrift.stop();
        }, 10);

        boolean workerTokensEnabled = ClientAuthUtils.areWorkerTokensEnabledServer(thrift, map);
        if (workerTokensEnabled) {
            server.initWorkerTokenManager();
        }
        LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
        thrift.serve();
        return server;
    }

    /**
     * Reads the Storm configuration merged with {@code storm-cluster-auth.yaml}, verifies the
     * ZooKeeper ACLs when the fixup or check flag is set, and launches the server.
     *
     * @param iNimbus the {@code INimbus} implementation to launch with
     * @return the launched Nimbus instance
     * @throws Exception when configuration loading, ACL verification or launch fails
     */
    public static Nimbus launch(INimbus iNimbus) throws Exception {
        Map conf = Utils.merge(ConfigUtils.readStormConfig(), ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
        boolean fixupAcls = ((Boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP)).booleanValue();
        // The check flag is only consulted when fixup is off.
        boolean verifyAcls = fixupAcls || ((Boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK)).booleanValue();
        if (verifyAcls) {
            AclEnforcement.verifyAcls(conf, fixupAcls);
        }
        return launchServer(conf, iNimbus);
    }

    /**
     * Process entry point: installs the default uncaught exception handler and launches Nimbus
     * with a {@code StandaloneINimbus}.
     *
     * @param strArr command line arguments; not used
     * @throws Exception when the launch fails
     */
    public static void main(String[] strArr) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        StandaloneINimbus standalone = new StandaloneINimbus();
        launch(standalone);
    }

    /**
     * Creates a ZooKeeper client from the {@code storm.zookeeper.*} settings.
     *
     * @param map configuration holding the servers, port and root
     * @return the client, or {@code null} when the servers or the port are not configured
     */
    private static CuratorFramework makeZKClient(Map<String, Object> map) {
        List servers = (List) map.get("storm.zookeeper.servers");
        Object port = map.get("storm.zookeeper.port");
        String root = (String) map.get("storm.zookeeper.root");
        if (servers == null || port == null) {
            return null;
        }
        return ClientZookeeper.mkClient(map, servers, port, root, new DefaultWatcherCallBack(), map, DaemonType.NIMBUS);
    }

    /**
     * Creates the cluster state through {@code ClusterUtils.mkStormClusterState}, using a
     * {@code ClusterStateContext} for the NIMBUS daemon type.
     *
     * @param map daemon configuration
     * @return the cluster state
     * @throws Exception when the cluster state cannot be created
     */
    private static IStormClusterState makeStormClusterState(Map<String, Object> map) throws Exception {
        ClusterStateContext context = new ClusterStateContext(DaemonType.NIMBUS, map);
        return ClusterUtils.mkStormClusterState(map, context);
    }

    /**
     * Converts the first two elements of a list of longs into a new two-element list of ints.
     *
     * @param list list with at least two elements
     * @return a new list holding the first two values narrowed to {@code Integer}
     */
    private static List<Integer> asIntExec(List<Long> list) {
        List<Integer> converted = new ArrayList<>(2);
        for (int idx = 0; idx < 2; idx++) {
            converted.add(Integer.valueOf(list.get(idx).intValue()));
        }
        return converted;
    }

    /**
     * Determines which nodes are affected by the difference between two assignments.
     *
     * <p>When either assignment is {@code null}, every node known to the other one is returned.
     * Otherwise each executor of {@code assignment2} is compared with the same executor in
     * {@code assignment}: an executor that is missing there contributes its node, and one whose
     * {@code NodeInfo} differs contributes both nodes.
     *
     * @param assignment first assignment, may be {@code null}
     * @param assignment2 second assignment, may be {@code null}
     * @return affected node ids mapped to the host recorded for them in the assignments
     */
    private static Map<String, String> assignmentChangedNodes(Assignment assignment, Assignment assignment2) {
        Map<String, String> nodeToHost = new HashMap<>();
        if (assignment != null) {
            nodeToHost.putAll(assignment.get_node_host());
        }
        if (assignment2 != null) {
            nodeToHost.putAll(assignment2.get_node_host());
        }
        if (assignment == null || assignment2 == null) {
            return nodeToHost;
        }

        Map firstSlots = assignment.get_executor_node_port();
        Map secondSlots = assignment2.get_executor_node_port();
        Map<String, String> changed = new HashMap<>();
        for (Object item : secondSlots.entrySet()) {
            Map.Entry entry = (Map.Entry) item;
            NodeInfo current = (NodeInfo) entry.getValue();
            NodeInfo previous = (NodeInfo) firstSlots.get(entry.getKey());
            if (previous == null) {
                changed.put(current.get_node(), nodeToHost.get(current.get_node()));
                continue;
            }
            if (!previous.equals(current)) {
                changed.put(previous.get_node(), nodeToHost.get(previous.get_node()));
                changed.put(current.get_node(), nodeToHost.get(current.get_node()));
            }
        }
        return changed;
    }

    /**
     * Selects the assignments whose node-to-host map contains the given host as a value.
     *
     * @param map assignments keyed by topology id
     * @param str host to look for
     * @return a new map holding the matching entries
     */
    private static Map<String, Assignment> assignmentsForHost(Map<String, Assignment> map, String str) {
        Map<String, Assignment> matching = new HashMap<>();
        for (Map.Entry<String, Assignment> entry : map.entrySet()) {
            if (entry.getValue().get_node_host().values().contains(str)) {
                matching.put(entry.getKey(), entry.getValue());
            }
        }
        return matching;
    }

    /**
     * Selects the assignments whose node-to-host map contains the given node id as a key.
     *
     * @param map assignments keyed by topology id
     * @param str node id to look for
     * @return a new map holding the matching entries
     */
    private static Map<String, Assignment> assignmentsForNodeId(Map<String, Assignment> map, String str) {
        Map<String, Assignment> matching = new HashMap<>();
        for (Map.Entry<String, Assignment> entry : map.entrySet()) {
            if (entry.getValue().get_node_host().keySet().contains(str)) {
                matching.put(entry.getKey(), entry.getValue());
            }
        }
        return matching;
    }

    /**
     * Queues the current assignments for distribution to each of the given nodes.
     *
     * <p>For every node the assignments touching its host are wrapped in a
     * {@code SupervisorAssignments} and handed to the distribution service, together with the
     * supervisor's server port when its details are known. A failure for one node is logged with its
     * throwable and does not stop the remaining nodes from being processed.
     *
     * @param map all assignments keyed by topology id
     * @param assignmentDistributionService service the per-node tasks are added to
     * @param map2 node ids mapped to hosts; one task is added per entry
     * @param map3 supervisor details keyed by node id
     * @param stormMetricsRegistry metrics registry passed through to the distribution service
     */
    private static void notifySupervisorsAssignments(Map<String, Assignment> map, AssignmentDistributionService assignmentDistributionService, Map<String, String> map2, Map<String, SupervisorDetails> map3, StormMetricsRegistry stormMetricsRegistry) {
        for (Map.Entry<String, String> entry : map2.entrySet()) {
            try {
                String nodeId = entry.getKey();
                String host = entry.getValue();
                SupervisorAssignments supervisorAssignments = new SupervisorAssignments();
                supervisorAssignments.set_storm_assignment(assignmentsForHost(map, host));
                SupervisorDetails details = map3.get(nodeId);
                Integer serverPort = details != null ? Integer.valueOf(details.getServerPort()) : null;
                assignmentDistributionService.addAssignmentsForNode(nodeId, host, serverPort, supervisorAssignments, stormMetricsRegistry);
            } catch (Throwable th) {
                // Keep going for the other nodes, but record why this one failed.
                LOG.error("Exception when add assignments distribution task for node {}", entry.getKey(), th);
            }
        }
    }

    /**
     * Notifies every node of the given assignment, sending each the assignments currently recorded
     * in cluster state.
     *
     * <p>The nodes come from {@code assignmentChangedNodes(assignment, null)}.
     *
     * @param iStormClusterState cluster state providing the current assignments and supervisors
     * @param assignment assignment whose nodes are notified
     * @param assignmentDistributionService service the notifications are added to
     * @param stormMetricsRegistry metrics registry passed through to the distribution service
     */
    private static void notifySupervisorsAsKilled(IStormClusterState iStormClusterState, Assignment assignment, AssignmentDistributionService assignmentDistributionService, StormMetricsRegistry stormMetricsRegistry) {
        Map<String, Assignment> currentAssignments = iStormClusterState.assignmentsInfo();
        Map<String, String> affectedNodes = assignmentChangedNodes(assignment, null);
        Map<String, SupervisorDetails> supervisors = basicSupervisorDetailsMap(iStormClusterState);
        notifySupervisorsAssignments(currentAssignments, assignmentDistributionService, affectedNodes, supervisors, stormMetricsRegistry);
    }

    /**
     * Returns the configuration map held by this instance.
     *
     * @return the {@code conf} field itself, not a copy
     */
    Map<String, Object> getConf() {
        return conf;
    }

    /**
     * Replaces the authorization handler of this instance.
     *
     * @param iAuthorizer handler to store
     */
    @VisibleForTesting
    public void setAuthorizationHandler(IAuthorizer iAuthorizer) {
        authorizationHandler = iAuthorizer;
    }

    /**
     * Returns the cluster state held by this instance.
     *
     * @return the {@code stormClusterState} field
     */
    private IStormClusterState getStormClusterState() {
        return stormClusterState;
    }

    /**
     * Returns the assignment distribution service held by this instance.
     *
     * @return the {@code assignmentsDistributer} field
     */
    private AssignmentDistributionService getAssignmentsDistributer() {
        return assignmentsDistributer;
    }

    /**
     * Returns the metrics registry held by this instance.
     *
     * @return the {@code metricsRegistry} field
     */
    private StormMetricsRegistry getMetricsRegistry() {
        return metricsRegistry;
    }

    /**
     * Returns the heartbeat cache held by this instance.
     *
     * @return the {@code heartbeatsCache} field
     */
    @VisibleForTesting
    public HeartbeatCache getHeartbeatsCache() {
        return heartbeatsCache;
    }

    /**
     * Returns the reference holding the executors known per topology id.
     *
     * @return the {@code idToExecutors} field itself, so updates through it are shared
     */
    public AtomicReference<Map<String, Set<List<Integer>>>> getIdToExecutors() {
        return idToExecutors;
    }

    /**
     * Returns the executors recorded for a topology, computing and recording them on first use.
     *
     * @param str topology id
     * @param stormBase topology base record passed to {@code computeExecutors}
     * @param map topology configuration passed to {@code computeExecutors}
     * @param stormTopology topology passed to {@code computeExecutors}
     * @return the recorded or freshly computed executor set
     * @throws InvalidTopologyException declared by the signature
     */
    private Set<List<Integer>> getOrUpdateExecutors(String str, StormBase stormBase, Map<String, Object> map, StormTopology stormTopology) throws InvalidTopologyException {
        Set<List<Integer>> known = this.idToExecutors.get().get(str);
        if (known != null) {
            return known;
        }
        Set<List<Integer>> computed = new HashSet(computeExecutors(stormBase, map, stormTopology));
        this.idToExecutors.getAndUpdate(new Assoc(str, computed));
        return computed;
    }

    /**
     * Returns the blob store held by this instance.
     *
     * @return the {@code blobStore} field
     */
    private BlobStore getBlobStore() {
        return blobStore;
    }

    /**
     * Returns the topology cache held by this instance.
     *
     * @return the {@code topoCache} field
     */
    private TopoCache getTopoCache() {
        return topoCache;
    }

    /**
     * Creates the worker token manager if none has been created yet; later calls leave the existing
     * manager in place.
     */
    @VisibleForTesting
    void initWorkerTokenManager() {
        if (this.workerTokenManager != null) {
            return;
        }
        this.workerTokenManager = new WorkerTokenManager(this.conf, getStormClusterState());
    }

    /**
     * Reports whether this instance is currently the leader, as answered by the leader elector.
     *
     * @return {@code true} when the leader elector reports this instance as leader
     * @throws Exception when the leader elector fails to answer
     */
    public boolean isLeader() throws Exception {
        boolean leader = this.leaderElector.isLeader();
        return leader;
    }

    /**
     * Fails unless this instance is currently the leader.
     *
     * @throws RuntimeException naming the current leader when this instance is not the leader
     * @throws Exception when leadership cannot be determined
     */
    private void assertIsLeader() throws Exception {
        if (!isLeader()) {
            throw new RuntimeException("not a leader, current leader is " + this.leaderElector.getLeader());
        }
    }

    /**
     * Returns the inbox location given by {@code ServerConfigUtils.masterInbox} for this instance's
     * configuration.
     *
     * @return the inbox path
     * @throws IOException when the helper fails
     */
    private String getInbox() throws IOException {
        String inbox = ServerConfigUtils.masterInbox(this.conf);
        return inbox;
    }

    /**
     * Registers a supervisor with the assignment distribution service as a local supervisor.
     *
     * @param supervisor supervisor to register
     */
    public void addSupervisor(Supervisor supervisor) {
        assignmentsDistributer.addLocalSupervisor(supervisor);
    }

    /**
     * Schedules a topology transition on the timer.
     *
     * <p>When the timer fires, the transition is applied with its last flag set to {@code false}; a
     * checked failure is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param str topology id
     * @param i delay handed to the timer, logged as seconds
     * @param topologyActions action to apply
     * @param obj argument passed along with the action
     */
    void delayEvent(String str, int i, TopologyActions topologyActions, Object obj) {
        LOG.info("Delaying event {} for {} secs for {}", new Object[]{topologyActions, Integer.valueOf(i), str});
        Runnable delayedTransition = () -> {
            try {
                transition(str, topologyActions, obj, false);
            } catch (Exception failure) {
                throw new RuntimeException(failure);
            }
        };
        this.timer.schedule(i, delayedTransition);
    }

    /**
     * Applies the rebalance options stored in a topology's base record.
     *
     * <p>A fresh {@code StormBase} with cleared action options and empty component debug settings is
     * written to cluster state, carrying the executor counts and worker count only when the options
     * set them. The blob store is then updated, the recorded executors for the topology are dropped
     * and assignments are made for it.
     *
     * @param str topology id
     * @param stormBase base record holding the rebalance options
     * @throws Exception when any of the update steps fails
     */
    void doRebalance(String str, StormBase stormBase) throws Exception {
        RebalanceOptions options = stormBase.get_topology_action_options().get_rebalance_options();

        StormBase update = new StormBase();
        update.set_topology_action_options((TopologyActionOptions) null);
        update.set_component_debug(Collections.emptyMap());
        if (options.is_set_num_executors()) {
            update.set_component_executors(options.get_num_executors());
        }
        if (options.is_set_num_workers()) {
            update.set_num_workers(options.get_num_workers());
        }
        this.stormClusterState.updateStorm(str, update);

        updateBlobStore(str, options, ServerUtils.principalNameToSubject(options.get_principal()));
        // Forget the recorded executors for this topology before making assignments.
        this.idToExecutors.getAndUpdate(new Dissoc(str));
        mkAssignments(str);
    }

    /**
     * Resolves a topology name to its id through cluster state.
     *
     * @param str topology name
     * @return the topology id
     * @throws NotAliveException (as {@code WrappedNotAliveException}) when no id is found for the name
     */
    private String toTopoId(String str) throws NotAliveException {
        return (String) this.stormClusterState.getTopoId(str)
                .orElseThrow(() -> new WrappedNotAliveException(str + " is not alive"));
    }

    /**
     * Applies a transition to the topology identified by name.
     *
     * @param str topology name, resolved to an id first
     * @param topologyActions action to apply
     * @param obj argument passed along with the action
     * @param z whether a missing transition is an error
     * @throws Exception when the name cannot be resolved or the transition fails
     */
    private void transitionName(String str, TopologyActions topologyActions, Object obj, boolean z) throws Exception {
        String topoId = toTopoId(str);
        transition(topoId, topologyActions, obj, z);
    }

    /**
     * Applies a transition to a topology without treating a missing transition as an error.
     *
     * @param str topology id
     * @param topologyActions action to apply
     * @param obj argument passed along with the action
     * @throws Exception when the transition fails
     */
    private void transition(String str, TopologyActions topologyActions, Object obj) throws Exception {
        final boolean errorOnNoTransition = false;
        transition(str, topologyActions, obj, errorOnNoTransition);
    }

    /**
     * Applies a state transition to a topology while holding {@code submitLock}.
     *
     * <p>Leadership is asserted first. If the topology has no base record or no status, the event is
     * logged and dropped. If no transition is registered for the current status and action, the call
     * either fails (when {@code z} is true) or falls back to {@code NOOP_TRANSITION}, logging the
     * situation for every action except {@code GAIN_LEADERSHIP}. A non-null result of the
     * transition is written back to cluster state.
     *
     * @param str topology id
     * @param topologyActions action to apply
     * @param obj argument handed to the transition
     * @param z whether a missing transition raises a {@link RuntimeException}
     * @throws Exception when this instance is not leader or the transition fails
     */
    private void transition(String str, TopologyActions topologyActions, Object obj, boolean z) throws Exception {
        LOG.info("TRANSITION: {} {} {} {}", new Object[]{str, topologyActions, obj, Boolean.valueOf(z)});
        assertIsLeader();
        synchronized (this.submitLock) {
            IStormClusterState clusterState = this.stormClusterState;
            StormBase base = clusterState.stormBase(str, (Runnable) null);
            if (base == null || base.get_status() == null) {
                LOG.info("Cannot apply event {} to {} because topology no longer exists", topologyActions, str);
                return;
            }

            TopologyStatus status = base.get_status();
            TopologyStateTransition handler = TOPO_STATE_TRANSITIONS.get(status).get(topologyActions);
            if (handler == null) {
                String message = "No transition for event: " + topologyActions + ", status: " + status + " storm-id: " + str;
                if (z) {
                    throw new RuntimeException(message);
                }
                if (TopologyActions.GAIN_LEADERSHIP != topologyActions) {
                    LOG.info(message);
                }
                handler = NOOP_TRANSITION;
            }

            StormBase updates = handler.transition(obj, this, str, base);
            if (updates != null) {
                clusterState.updateStorm(str, updates);
            }
        }
    }

    /**
     * Stores a newly submitted topology's artifacts: the jar (if any) goes to the blob store,
     * the configuration and the topology go through {@code topoCache}.
     *
     * @param map           not read by this method
     * @param str           topology id, used to derive the jar blob key and as the cache key
     * @param str2          path of the uploaded jar on the local file system, or {@code null}
     *                      when there is no jar to store
     * @param map2          topology configuration handed to {@code topoCache.addTopoConf}
     * @param stormTopology topology handed to {@code topoCache.addTopology}
     * @throws Exception if reading the jar, creating the blob or updating the cache fails
     */
    private void setupStormCode(Map<String, Object> map, String str, String str2, Map<String, Object> map2, StormTopology stormTopology) throws Exception {
        Subject subject = getSubject();
        BlobStore store = this.blobStore;
        String jarKey = ConfigUtils.masterStormJarKey(str);
        if (str2 != null) {
            // try-with-resources always closes the stream and keeps the primary exception,
            // attaching any failure from close() as a suppressed exception.
            try (FileInputStream jarStream = new FileInputStream(str2)) {
                store.createBlob(jarKey, jarStream, new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
            }
        }
        this.topoCache.addTopoConf(str, subject, map2);
        this.topoCache.addTopology(str, subject, stormTopology);
    }

    /**
     * Applies resource overrides to a copy of the cached topology and stores the copy back.
     *
     * @param str     topology id
     * @param map     resource overrides passed to {@code ResourceUtils.updateStormTopologyResources}
     * @param subject subject used for the cache read and update
     */
    private void updateTopologyResources(String str, Map<String, Map<String, Double>> map, Subject subject) throws AuthorizationException, IOException, KeyNotFoundException {
        StormTopology cachedTopology = this.topoCache.readTopology(str, subject);
        // Work on a deep copy so the cached instance is not mutated in place.
        StormTopology updatedTopology = cachedTopology.deepCopy();
        ResourceUtils.updateStormTopologyResources(updatedTopology, map);
        this.topoCache.updateTopology(str, subject, updatedTopology);
    }

    /**
     * Overlays {@code map} on top of the cached topology configuration and stores the result.
     * Keys present in {@code map} replace the cached values.
     *
     * @param str     topology id
     * @param map     configuration overrides
     * @param subject subject used for the cache read and update
     */
    private void updateTopologyConf(String str, Map<String, Object> map, Subject subject) throws AuthorizationException, IOException, KeyNotFoundException {
        Map<String, Object> mergedConf = new HashMap<>(this.topoCache.readTopoConf(str, subject));
        mergedConf.putAll(map);
        this.topoCache.updateTopoConf(str, subject, mergedConf);
    }

    /**
     * Persists the overrides carried by a rebalance request: resource overrides first,
     * then configuration overrides. Either part is skipped when absent or empty.
     *
     * @param str              topology id
     * @param rebalanceOptions rebalance request to read the overrides from
     * @param subject          subject used for the cache reads and updates
     */
    private void updateBlobStore(String str, RebalanceOptions rebalanceOptions, Subject subject) throws AuthorizationException, IOException, KeyNotFoundException {
        Map<String, Map<String, Double>> resourceOverrides = rebalanceOptions.get_topology_resources_overrides();
        boolean hasResourceOverrides = resourceOverrides != null && !resourceOverrides.isEmpty();
        if (hasResourceOverrides) {
            updateTopologyResources(str, resourceOverrides, subject);
        }

        String confOverridesJson = rebalanceOptions.get_topology_conf_overrides();
        if (confOverridesJson != null && !confOverridesJson.isEmpty()) {
            updateTopologyConf(str, Utils.parseJson(confOverridesJson), subject);
        }
    }

    /**
     * Asks the blob store for the replication count of a blob, as {@code NIMBUS_SUBJECT}.
     *
     * @param str blob key
     * @return the replication count, or {@code null} when no blob store is configured
     * @throws Exception if the blob store lookup fails
     */
    private Integer getBlobReplicationCount(String str) throws Exception {
        BlobStore store = this.blobStore;
        if (store == null) {
            return null;
        }
        return Integer.valueOf(store.getBlobReplication(str, NIMBUS_SUBJECT));
    }

    /**
     * Polls the blob store once per second until the topology's blobs reach the configured
     * minimum replication count or the maximum wait time has passed.
     *
     * <p>The minimum comes from {@code topology.min.replication.count} and the wait limit from
     * {@code topology.max.replication.wait.time.sec}; a limit that is not positive disables the
     * timeout. In local mode the jar count is not queried and is taken to equal the minimum.
     * Leadership is re-asserted on every iteration.
     *
     * @param map topology configuration
     * @param str topology id
     * @throws Exception if leadership is lost or a blob store query fails
     */
    private void waitForDesiredCodeReplication(Map<String, Object> map, String str) throws Exception {
        int minReplicationCount = ObjectReader.getInt(map.get("topology.min.replication.count")).intValue();
        int maxWaitTimeSecs = ObjectReader.getInt(map.get("topology.max.replication.wait.time.sec")).intValue();
        int jarCount = minReplicationCount;
        if (!ConfigUtils.isLocalMode(map)) {
            jarCount = getBlobReplicationCount(ConfigUtils.masterStormJarKey(str)).intValue();
        }
        int codeCount = getBlobReplicationCount(ConfigUtils.masterStormCodeKey(str)).intValue();
        int confCount = getBlobReplicationCount(ConfigUtils.masterStormConfKey(str)).intValue();
        long waitedSecs = 0;
        if (this.blobStore != null) {
            // NOTE(review): the loop only keeps waiting while ALL three counts are below the
            // minimum; confirm whether "any below the minimum" was intended.
            while (jarCount < minReplicationCount && codeCount < minReplicationCount && confCount < minReplicationCount) {
                if (maxWaitTimeSecs > 0 && waitedSecs > maxWaitTimeSecs) {
                    // The trailing "{}" was missing, so the jar count was silently dropped from the message.
                    LOG.info("desired replication count of {} not achieved for {} but we have hit the max wait time {} so moving on with replication count for conf key = {} for code key = {} for jar key = {}", minReplicationCount, str, maxWaitTimeSecs, confCount, codeCount, jarCount);
                    return;
                }
                LOG.debug("Checking if I am still the leader");
                assertIsLeader();
                LOG.info("WAITING... storm-id {}, {} <? {} {} {}", str, minReplicationCount, jarCount, codeCount, confCount);
                LOG.info("WAITING... {} <? {}", waitedSecs, maxWaitTimeSecs);
                Time.sleepSecs(1L);
                waitedSecs++;
                if (!ConfigUtils.isLocalMode(map)) {
                    jarCount = getBlobReplicationCount(ConfigUtils.masterStormJarKey(str)).intValue();
                }
                codeCount = getBlobReplicationCount(ConfigUtils.masterStormCodeKey(str)).intValue();
                confCount = getBlobReplicationCount(ConfigUtils.masterStormConfKey(str)).intValue();
            }
        }
        LOG.info("desired replication count {} achieved for topology {}, current-replication-count for conf key = {}, current-replication-count for code key = {}, current-replication-count for jar key = {}", minReplicationCount, str, confCount, codeCount, jarCount);
    }

    /**
     * Builds a {@code TopologyDetails} for a topology from its cached configuration, its cached
     * topology and its {@code StormBase}.
     *
     * <p>If the base has no principal set, it is fixed up from the configuration and written
     * back to the cluster state before the details are assembled.
     *
     * @param str       topology id; must not be null
     * @param stormBase the topology's base record; must not be null
     * @return details containing the executor-to-component mapping, worker count, launch time and owner
     */
    private TopologyDetails readTopologyDetails(String str, StormBase stormBase) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
        if (!$assertionsDisabled && stormBase == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError();
        }

        Map<String, Object> topoConf = readTopoConfAsNimbus(str, this.topoCache);
        StormTopology topology = readStormTopologyAsNimbus(str, this.topoCache);
        if (!stormBase.is_set_principal()) {
            fixupBase(stormBase, topoConf);
            this.stormClusterState.updateStorm(str, stormBase);
        }

        Map<List<Integer>, String> rangeToComponent = computeExecutorToComponent(str, stormBase, topoConf, topology);
        Map<ExecutorDetails, String> executorToComponent = new HashMap<>();
        for (Map.Entry<List<Integer>, String> rangeEntry : rangeToComponent.entrySet()) {
            // Each key is a two-element [startTask, endTask] pair.
            List<Integer> taskRange = rangeEntry.getKey();
            ExecutorDetails executor = new ExecutorDetails(taskRange.get(0).intValue(), taskRange.get(1).intValue());
            executorToComponent.put(executor, rangeEntry.getValue());
        }

        return new TopologyDetails(str, topoConf, topology, stormBase.get_num_workers(), executorToComponent, stormBase.get_launch_time_secs(), stormBase.get_owner());
    }

    /**
     * Refreshes the heartbeat cache for one topology from the executor beats held in the
     * cluster state.
     *
     * @param str        topology id
     * @param set        the topology's executors, passed through to the heartbeat cache
     * @param assignment current assignment; its executor-to-node/port map selects the beats to read
     */
    private void updateHeartbeatsFromZkHeartbeat(String str, Set<List<Integer>> set, Assignment assignment) {
        LOG.debug("Updating heartbeats for {} {} (from ZK heartbeat)", str, set);
        Map<List<Long>, NodeInfo> executorToNodePort = assignment.get_executor_node_port();
        this.heartbeatsCache.updateFromZkHeartbeat(
                str,
                StatsUtil.convertExecutorBeats(this.stormClusterState.executorBeats(str, executorToNodePort)),
                set,
                Integer.valueOf(getTopologyHeartbeatTimeoutSecs(str)));
    }

    /**
     * Walks every existing assignment and either refreshes its heartbeats from the cluster
     * state or only times out stale cached heartbeats.
     *
     * @param map  existing assignments keyed by topology id
     * @param map2 executors per topology id
     * @param set  ids of the topologies whose heartbeats are refreshed from the cluster state;
     *             all other topologies only have old heartbeats timed out
     */
    private void updateAllHeartbeats(Map<String, Assignment> map, Map<String, Set<List<Integer>>> map2, Set<String> set) {
        for (Map.Entry<String, Assignment> assignmentEntry : map.entrySet()) {
            String topoId = assignmentEntry.getKey();
            if (!set.contains(topoId)) {
                LOG.debug("Timing out old heartbeats for {}", topoId);
                this.heartbeatsCache.timeoutOldHeartbeats(topoId, Integer.valueOf(getTopologyHeartbeatTimeoutSecs(topoId)));
                continue;
            }
            updateHeartbeatsFromZkHeartbeat(topoId, map2.get(topoId), assignmentEntry.getValue());
        }
    }

    /**
     * Records a single worker heartbeat in the heartbeat cache.
     *
     * @param supervisorWorkerHeartbeat the worker heartbeat to store
     * @param i                         timeout value forwarded to the cache
     */
    private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat supervisorWorkerHeartbeat, int i) {
        Integer timeoutSecs = Integer.valueOf(i);
        this.heartbeatsCache.updateHeartbeat(supervisorWorkerHeartbeat, timeoutSecs);
    }

    /**
     * Records every worker heartbeat reported by a supervisor. While heartbeats are not yet
     * flagged as ready, it also reports the supervisor id (when present) to the recovery strategy.
     *
     * @param supervisorWorkerHeartbeats the batch of heartbeats sent by one supervisor
     */
    private void updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats supervisorWorkerHeartbeats) {
        for (SupervisorWorkerHeartbeat workerHeartbeat : supervisorWorkerHeartbeats.get_worker_heartbeats()) {
            int timeoutSecs = getTopologyHeartbeatTimeoutSecs(workerHeartbeat.get_storm_id());
            updateCachedHeartbeatsFromWorker(workerHeartbeat, timeoutSecs);
        }

        if (!this.heartbeatsReadyFlag.get()) {
            String supervisorId = supervisorWorkerHeartbeats.get_supervisor_id();
            if (!Strings.isNullOrEmpty(supervisorId)) {
                this.heartbeatsRecoveryStrategy.reportNodeId(supervisorId);
            }
        }
    }

    /**
     * Tells whether worker heartbeats are considered recovered.
     *
     * <p>Once the ready flag is set the answer is always {@code true}. Until then the node ids
     * of all stored assignments are collected and the recovery strategy decides; a positive
     * answer sets the flag.
     *
     * @return {@code true} when heartbeats are ready
     */
    private boolean isHeartbeatsRecovered() {
        if (this.heartbeatsReadyFlag.get()) {
            return true;
        }

        Set<String> assignedNodeIds = new HashSet<>();
        for (Assignment assignment : this.stormClusterState.assignmentsInfo().values()) {
            assignedNodeIds.addAll(assignment.get_node_host().keySet());
        }

        boolean ready = this.heartbeatsRecoveryStrategy.isReady(assignedNodeIds);
        if (ready) {
            this.heartbeatsReadyFlag.getAndSet(true);
        }
        return ready;
    }

    /**
     * Reports whether the cluster state says its assignments backend is synchronized.
     *
     * @return the value of {@code stormClusterState.isAssignmentsBackendSynchronized()}
     */
    private boolean isAssignmentsRecovered() {
        final boolean backendSynchronized = this.stormClusterState.isAssignmentsBackendSynchronized();
        return backendSynchronized;
    }

    /**
     * Asks the heartbeat cache which of a topology's executors are alive.
     *
     * @param str        topology id
     * @param set        all executors of the topology
     * @param assignment the topology's current assignment
     * @return the executors the heartbeat cache reports as alive
     */
    private Set<List<Integer>> aliveExecutors(String str, Set<List<Integer>> set, Assignment assignment) {
        return this.heartbeatsCache.getAliveExecutors(
                str,
                set,
                assignment,
                getTopologyLaunchHeartbeatTimeoutSec(str));
    }

    /**
     * Computes executors as {@code [firstTask, lastTask]} pairs.
     *
     * <p>For each component, its task ids are sorted and split with {@code Utils.partitionFixed}
     * into as many groups as the base's component-executor count; each group contributes one
     * pair. Components without a count are skipped.
     *
     * @param stormBase     the topology's base record; must not be null
     * @param map           topology configuration
     * @param stormTopology the topology
     * @return the executor pairs; empty when the base carries no component-executor map
     */
    private List<List<Integer>> computeExecutors(StormBase stormBase, Map<String, Object> map, StormTopology stormTopology) throws InvalidTopologyException {
        if (!$assertionsDisabled && stormBase == null) {
            throw new AssertionError();
        }

        Map<String, Integer> executorsPerComponent = stormBase.get_component_executors();
        List<List<Integer>> executors = new ArrayList<>();
        if (executorsPerComponent == null) {
            return executors;
        }

        Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo(stormTopology, map);
        Map<String, List<Integer>> componentToTasks = Utils.reverseMap(taskToComponent);
        for (Map.Entry<String, List<Integer>> componentEntry : componentToTasks.entrySet()) {
            String componentId = componentEntry.getKey();
            List<Integer> taskIds = componentEntry.getValue();
            taskIds.sort(null);
            Integer executorCount = executorsPerComponent.get(componentId);
            if (executorCount == null) {
                continue;
            }
            for (List<Integer> taskGroup : Utils.partitionFixed(executorCount.intValue(), taskIds)) {
                Integer firstTask = taskGroup.get(0);
                Integer lastTask = taskGroup.get(taskGroup.size() - 1);
                executors.add(Arrays.asList(firstTask, lastTask));
            }
        }
        return executors;
    }

    /**
     * Maps every executor of a topology to a component id, looked up through the executor's
     * first task id.
     *
     * @param str           topology id
     * @param stormBase     the topology's base record
     * @param map           topology configuration
     * @param stormTopology the topology
     * @return executor pair to component id
     */
    private Map<List<Integer>, String> computeExecutorToComponent(String str, StormBase stormBase, Map<String, Object> map, StormTopology stormTopology) throws InvalidTopologyException {
        // Iterate over a snapshot of the executors.
        List<List<Integer>> executors = new ArrayList<>(getOrUpdateExecutors(str, stormBase, map, stormTopology));
        Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo(stormTopology, map);
        Map<List<Integer>, String> executorToComponent = new HashMap<>();
        for (List<Integer> executor : executors) {
            Integer firstTask = executor.get(0);
            executorToComponent.put(executor, taskToComponent.get(firstTask));
        }
        return executorToComponent;
    }

    /**
     * Collects the executors of every topology in {@code map}, using the {@code idToExecutors}
     * cache where possible and {@code getOrUpdateExecutors} otherwise.
     *
     * @param map storm bases keyed by topology id; may be null
     * @return executors per topology id; empty when {@code map} is null
     */
    private Map<String, Set<List<Integer>>> computeTopologyToExecutors(Map<String, StormBase> map) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
        Map<String, Set<List<Integer>>> executorsByTopology = new HashMap<>();
        if (map == null) {
            return executorsByTopology;
        }
        for (Map.Entry<String, StormBase> baseEntry : map.entrySet()) {
            String topoId = baseEntry.getKey();
            Set<List<Integer>> executors = this.idToExecutors.get().get(topoId);
            if (executors == null) {
                Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, this.topoCache);
                StormTopology topology = readStormTopologyAsNimbus(topoId, this.topoCache);
                executors = getOrUpdateExecutors(topoId, baseEntry.getValue(), topoConf, topology);
            }
            executorsByTopology.put(topoId, executors);
        }
        return executorsByTopology;
    }

    /**
     * Determines the alive executors of every assigned topology.
     *
     * <p>For the topology whose id equals {@code str}, the full executor set is used as-is
     * without consulting heartbeats; for all others the heartbeat cache decides.
     *
     * @param map  existing assignments keyed by topology id
     * @param map2 all executors per topology id
     * @param str  id of the topology whose executors are all taken as alive; may be null
     * @return alive executors per topology id
     */
    private Map<String, Set<List<Integer>>> computeTopologyToAliveExecutors(Map<String, Assignment> map, Map<String, Set<List<Integer>>> map2, String str) {
        Map<String, Set<List<Integer>>> aliveByTopology = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : map.entrySet()) {
            String topoId = assignmentEntry.getKey();
            Assignment assignment = assignmentEntry.getValue();
            Set<List<Integer>> allExecutors = map2.get(topoId);
            Set<List<Integer>> alive;
            if (topoId.equals(str)) {
                alive = allExecutors;
            } else {
                alive = new HashSet<>(aliveExecutors(topoId, allExecutors, assignment));
            }
            aliveByTopology.put(topoId, alive);
        }
        return aliveByTopology;
    }

    /**
     * Finds, per node, the ports that host executors which are assigned but not alive.
     *
     * @param map  existing assignments keyed by topology id
     * @param map2 all executors per topology id
     * @param map3 alive executors per topology id
     * @return node id to the set of ports carrying at least one non-alive executor
     */
    private Map<String, Set<Long>> computeSupervisorToDeadPorts(Map<String, Assignment> map, Map<String, Set<List<Integer>>> map2, Map<String, Set<List<Integer>>> map3) {
        Map<String, Set<Long>> deadPortsByNode = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : map.entrySet()) {
            String topoId = assignmentEntry.getKey();
            Assignment assignment = assignmentEntry.getValue();
            Set<List<Integer>> allExecutors = map2.get(topoId);
            Set<List<Integer>> aliveExecutors = map3.get(topoId);

            // dead = all - alive
            Set<List<Integer>> deadExecutors = new HashSet<>(allExecutors);
            deadExecutors.removeAll(aliveExecutors);

            Map<List<Long>, NodeInfo> executorToNodePort = assignment.get_executor_node_port();
            for (Map.Entry<List<Long>, NodeInfo> slotEntry : executorToNodePort.entrySet()) {
                if (!deadExecutors.contains(asIntExec(slotEntry.getKey()))) {
                    continue;
                }
                NodeInfo location = slotEntry.getValue();
                deadPortsByNode
                        .computeIfAbsent(location.get_node(), nodeId -> new HashSet<>())
                        .addAll(location.get_port());
            }
        }
        return deadPortsByNode;
    }

    /**
     * Converts stored assignments into scheduler assignments, keeping only alive executors.
     *
     * <p>Each {@code NodeInfo} found in an assignment's worker resources becomes a
     * {@code WorkerSlot} built from its node and its first port. Executors whose node info has
     * no such slot are mapped to {@code null}.
     *
     * @param map  existing assignments keyed by topology id
     * @param map2 alive executors per topology id
     * @return scheduler assignment per topology id
     */
    private Map<String, SchedulerAssignmentImpl> computeTopologyToSchedulerAssignment(Map<String, Assignment> map, Map<String, Set<List<Integer>>> map2) {
        Map<String, SchedulerAssignmentImpl> schedulerAssignments = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : map.entrySet()) {
            String topoId = assignmentEntry.getKey();
            Assignment assignment = assignmentEntry.getValue();
            Set<List<Integer>> aliveExecutors = map2.get(topoId);
            Map<List<Long>, NodeInfo> executorToNodePort = assignment.get_executor_node_port();
            Map<NodeInfo, WorkerResources> resourcesByNodeInfo = assignment.get_worker_resources();

            Map<NodeInfo, WorkerSlot> slotByNodeInfo = new HashMap<>();
            Map<WorkerSlot, WorkerResources> resourcesBySlot = new HashMap<>();
            for (Map.Entry<NodeInfo, WorkerResources> resourceEntry : resourcesByNodeInfo.entrySet()) {
                NodeInfo location = resourceEntry.getKey();
                WorkerResources resources = resourceEntry.getValue();
                WorkerSlot slot = new WorkerSlot(location.get_node(), (Number) location.get_port_iterator().next());
                slotByNodeInfo.put(location, slot);
                resourcesBySlot.put(slot, resources);
            }

            Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
            for (Map.Entry<List<Long>, NodeInfo> executorEntry : executorToNodePort.entrySet()) {
                List<Integer> executor = asIntExec(executorEntry.getKey());
                NodeInfo location = executorEntry.getValue();
                if (!aliveExecutors.contains(executor)) {
                    continue;
                }
                ExecutorDetails details = new ExecutorDetails(executor.get(0).intValue(), executor.get(1).intValue());
                executorToSlot.put(details, slotByNodeInfo.get(location));
            }

            schedulerAssignments.put(topoId, new SchedulerAssignmentImpl(topoId, executorToSlot, resourcesBySlot, null));
        }
        return schedulerAssignments;
    }

    /**
     * Builds the scheduler's view of every supervisor known to the cluster state.
     *
     * <p>The ports of a supervisor are those returned for its node by
     * {@code inimbus.allSlotsAvailableForScheduling}, minus the ports listed for it in
     * {@code map}.
     *
     * @param map         supervisor id to ports that must be excluded (dead ports); a missing
     *                    entry means nothing is excluded
     * @param topologies  topologies passed to the slot query
     * @param collection  topology ids passed to the slot query as the set still needing scheduling
     * @return supervisor id to its details, including the usable ports
     */
    private Map<String, SupervisorDetails> readAllSupervisorDetails(Map<String, Set<Long>> map, Topologies topologies, Collection<String> collection) {
        Map<String, SupervisorDetails> detailsBySupervisor = new HashMap<>();
        Map<String, SupervisorInfo> supervisorInfos = this.stormClusterState.allSupervisorInfo();

        // Minimal details (id, meta, resources) are all the slot query needs.
        List<SupervisorDetails> queryDetails = new ArrayList<>();
        for (Map.Entry<String, SupervisorInfo> infoEntry : supervisorInfos.entrySet()) {
            SupervisorInfo info = infoEntry.getValue();
            queryDetails.add(new SupervisorDetails(infoEntry.getKey(), info.get_meta(), (Map<String, Double>) info.get_resources_map()));
        }

        Map<String, Set<Long>> availablePortsByNode = new HashMap<>();
        Iterator<WorkerSlot> slots = this.inimbus.allSlotsAvailableForScheduling(queryDetails, topologies, new HashSet<>(collection)).iterator();
        while (slots.hasNext()) {
            // Keep the slot in a local: both its node id and its port are needed.
            WorkerSlot slot = slots.next();
            String nodeId = slot.getNodeId();
            Set<Long> ports = availablePortsByNode.get(nodeId);
            if (ports == null) {
                ports = new HashSet<>();
                availablePortsByNode.put(nodeId, ports);
            }
            ports.add(Long.valueOf(slot.getPort()));
        }

        for (Map.Entry<String, SupervisorInfo> infoEntry : supervisorInfos.entrySet()) {
            String supervisorId = infoEntry.getKey();
            SupervisorInfo info = infoEntry.getValue();
            String hostname = info.get_hostname();
            Set<Long> deadPorts = map.get(supervisorId);
            Set<Long> availablePorts = availablePortsByNode.get(supervisorId);
            // Copy before removing so the per-node set built above is left untouched.
            Set<Long> usablePorts = availablePorts == null ? new HashSet<>() : new HashSet<>(availablePorts);
            if (deadPorts != null) {
                usablePorts.removeAll(deadPorts);
            }
            detailsBySupervisor.put(supervisorId, new SupervisorDetails(supervisorId, hostname, info.get_scheduler_meta(), usablePorts, info.get_resources_map()));
        }
        return detailsBySupervisor;
    }

    /**
     * Decides whether a supervisor's remaining resources are too small to be useful.
     *
     * <p>A supervisor counts as fragmented when its available memory is below the sum of the
     * configured component and acker on-heap memory (defaults 256 and 128), or its available CPU
     * is below the sum of the configured component and acker CPU (defaults 50 and 50).
     *
     * @param supervisorResources the supervisor's resource summary
     * @return {@code true} when either threshold exceeds what is available
     */
    private boolean isFragmented(SupervisorResources supervisorResources) {
        double componentMemory = ObjectReader.getDouble(this.conf.get("topology.component.resources.onheap.memory.mb"), Double.valueOf(256.0d)).doubleValue();
        double ackerMemory = ObjectReader.getDouble(this.conf.get("topology.acker.resources.onheap.memory.mb"), Double.valueOf(128.0d)).doubleValue();
        if (componentMemory + ackerMemory > supervisorResources.getAvailableMem()) {
            return true;
        }

        double componentCpu = ObjectReader.getDouble(this.conf.get("topology.component.cpu.pcore.percent"), Double.valueOf(50.0d)).doubleValue();
        double ackerCpu = ObjectReader.getDouble(this.conf.get("topology.acker.cpu.pcore.percent"), Double.valueOf(50.0d)).doubleValue();
        return componentCpu + ackerCpu > supervisorResources.getAvailableCpu();
    }

    /**
     * Sums the positive available memory of all supervisors that {@link #isFragmented} flags.
     *
     * @return the sum, truncated to a whole number before being returned as a double
     */
    private double fragmentedMemory() {
        double total = this.nodeIdToResources.get().values().parallelStream()
                .filter(this::isFragmented)
                .mapToDouble(SupervisorResources::getAvailableMem)
                .filter(available -> available > 0.0d)
                .sum();
        // The fractional part is deliberately dropped, as before.
        return (int) total;
    }

    /**
     * Sums the positive available CPU of all supervisors that {@link #isFragmented} flags.
     *
     * @return the sum truncated to an int
     */
    private int fragmentedCpu() {
        double total = this.nodeIdToResources.get().values().parallelStream()
                .filter(this::isFragmented)
                .mapToDouble(SupervisorResources::getAvailableCpu)
                .filter(available -> available > 0.0d)
                .sum();
        return (int) total;
    }

    /**
     * Runs the scheduler over the current cluster view and returns the assignments it produced.
     *
     * <p>Steps, in order: refresh heartbeats, derive alive executors and dead ports, convert the
     * stored assignments into scheduler assignments, decide which topologies still need
     * scheduling, build a {@code Cluster}, invoke the scheduler (timed), and finally publish the
     * scheduler's status and resource maps into this instance's cached state.
     *
     * @param map        existing assignments keyed by topology id
     * @param topologies the topologies to schedule
     * @param map2       storm bases keyed by topology id
     * @param str        topology id whose executors are all treated as alive (see
     *                   {@code computeTopologyToAliveExecutors}); may be null
     * @return the assignments held by the cluster after scheduling
     */
    private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> map, Topologies topologies, Map<String, StormBase> map2, String str) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
        Map<String, Set<List<Integer>>> computeTopologyToExecutors = computeTopologyToExecutors(map2);
        // Only topologies that do NOT support RPC heartbeats are refreshed from the cluster state;
        // the rest just have old cached heartbeats timed out.
        updateAllHeartbeats(map, computeTopologyToExecutors, (Set) topologies.getTopologies().stream().filter(topologyDetails -> {
            return !supportRpcHeartbeat(topologyDetails);
        }).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet()));
        Map<String, Set<List<Integer>>> computeTopologyToAliveExecutors = computeTopologyToAliveExecutors(map, computeTopologyToExecutors, str);
        Map<String, Set<Long>> computeSupervisorToDeadPorts = computeSupervisorToDeadPorts(map, computeTopologyToExecutors, computeTopologyToAliveExecutors);
        Map<String, SchedulerAssignmentImpl> computeTopologyToSchedulerAssignment = computeTopologyToSchedulerAssignment(map, computeTopologyToAliveExecutors);
        // Collect the ids of topologies that are missing executors, have dead executors,
        // or use fewer workers than requested.
        HashSet hashSet = new HashSet();
        for (TopologyDetails topologyDetails2 : topologies.getTopologies()) {
            String id = topologyDetails2.getId();
            Set<List<Integer>> set = computeTopologyToExecutors.get(id);
            Set<List<Integer>> set2 = computeTopologyToAliveExecutors.get(id);
            int numWorkers = topologyDetails2.getNumWorkers();
            int numUsedWorkers = numUsedWorkers(computeTopologyToSchedulerAssignment.get(id));
            if (set == null || set.isEmpty() || !set.equals(set2) || numWorkers > numUsedWorkers) {
                hashSet.add(id);
            }
        }
        Cluster cluster = new Cluster(this.inimbus, this.resourceMetrics, readAllSupervisorDetails(computeSupervisorToDeadPorts, topologies, hashSet), computeTopologyToSchedulerAssignment, topologies, this.conf);
        cluster.setStatusMap(this.idToSchedStatus.get());
        // schedulingStartTimeNs is non-null only while the scheduler call is in progress.
        this.schedulingStartTimeNs.set(Long.valueOf(Time.nanoTime()));
        this.scheduler.schedule(topologies, cluster);
        long nanoTime = Time.nanoTime() - this.schedulingStartTimeNs.getAndSet(null).longValue();
        this.longestSchedulingTime.accumulateAndGet(nanoTime, Math::max);
        this.schedulingDuration.update(nanoTime, TimeUnit.NANOSECONDS);
        LOG.debug("Scheduling took {} ms for {} topologies", Long.valueOf(TimeUnit.NANOSECONDS.toMillis(nanoTime)), Integer.valueOf(topologies.getTopologies().size()));
        // Publish the scheduler's results into the cached per-topology and per-node state.
        this.idToSchedStatus.set(Utils.merge(this.idToSchedStatus.get(), cluster.getStatusMap()));
        this.nodeIdToResources.set(cluster.getSupervisorsResourcesMap());
        this.idToResources.getAndAccumulate(cluster.getTopologyResourcesMap(), (map3, map4) -> {
            return Utils.merge(map3, map4);
        });
        // Copy the worker resource maps entry by entry before merging them into the cache.
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Map<WorkerSlot, WorkerResources>> entry : cluster.getWorkerResourcesMap().entrySet()) {
            HashMap hashMap2 = new HashMap();
            for (Map.Entry<WorkerSlot, WorkerResources> entry2 : entry.getValue().entrySet()) {
                hashMap2.put(entry2.getKey(), entry2.getValue());
            }
            hashMap.put(entry.getKey(), hashMap2);
        }
        this.idToWorkerResources.getAndAccumulate(hashMap, (map5, map6) -> {
            return Utils.merge(map5, map6);
        });
        return cluster.getAssignments();
    }

    /**
     * Tells whether a topology's workers can report heartbeats over RPC.
     *
     * <p>Never true when the cluster state is a Pacemaker state store. Otherwise true when the
     * topology carries no storm version, or its version is at least
     * {@code MIN_VERSION_SUPPORT_RPC_HEARTBEAT}.
     *
     * @param topologyDetails the topology to check
     * @return {@code true} when RPC heartbeats are supported
     */
    private boolean supportRpcHeartbeat(TopologyDetails topologyDetails) {
        if (this.stormClusterState.isPacemakerStateStore()) {
            return false;
        }
        StormTopology topology = topologyDetails.getTopology();
        if (!topology.is_set_storm_version()) {
            return true;
        }
        SimpleVersion topologyVersion = new SimpleVersion(topology.get_storm_version());
        return topologyVersion.compareTo(MIN_VERSION_SUPPORT_RPC_HEARTBEAT) >= 0;
    }

    /**
     * Returns the resource summary of a topology, preferring the {@code idToResources} cache.
     *
     * <p>On a cache miss the summary is computed from the topology details and the stored
     * assignment; the computed value is not put into the cache here. If the details cannot be
     * read because a key is missing, the error is logged and an empty summary is returned.
     *
     * @param str       topology id
     * @param stormBase the topology's base record
     * @return the cached or freshly computed resources
     */
    private TopologyResources getResourcesForTopology(String str, StormBase stormBase) throws NotAliveException, AuthorizationException, InvalidTopologyException, IOException {
        TopologyResources cached = this.idToResources.get().get(str);
        if (cached != null) {
            return cached;
        }
        try {
            TopologyDetails details = readTopologyDetails(str, stormBase);
            Assignment assignment = this.stormClusterState.assignmentInfo(str, (Runnable) null);
            return new TopologyResources(details, assignment);
        } catch (KeyNotFoundException e) {
            LOG.error("Failed to get topology details", e);
            return new TopologyResources();
        }
    }

    /**
     * Returns the per-slot worker resources of a topology, preferring the
     * {@code idToWorkerResources} cache.
     *
     * <p>On a cache miss the map is rebuilt from the stored assignment's worker resources and
     * cached. When the assignment is missing or carries no worker resources, an empty map is
     * returned and nothing is cached.
     *
     * @param str topology id
     * @return worker slot to its resources
     */
    private Map<WorkerSlot, WorkerResources> getWorkerResourcesForTopology(String str) {
        Map<WorkerSlot, WorkerResources> cached = this.idToWorkerResources.get().get(str);
        if (cached != null) {
            return cached;
        }

        Map<WorkerSlot, WorkerResources> resourcesBySlot = new HashMap<>();
        Assignment assignment = this.stormClusterState.assignmentInfo(str, (Runnable) null);
        if (assignment == null || !assignment.is_set_worker_resources()) {
            return resourcesBySlot;
        }

        for (Map.Entry<NodeInfo, WorkerResources> resourceEntry : assignment.get_worker_resources().entrySet()) {
            NodeInfo location = resourceEntry.getKey();
            // A slot is identified by the node and the first port of its NodeInfo.
            WorkerSlot slot = new WorkerSlot(location.get_node(), (Number) location.get_port_iterator().next());
            resourcesBySlot.put(slot, resourceEntry.getValue());
        }
        this.idToWorkerResources.getAndUpdate(new Assoc(str, resourcesBySlot));
        return resourcesBySlot;
    }

    /**
     * Checks the preconditions for computing assignments: this instance is the leader, worker
     * heartbeats are recovered, and the assignments backend is recovered.
     *
     * <p>Each failing precondition logs only its own message. Previously a pending assignments
     * recovery also logged the heartbeat-recovery warning, although heartbeats were recovered.
     *
     * @return {@code true} only when all three preconditions hold
     * @throws Exception if the leadership check fails
     */
    private boolean isReadyForMKAssignments() throws Exception {
        if (!isLeader()) {
            LOG.info("not a leader, skipping assignments");
            return false;
        }
        if (!isHeartbeatsRecovered()) {
            LOG.warn("waiting for worker heartbeats recovery, skipping assignments");
            return false;
        }
        if (!isAssignmentsRecovered()) {
            LOG.warn("waiting for assignments recovery, skipping assignments");
            return false;
        }
        return true;
    }

    /**
     * Recomputes assignments for all topologies, with no topology singled out
     * (the single-argument overload is called with {@code null}).
     *
     * @throws Exception if computing or storing assignments fails
     */
    private void mkAssignments() throws Exception {
        mkAssignments((String) null);
    }

    /**
     * Gathers the inputs for a scheduling round and hands them to {@code lockingMkAssignments}.
     * Does nothing when {@code isReadyForMKAssignments()} is false.
     *
     * <p>Under {@code submitLock} the topology bases are read and turned into
     * {@code TopologyDetails}; bases whose details cannot be read because a key is missing are
     * dropped from the round. Existing assignments are then loaded, except the one for
     * {@code str}; an assignment without an owner gets the topology submitter as owner and is
     * written back. Any exception marks the {@code mkAssignmentsErrors} meter and is rethrown.
     *
     * @param str topology id whose existing assignment is left out of the round; may be null
     * @throws Exception if reading state, scheduling or storing assignments fails
     */
    private void mkAssignments(String str) throws Exception {
        try {
            if (!isReadyForMKAssignments()) {
                return;
            }
            IStormClusterState clusterState = this.stormClusterState;
            Map<String, TopologyDetails> detailsByTopology = new HashMap<>();
            Map<String, StormBase> bases;
            synchronized (this.submitLock) {
                bases = clusterState.topologyBases();
                for (Iterator<Map.Entry<String, StormBase>> baseIt = bases.entrySet().iterator(); baseIt.hasNext(); ) {
                    Map.Entry<String, StormBase> baseEntry = baseIt.next();
                    String topoId = baseEntry.getKey();
                    try {
                        detailsByTopology.put(topoId, readTopologyDetails(topoId, baseEntry.getValue()));
                    } catch (KeyNotFoundException ignored) {
                        // Details are unavailable for this topology: leave it out of this round.
                        baseIt.remove();
                    }
                }
            }

            List<String> assignedTopologyIds = clusterState.assignments((Runnable) null);
            Map<String, Assignment> existingAssignments = new HashMap<>();
            for (String assignedId : assignedTopologyIds) {
                if (assignedId.equals(str)) {
                    continue;
                }
                Assignment current = clusterState.assignmentInfo(assignedId, (Runnable) null);
                if (!current.is_set_owner()) {
                    TopologyDetails details = detailsByTopology.get(assignedId);
                    if (details != null) {
                        current.set_owner(details.getTopologySubmitter());
                        clusterState.setAssignment(assignedId, current, details.getConf());
                    }
                }
                existingAssignments.put(assignedId, current);
            }

            lockingMkAssignments(existingAssignments, bases, str, assignedTopologyIds, clusterState, detailsByTopology);
        } catch (Exception e) {
            this.mkAssignmentsErrors.mark();
            throw e;
        }
    }

    /**
     * Runs one scheduling round under {@code schedLock} and publishes the result.
     *
     * <p>The round computes new scheduler assignments, converts them into {@code Assignment}
     * objects (node-to-host map, executor-to-node/port map, executor start times, worker
     * resources, owner), stores every assignment that differs from the existing one, notifies
     * the supervisors, and finally tells {@code inimbus} which slots were newly added.
     *
     * @param map                existing assignments keyed by topology id
     * @param map2               storm bases keyed by topology id
     * @param str                topology id forwarded to {@code computeNewSchedulerAssignments}; may be null
     * @param list               ids of the topologies that currently have an assignment
     * @param iStormClusterState cluster state used to read supervisors and store assignments
     * @param map3               topology details keyed by topology id
     * @throws Exception if scheduling or storing an assignment fails
     */
    private void lockingMkAssignments(Map<String, Assignment> map, Map<String, StormBase> map2, String str, List<String> list, IStormClusterState iStormClusterState, Map<String, TopologyDetails> map3) throws Exception {
        Topologies topologies = new Topologies(map3);
        synchronized (this.schedLock) {
            Map<String, SchedulerAssignment> computeNewSchedulerAssignments = computeNewSchedulerAssignments(map, topologies, map2, str);
            Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort = computeTopoToExecToNodePort(computeNewSchedulerAssignments, list);
            Map<String, Map<WorkerSlot, WorkerResources>> computeTopoToNodePortToResources = computeTopoToNodePortToResources(computeNewSchedulerAssignments);
            // One timestamp for the whole round; used as start time for every changed executor.
            int currentTimeSecs = Time.currentTimeSecs();
            Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(iStormClusterState);
            // Build the new Assignment for every topology, keyed by topology id.
            HashMap hashMap = new HashMap();
            for (Map.Entry<String, Map<List<Long>, List<Object>>> entry : computeTopoToExecToNodePort.entrySet()) {
                String key = entry.getKey();
                Map<List<Long>, List<Object>> value = entry.getValue();
                if (value == null) {
                    value = new HashMap();
                }
                // Every node referenced by the new executor placement ([node, port] lists).
                HashSet<String> hashSet = new HashSet();
                Iterator<List<Object>> it = value.values().iterator();
                while (it.hasNext()) {
                    hashSet.add((String) it.next().get(0));
                }
                // Start from the previously known node->host entries, then add or overwrite with
                // whatever host name inimbus resolves for the nodes in use.
                HashMap hashMap2 = new HashMap();
                Assignment assignment = map.get(key);
                if (assignment != null) {
                    hashMap2.putAll(assignment.get_node_host());
                }
                for (String str2 : hashSet) {
                    String hostName = this.inimbus.getHostName(basicSupervisorDetailsMap, str2);
                    if (hostName != null) {
                        hashMap2.put(str2, hostName);
                    }
                }
                // Executors reported by changedExecutors get the current time as start time;
                // all others keep the start time from the existing assignment.
                List<List<Long>> changedExecutors = changedExecutors(assignment != null ? assignment.get_executor_node_port() : null, value);
                HashMap hashMap3 = new HashMap();
                if (assignment != null) {
                    hashMap3.putAll(assignment.get_executor_start_time_secs());
                }
                Iterator<List<Long>> it2 = changedExecutors.iterator();
                while (it2.hasNext()) {
                    hashMap3.put(it2.next(), Long.valueOf(currentTimeSecs));
                }
                Map<WorkerSlot, WorkerResources> map4 = computeTopoToNodePortToResources.get(key);
                if (map4 == null) {
                    map4 = new HashMap();
                }
                Assignment assignment2 = new Assignment((String) this.conf.get("storm.local.dir"));
                // Only keep host entries for nodes that are actually used by this assignment.
                HashMap hashMap4 = new HashMap(hashMap2);
                hashMap4.keySet().retainAll(hashSet);
                assignment2.set_node_host(hashMap4);
                // Convert each [node, port] list into a NodeInfo.
                HashMap hashMap5 = new HashMap();
                for (Map.Entry<List<Long>, List<Object>> entry2 : value.entrySet()) {
                    List<Object> value2 = entry2.getValue();
                    NodeInfo nodeInfo = new NodeInfo();
                    nodeInfo.set_node((String) value2.get(0));
                    nodeInfo.add_to_port(((Long) value2.get(1)).longValue());
                    hashMap5.put(entry2.getKey(), nodeInfo);
                }
                assignment2.set_executor_node_port(hashMap5);
                assignment2.set_executor_start_time_secs(hashMap3);
                // Re-key the worker resources from WorkerSlot to NodeInfo.
                HashMap hashMap6 = new HashMap();
                for (Map.Entry<WorkerSlot, WorkerResources> entry3 : map4.entrySet()) {
                    WorkerSlot key2 = entry3.getKey();
                    NodeInfo nodeInfo2 = new NodeInfo();
                    nodeInfo2.set_node(key2.getNodeId());
                    nodeInfo2.add_to_port(key2.getPort());
                    hashMap6.put(nodeInfo2, entry3.getValue());
                }
                assignment2.set_worker_resources(hashMap6);
                // NOTE(review): map3.get(key) is dereferenced without a null check — confirm every
                // key produced by computeTopoToExecToNodePort has an entry in map3.
                assignment2.set_owner(map3.get(key).getTopologySubmitter());
                hashMap.put(key, assignment2);
            }
            if (auditAssignmentChanges(map, hashMap)) {
                LOG.debug("RESETTING id->resources and id->worker-resources cache!");
                this.idToResources.set(new HashMap());
                this.idToWorkerResources.set(new HashMap());
            }
            // Store only the assignments that differ from what is already stored.
            for (Map.Entry<String, Assignment> entry4 : hashMap.entrySet()) {
                String key3 = entry4.getKey();
                Assignment value3 = entry4.getValue();
                Assignment assignment3 = map.get(key3);
                TopologyDetails byId = topologies.getById(key3);
                if (value3.equals(assignment3)) {
                    LOG.debug("Assignment for {} hasn't changed", key3);
                } else {
                    LOG.info("Setting new assignment for topology id {}: {}", key3, value3);
                    iStormClusterState.setAssignment(key3, value3, byId.getConf());
                }
            }
            // Collect the nodes whose assignments changed, across all topologies, for the notification.
            HashMap hashMap7 = new HashMap();
            for (Map.Entry<String, Assignment> entry5 : hashMap.entrySet()) {
                hashMap7.putAll(assignmentChangedNodes(map.get(entry5.getKey()), entry5.getValue()));
            }
            notifySupervisorsAssignments(hashMap, this.assignmentsDistributer, hashMap7, basicSupervisorDetailsMap, getMetricsRegistry());
            // Work out the slots that are new compared to the existing assignment; a topology
            // without an existing assignment is compared against an empty one.
            HashMap hashMap8 = new HashMap();
            for (Map.Entry<String, Assignment> entry6 : hashMap.entrySet()) {
                String key4 = entry6.getKey();
                Assignment value4 = entry6.getValue();
                Assignment assignment4 = map.get(key4);
                if (assignment4 == null) {
                    assignment4 = new Assignment();
                    assignment4.set_executor_node_port(new HashMap());
                    assignment4.set_executor_start_time_secs(new HashMap());
                }
                hashMap8.put(key4, newlyAddedSlots(assignment4, value4));
            }
            this.inimbus.assignSlots(topologies, hashMap8);
        }
    }

    /**
     * Passes a topology action on to the notifier plugin, when one is configured.
     *
     * <p>Any exception thrown by the plugin is logged at WARN and otherwise ignored, so a failing
     * plugin does not fail the caller.
     *
     * @param str  first argument handed to the plugin; it is also included in the warning message
     * @param str2 second argument handed to the plugin, naming the action (e.g. {@code "activate"})
     */
    private void notifyTopologyActionListener(String str, String str2) {
        final ITopologyActionNotifierPlugin notifier = this.nimbusTopologyActionNotifier;
        if (notifier == null) {
            // No plugin configured: nothing to notify.
            return;
        }
        try {
            notifier.notify(str, str2);
        } catch (Exception ex) {
            LOG.warn("Ignoring exception from Topology action notifier for storm-Id {}", str, ex);
        }
    }

    /**
     * Copies the submitter user and principal recorded in a topology conf onto a {@code StormBase}.
     *
     * @param stormBase the base whose owner and principal are overwritten
     * @param map       conf read for {@code topology.submitter.user} and {@code topology.submitter.principal}
     */
    private void fixupBase(StormBase stormBase, Map<String, Object> map) {
        final String submitterUser = (String) map.get("topology.submitter.user");
        stormBase.set_owner(submitterUser);
        final String submitterPrincipal = (String) map.get("topology.submitter.principal");
        stormBase.set_principal(submitterPrincipal);
    }

    /**
     * Computes the heartbeat timeout to use for a topology with the given conf.
     *
     * <p>The nimbus-wide {@code DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS} value is the floor. When the
     * conf contains {@code topology.worker.timeout.secs}, the larger of the two values wins.
     *
     * @param map the conf consulted for {@code topology.worker.timeout.secs}
     * @return the timeout in seconds
     */
    private int getTopologyHeartbeatTimeoutSecs(Map<String, Object> map) {
        final int nimbusTimeoutSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)).intValue();
        if (!map.containsKey("topology.worker.timeout.secs")) {
            return nimbusTimeoutSecs;
        }
        final int topoTimeoutSecs = ObjectReader.getInt(map.get("topology.worker.timeout.secs")).intValue();
        return Math.max(topoTimeoutSecs, nimbusTimeoutSecs);
    }

    /**
     * Resolves the heartbeat timeout for the topology with the given id.
     *
     * <p>The topology conf is read through the topology cache and handed to
     * {@link #getTopologyHeartbeatTimeoutSecs(Map)}. If that fails for any reason, the failure is
     * logged at WARN and the nimbus-wide {@code DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS} value is
     * returned instead.
     *
     * @param str the topology id whose conf is read
     * @return the timeout in seconds
     */
    private int getTopologyHeartbeatTimeoutSecs(String str) {
        try {
            return getTopologyHeartbeatTimeoutSecs(tryReadTopoConf(str, this.topoCache));
        } catch (Exception e) {
            // Every argument needs a "{}" placeholder, otherwise the logger drops it from the output.
            LOG.warn("Exception when getting heartbeat timeout for topology {}: {}", str, e.toString());
            return ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)).intValue();
        }
    }

    /**
     * Returns the heartbeat timeout that applies while a topology is launching: the larger of
     * {@code DaemonConfig.NIMBUS_TASK_LAUNCH_SECS} and the topology's regular heartbeat timeout.
     *
     * @param str the topology id
     * @return the timeout in seconds
     */
    private int getTopologyLaunchHeartbeatTimeoutSec(String str) {
        final int launchTimeoutSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)).intValue();
        final int heartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(str);
        return Math.max(launchTimeoutSecs, heartbeatTimeoutSecs);
    }

    /**
     * Creates the {@code StormBase} for a newly submitted topology and activates it in cluster state.
     *
     * <p>The base records the name, optional {@code topology.version}, launch time, initial status,
     * worker count ({@code topology.workers}, defaulting to 0), per-component executor counts, owner
     * and principal. After activation the executor cache is updated and the action notifier is
     * told about the {@code "activate"} action.
     *
     * @param str            topology name stored on the base
     * @param str2           topology id used for activation and as the executor-cache key
     * @param topologyStatus initial status; asserted to be ACTIVE or INACTIVE when assertions are enabled
     * @param str3           owner stored on the base
     * @param str4           principal stored on the base
     * @param map            topology conf
     * @param stormTopology  the topology being started
     * @throws InvalidTopologyException declared by this method; presumably raised while the system topology is built
     */
    private void startTopology(String str, String str2, TopologyStatus topologyStatus, String str3, String str4, Map<String, Object> map, StormTopology stormTopology) throws InvalidTopologyException {
        final boolean allowedStatus = TopologyStatus.ACTIVE == topologyStatus || TopologyStatus.INACTIVE == topologyStatus;
        if (!$assertionsDisabled && !allowedStatus) {
            throw new AssertionError();
        }

        // Component id -> number of executors to start, computed over the system-augmented topology.
        HashMap executorCounts = new HashMap();
        for (Map.Entry component : StormCommon.allComponents(StormCommon.systemTopology(map, stormTopology)).entrySet()) {
            executorCounts.put(component.getKey(), Integer.valueOf(StormCommon.numStartExecutors(component.getValue())));
        }

        LOG.info("Activating {}: {}", str, str2);
        StormBase base = new StormBase();
        base.set_name(str);
        if (map.containsKey("topology.version")) {
            base.set_topology_version(ObjectReader.getString(map.get("topology.version")));
        }
        base.set_launch_time_secs(Time.currentTimeSecs());
        base.set_status(topologyStatus);
        base.set_num_workers(ObjectReader.getInt(map.get("topology.workers"), 0).intValue());
        base.set_component_executors(executorCounts);
        base.set_owner(str3);
        base.set_principal(str4);
        base.set_component_debug(new HashMap());

        this.stormClusterState.activateStorm(str2, base, map);
        this.idToExecutors.getAndUpdate(new Assoc(str2, new HashSet(computeExecutors(base, map, stormTopology))));
        notifyTopologyActionListener(str, "activate");
    }

    /**
     * Verifies that a topology's active state matches what the caller expects.
     *
     * @param str the topology name checked against cluster state
     * @param z   the expected active state
     * @throws NotAliveException     if the topology was expected to be active but is not
     * @throws AlreadyAliveException if the topology was expected to be inactive but is active
     */
    private void assertTopoActive(String str, boolean z) throws NotAliveException, AlreadyAliveException {
        final boolean active = isTopologyActive(this.stormClusterState, str);
        if (active == z) {
            return;
        }
        if (z) {
            throw new WrappedNotAliveException(str + " is not alive");
        }
        throw new WrappedAlreadyAliveException(str + " is already alive");
    }

    /**
     * Reads a topology's conf given its name: the name is first translated to a topology id,
     * then the conf is read through the topology cache.
     *
     * @param str the topology name
     * @return the topology conf
     */
    private Map<String, Object> tryReadTopoConfFromName(String str) throws NotAliveException, AuthorizationException, IOException {
        final String topoId = toTopoId(str);
        return tryReadTopoConf(topoId, this.topoCache);
    }

    /**
     * Reads a topology given its name: the name is first translated to a topology id,
     * then the topology is read through the topology cache.
     *
     * @param str the topology name
     * @return the topology
     */
    private StormTopology tryReadTopologyFromName(String str) throws NotAliveException, AuthorizationException, IOException {
        final String topoId = toTopoId(str);
        return tryReadTopology(topoId, this.topoCache);
    }

    /**
     * Convenience overload of the four-argument {@code checkAuthorization} that passes
     * {@code null} as the request context.
     *
     * @param str  topology name, may be {@code null}
     * @param map  topology conf, may be {@code null}
     * @param str2 name of the operation being authorized
     * @throws AuthorizationException if the operation is not permitted
     */
    @VisibleForTesting
    public void checkAuthorization(String str, Map<String, Object> map, String str2) throws AuthorizationException {
        // A null context makes the delegate fall back to ReqContext.context().
        checkAuthorization(str, map, str2, null);
    }

    /**
     * Checks that the request is allowed to perform an operation, throwing if it is not.
     *
     * <p>The authorizers receive a copy of {@code map}; when {@code map} is {@code null} and
     * {@code str} is not, they receive a conf containing only {@code topology.name}.
     *
     * <p>If the request is impersonating, the impersonation authorizer is consulted first. When
     * none is configured a warning is logged and the check continues. Afterwards the regular
     * authorization handler, if configured, decides; every decision it makes is written to the
     * Thrift access log.
     *
     * @param str        topology name, may be {@code null}
     * @param map        topology conf, may be {@code null}
     * @param str2       name of the operation being authorized
     * @param reqContext request context; when {@code null}, {@code ReqContext.context()} is used
     * @throws AuthorizationException if impersonation or the operation itself is denied
     */
    @VisibleForTesting
    public void checkAuthorization(String str, Map<String, Object> map, String str2, ReqContext reqContext) throws AuthorizationException {
        final IAuthorizer impersonationAuthorizer = this.impersonationAuthorizationHandler;
        final ReqContext ctx = (reqContext != null) ? reqContext : ReqContext.context();

        final Map<String, Object> checkConf = new HashMap<>();
        if (map != null) {
            checkConf.putAll(map);
        } else if (str != null) {
            checkConf.put("topology.name", str);
        }

        if (ctx.isImpersonating()) {
            LOG.info("principal: {} is trying to impersonate principal: {}", ctx.realPrincipal(), ctx.principal());
            if (impersonationAuthorizer == null) {
                LOG.warn("impersonation attempt but {} has no authorizer configured. potential security risk, please see SECURITY.MD to learn how to configure impersonation authorizer.", DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER);
            } else if (!impersonationAuthorizer.permit(ctx, str2, checkConf)) {
                ThriftAccessLogger.logAccess(Integer.valueOf(ctx.requestID()), ctx.remoteAddress(), ctx.principal(), str2, str, "access-denied");
                throw new WrappedAuthorizationException("principal " + ctx.realPrincipal() + " is not authorized to impersonate principal " + ctx.principal() + " from host " + ctx.remoteAddress() + " Please see SECURITY.MD to learn how to configure impersonation acls.");
            }
        }

        final IAuthorizer authorizer = this.authorizationHandler;
        if (authorizer == null) {
            // No authorization handler configured: nothing further to check.
            return;
        }
        if (!authorizer.permit(ctx, str2, checkConf)) {
            ThriftAccessLogger.logAccess(Integer.valueOf(ctx.requestID()), ctx.remoteAddress(), ctx.principal(), str2, str, "access-denied");
            throw new WrappedAuthorizationException(str2 + (str != null ? " on topology " + str : "") + " is not authorized");
        }
        ThriftAccessLogger.logAccess(Integer.valueOf(ctx.requestID()), ctx.remoteAddress(), ctx.principal(), str2, str, "access-granted");
    }

    /**
     * Reports whether the current request may perform an operation on a topology.
     *
     * <p>The topology conf is merged over the daemon conf and passed to
     * {@code checkAuthorization}; an {@code AuthorizationException} from that check is
     * translated into {@code false}. Exceptions from reading the conf propagate.
     *
     * @param str  name of the operation
     * @param str2 topology id
     * @return {@code true} if the check passed, {@code false} if it was denied
     */
    private boolean isAuthorized(String str, String str2) throws NotAliveException, AuthorizationException, IOException {
        final Map<String, Object> effectiveConf = Utils.merge(this.conf, tryReadTopoConf(str2, this.topoCache));
        final String topoName = (String) effectiveConf.get("topology.name");
        try {
            checkAuthorization(topoName, effectiveConf, str);
        } catch (AuthorizationException denied) {
            return false;
        }
        return true;
    }

    /**
     * Selects the topology ids for which the current request is authorized to perform an operation.
     *
     * @param str        name of the operation
     * @param collection topology ids to test
     * @return a new set holding the ids that passed {@code isAuthorized}
     */
    @VisibleForTesting
    public Set<String> filterAuthorized(String str, Collection<String> collection) throws NotAliveException, AuthorizationException, IOException {
        final Set<String> permitted = new HashSet<>();
        for (String topoId : collection) {
            if (!isAuthorized(str, topoId)) {
                continue;
            }
            permitted.add(topoId);
        }
        return permitted;
    }

    /**
     * Removes the blobs for every dependency jar listed on a topology.
     *
     * <p>This is best effort: any exception is logged at INFO, together with the topology id and
     * the stack trace, and is not propagated.
     *
     * @param str the topology id whose dependency jars are removed
     */
    @VisibleForTesting
    public void rmDependencyJarsInTopology(String str) {
        try {
            BlobStore store = this.blobStore;
            IStormClusterState state = this.stormClusterState;
            List<String> dependencyJars = readStormTopologyAsNimbus(str, this.topoCache).get_dependency_jars();
            LOG.info("Removing dependency jars from blobs - {}", dependencyJars);
            if (dependencyJars != null) {
                for (String jarKey : dependencyJars) {
                    rmBlobKey(store, jarKey, state);
                }
            }
        } catch (Exception e) {
            // Deliberately swallowed. The throwable goes last, after all placeholders are filled,
            // so the logger records it as the exception rather than as a format argument.
            LOG.info("Exception while removing dependency jars for topology {}", str, e);
        }
    }

    /**
     * Deletes the stored conf, topology and master jar blob of a topology.
     *
     * <p>The three deletions are independent: a failure to delete the conf or the topology is
     * logged at DEBUG and does not stop the remaining deletions.
     *
     * @param str the topology id
     */
    @VisibleForTesting
    public void rmTopologyKeys(String str) {
        BlobStore store = this.blobStore;
        IStormClusterState state = this.stormClusterState;
        try {
            this.topoCache.deleteTopoConf(str, NIMBUS_SUBJECT);
        } catch (Exception e) {
            // Best effort: keep going so the remaining keys are still removed.
            LOG.debug("Ignoring failure to delete topology conf for {}", str, e);
        }
        try {
            this.topoCache.deleteTopology(str, NIMBUS_SUBJECT);
        } catch (Exception e) {
            // Best effort: keep going so the jar key is still removed.
            LOG.debug("Ignoring failure to delete topology code for {}", str, e);
        }
        rmBlobKey(store, ConfigUtils.masterStormJarKey(str), state);
    }

    /**
     * Force-deletes the path returned by {@code ServerConfigUtils.masterStormDistRoot} for a topology.
     *
     * @param str the topology id
     * @throws IOException if the deletion fails
     */
    @VisibleForTesting
    public void forceDeleteTopoDistDir(String str) throws IOException {
        final String distRoot = ServerConfigUtils.masterStormDistRoot(this.conf, str);
        Utils.forceDelete(distRoot);
    }

    /**
     * Removes the leftover state of topologies selected by {@code topoIdsToClean}.
     *
     * <p>Only the leader does any work; other instances just log and return. The set of ids is
     * computed while holding {@code submitLock}, and the cleanup itself runs after the lock is
     * released. For each id this tears down heartbeats, errors, private worker keys and
     * backpressure in cluster state, removes dependency jars, the dist dir and topology keys,
     * and drops the id from the heartbeat and executor caches.
     *
     * @throws Exception if any cleanup step fails
     */
    @VisibleForTesting
    public void doCleanup() throws Exception {
        if (!isLeader()) {
            LOG.info("not a leader, skipping cleanup");
            return;
        }
        final IStormClusterState state = this.stormClusterState;
        final Set<String> staleTopoIds;
        synchronized (this.submitLock) {
            staleTopoIds = topoIdsToClean(state, this.blobStore, this.conf);
        }
        if (staleTopoIds == null) {
            return;
        }
        for (String topoId : staleTopoIds) {
            LOG.info("Cleaning up {}", topoId);
            state.teardownHeartbeats(topoId);
            state.teardownTopologyErrors(topoId);
            state.removeAllPrivateWorkerKeys(topoId);
            state.removeBackpressure(topoId);
            rmDependencyJarsInTopology(topoId);
            forceDeleteTopoDistDir(topoId);
            rmTopologyKeys(topoId);
            this.heartbeatsCache.removeTopo(topoId);
            this.idToExecutors.getAndUpdate(new Dissoc(topoId));
        }
    }

    /**
     * Filters old entries out of the topology history using a cutoff of "now minus {@code i} minutes".
     *
     * @param i the age threshold in minutes (multiplied by 60 to obtain seconds)
     */
    private void cleanTopologyHistory(int i) {
        final int maxAgeSecs = i * 60;
        final int cutoffSecs = Time.currentTimeSecs() - maxAgeSecs;
        synchronized (this.topologyHistoryLock) {
            this.topologyHistoryState.filterOldTopologies(cutoffSecs);
        }
    }

    /**
     * Appends a topology to the topology history, stamped with the current time and the log
     * users/groups derived from its conf.
     *
     * @param str the topology id recorded in the history entry
     * @param map the topology conf from which the log users and groups are read
     */
    private void addTopoToHistoryLog(String str, Map<String, Object> map) {
        LOG.info("Adding topo to history log: {}", str);
        final LocalState historyState = this.topologyHistoryState;
        final List<String> logUsers = ServerConfigUtils.getTopoLogsUsers(map);
        final List<String> logGroups = ServerConfigUtils.getTopoLogsGroups(map);
        synchronized (this.topologyHistoryLock) {
            // The timestamp is taken inside the lock, immediately before the entry is stored.
            final LSTopoHistory entry = new LSTopoHistory(str, Time.currentTimeSecs(), logUsers, logGroups);
            historyState.addTopologyHistory(entry);
        }
    }

    /**
     * Looks up the groups of a user through the group mapper.
     *
     * @param str the user name; {@code null} or empty yields an empty set without consulting the mapper
     * @return the user's groups
     * @throws IOException if the group mapper fails
     */
    private Set<String> userGroups(String str) throws IOException {
        if (str == null || str.isEmpty()) {
            return Collections.emptySet();
        }
        return this.groupMapper.getGroups(str);
    }

    /**
     * Reports whether a user belongs to at least one of the given groups.
     *
     * @param str        the user name
     * @param collection the groups to test membership against
     * @return {@code true} if the user's groups and {@code collection} share an element
     * @throws IOException if the user's groups cannot be resolved
     */
    private boolean isUserPartOf(String str, Collection<String> collection) throws IOException {
        // The bound method reference fails fast on a null collection, like retainAll would.
        return userGroups(str).stream().anyMatch(collection::contains);
    }

    /**
     * Lists the topology ids from the topology history that a user is allowed to see.
     *
     * <p>An entry is included when no user is given, when the user is contained in
     * {@code collection}, when the user belongs to one of the entry's groups, or when the user
     * is listed among the entry's users.
     *
     * @param str        the user name, or {@code null} to include every entry
     * @param collection users for whom every entry is included
     * @return the matching topology ids, in history order; empty when there is no history
     * @throws IOException if group membership cannot be resolved
     */
    private List<String> readTopologyHistory(String str, Collection<String> collection) throws IOException {
        final List<LSTopoHistory> history = this.topologyHistoryState.getTopoHistoryList();
        if (history == null || history.isEmpty()) {
            return Collections.emptyList();
        }
        final List<String> visibleTopoIds = new ArrayList<>();
        for (LSTopoHistory entry : history) {
            final boolean visible = str == null
                    || collection.contains(str)
                    || isUserPartOf(str, entry.get_groups())
                    || entry.get_users().contains(str);
            if (visible) {
                visibleTopoIds.add(entry.get_topology_id());
            }
        }
        return visibleTopoIds;
    }

    /**
     * Runs every configured credentials renewer over the stored credentials of each topology.
     *
     * <p>Only the leader does any work. For each topology base, the credentials are read, copied,
     * passed through all renewers and through {@code upsertWorkerTokensInCreds}, and written back
     * only if the copy differs from what was read. The read-renew-write sequence for a topology
     * runs while holding {@code credUpdateLock}.
     *
     * @throws Exception if reading confs, renewing or storing credentials fails
     */
    private void renewCredentials() throws Exception {
        if (!isLeader()) {
            LOG.info("not a leader, skipping credential renewal.");
            return;
        }
        final IStormClusterState state = this.stormClusterState;
        final Collection<ICredentialsRenewer> renewers = this.credRenewers;
        final Map<String, StormBase> bases = state.topologyBases();
        if (bases == null) {
            return;
        }
        for (Map.Entry<String, StormBase> baseEntry : bases.entrySet()) {
            final String topoId = baseEntry.getKey();
            final String principal = baseEntry.getValue().get_principal();
            final Map<String, Object> readOnlyConf = Collections.unmodifiableMap(Utils.merge(this.conf, tryReadTopoConf(topoId, this.topoCache)));
            synchronized (this.credUpdateLock) {
                final Credentials stored = state.credentials(topoId, (Runnable) null);
                if (stored == null) {
                    continue;
                }
                final Map<String, String> before = stored.get_creds();
                final Map<String, String> renewed = new HashMap<>(before);
                for (ICredentialsRenewer renewer : renewers) {
                    LOG.info("Renewing Creds For {} with {} owned by {}", topoId, renewer, principal);
                    renewer.renew(renewed, readOnlyConf, principal);
                }
                upsertWorkerTokensInCreds(renewed, principal, topoId);
                // Only write back when a renewer or the token upsert actually changed something.
                if (!renewed.equals(before)) {
                    state.setCredentials(topoId, new Credentials(renewed), readOnlyConf);
                }
            }
        }
    }

    /**
     * Builds the {@code SupervisorSummary} reported for one supervisor.
     *
     * <p>The summary carries hostname, uptime, port counts, total resources, the optional version
     * and a blacklisted flag (always {@code false} unless the scheduler is a
     * {@code BlacklistScheduler}). Used and fragmented CPU/memory are filled in only when resource
     * data exists for the supervisor and the underlying scheduler is a
     * {@code ResourceAwareScheduler}.
     *
     * @param str            the supervisor id
     * @param supervisorInfo the supervisor's registered info
     * @return the populated summary
     */
    private SupervisorSummary makeSupervisorSummary(String str, SupervisorInfo supervisorInfo) {
        final Set<String> blacklistedIds = (this.scheduler instanceof BlacklistScheduler)
                ? ((BlacklistScheduler) this.scheduler).getBlacklistSupervisorIds()
                : Collections.emptySet();
        LOG.debug("INFO: {} ID: {}", supervisorInfo, str);

        final int portCount = supervisorInfo.is_set_meta() ? supervisorInfo.get_meta_size() : 0;
        final int usedPortCount = supervisorInfo.is_set_used_ports() ? supervisorInfo.get_used_ports_size() : 0;
        LOG.debug("NUM PORTS: {}", Integer.valueOf(portCount));

        final SupervisorSummary summary = new SupervisorSummary(supervisorInfo.get_hostname(), (int) supervisorInfo.get_uptime_secs(), portCount, usedPortCount, str);
        summary.set_total_resources(supervisorInfo.get_resources_map());

        final SupervisorResources usage = this.nodeIdToResources.get().get(str);
        final boolean resourceAware = this.underlyingScheduler instanceof ResourceAwareScheduler;
        if (usage != null && resourceAware) {
            summary.set_used_mem(usage.getUsedMem());
            summary.set_used_cpu(usage.getUsedCpu());
            summary.set_used_generic_resources(usage.getUsedGenericResources());
            if (isFragmented(usage)) {
                // On a fragmented supervisor the still-available CPU/memory is reported as fragmented.
                final double fragmentedCpu = usage.getAvailableCpu();
                if (fragmentedCpu < 0.0d) {
                    LOG.warn("Negative fragmented CPU on {}", str);
                }
                summary.set_fragmented_cpu(fragmentedCpu);
                final double fragmentedMem = usage.getAvailableMem();
                if (fragmentedMem < 0.0d) {
                    LOG.warn("Negative fragmented Mem on {}", str);
                }
                summary.set_fragmented_mem(fragmentedMem);
            }
        }

        if (supervisorInfo.is_set_version()) {
            summary.set_version(supervisorInfo.get_version());
        }
        summary.set_blacklisted(blacklistedIds.contains(str));
        return summary;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Assembles the cluster summary: one summary per registered supervisor, the topology
     * summaries, and the list of nimbuses with uptime and leader flag filled in.
     *
     * <p>A nimbus is flagged as leader when its host and port match the elected leader. The flag
     * is cleared again when the elected leader's host equals this instance's host while this
     * instance does not report itself as leader.
     *
     * @return the cluster summary
     * @throws Exception if cluster state cannot be read
     */
    public ClusterSummary getClusterInfoImpl() throws Exception {
        final IStormClusterState state = this.stormClusterState;
        final Map<String, SupervisorInfo> supervisorInfos = state.allSupervisorInfo();
        final ArrayList supervisorSummaries = new ArrayList(supervisorInfos.size());
        for (Map.Entry<String, SupervisorInfo> supervisor : supervisorInfos.entrySet()) {
            supervisorSummaries.add(makeSupervisorSummary(supervisor.getKey(), supervisor.getValue()));
        }
        // Return value is not used here; the call is kept as in the original sequence.
        this.uptime.upTime();
        final List<NimbusSummary> nimbusSummaries = state.nimbuses();
        final NimbusInfo electedLeader = this.leaderElector.getLeader();
        for (NimbusSummary nimbus : nimbusSummaries) {
            nimbus.set_uptime_secs(Time.deltaSecs(nimbus.get_uptime_secs()));
            final boolean matchesElected = electedLeader.getHost().equals(nimbus.get_host()) && electedLeader.getPort() == nimbus.get_port();
            final boolean staleSelf = matchesElected && this.nimbusHostPortInfo.getHost().equals(electedLeader.getHost()) && !isLeader();
            nimbus.set_isLeader(matchesElected && !staleSelf);
        }
        return new ClusterSummary(supervisorSummaries, getTopologySummariesImpl(), nimbusSummaries);
    }

    /**
     * Builds a summary for every topology base found in cluster state, skipping entries whose
     * base is {@code null}.
     *
     * @return the topology summaries, in the iteration order of the topology bases
     */
    private List<TopologySummary> getTopologySummariesImpl() throws IOException, TException {
        final IStormClusterState state = this.stormClusterState;
        final List<TopologySummary> summaries = new ArrayList<>();
        for (Map.Entry<String, StormBase> baseEntry : state.topologyBases().entrySet()) {
            final StormBase base = baseEntry.getValue();
            if (base == null) {
                continue;
            }
            summaries.add(getTopologySummaryImpl(baseEntry.getKey(), base));
        }
        return summaries;
    }

    /**
     * Builds the {@code TopologySummary} for one topology.
     *
     * <p>Task, executor and worker counts come from the current assignment and stay 0 when there
     * is none. Storm version, owner, topology version, scheduler status and resource figures are
     * added when available. A {@code NotAliveException} while reading the topology is ignored, and
     * a failure to obtain the blob replication count is logged and leaves the count unset.
     *
     * @param str       the topology id
     * @param stormBase the topology's base from cluster state
     * @return the populated summary
     */
    private TopologySummary getTopologySummaryImpl(String str, StormBase stormBase) throws IOException, TException {
        final Assignment assignment = this.stormClusterState.assignmentInfo(str, (Runnable) null);
        int taskCount = 0;
        int executorCount = 0;
        int workerCount = 0;
        if (assignment != null && assignment.is_set_executor_node_port()) {
            for (List<Long> executor : assignment.get_executor_node_port().keySet()) {
                taskCount += StormCommon.executorIdToTasks(executor).size();
            }
            executorCount = assignment.get_executor_node_port_size();
            // Distinct node/port values = number of workers in use.
            workerCount = new HashSet<>(assignment.get_executor_node_port().values()).size();
        }

        final TopologySummary summary = new TopologySummary(str, stormBase.get_name(), taskCount, executorCount, workerCount, Time.deltaSecs(stormBase.get_launch_time_secs()), extractStatusStr(stormBase));

        try {
            final StormTopology topology = tryReadTopology(str, this.topoCache);
            if (topology != null && topology.is_set_storm_version()) {
                summary.set_storm_version(topology.get_storm_version());
            }
        } catch (NotAliveException ignored) {
            // The summary is still produced, just without a storm version.
        }

        if (stormBase.is_set_owner()) {
            summary.set_owner(stormBase.get_owner());
        }
        if (stormBase.is_set_topology_version()) {
            summary.set_topology_version(stormBase.get_topology_version());
        }

        final String schedStatus = this.idToSchedStatus.get().get(str);
        if (schedStatus != null) {
            summary.set_sched_status(schedStatus);
        }

        final TopologyResources resources = getResourcesForTopology(str, stormBase);
        if (resources != null) {
            summary.set_requested_memonheap(resources.getRequestedMemOnHeap());
            summary.set_requested_memoffheap(resources.getRequestedMemOffHeap());
            summary.set_requested_cpu(resources.getRequestedCpu());
            summary.set_requested_generic_resources(resources.getRequestedGenericResources());
            summary.set_assigned_memonheap(resources.getAssignedMemOnHeap());
            summary.set_assigned_memoffheap(resources.getAssignedMemOffHeap());
            summary.set_assigned_cpu(resources.getAssignedCpu());
            summary.set_assigned_generic_resources(resources.getAssignedGenericResources());
        }

        try {
            summary.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(str)).intValue());
        } catch (Exception blobFailure) {
            LOG.error("Unable to find blob entry", blobFailure);
        }
        return summary;
    }

    /**
     * Extracts cluster-level and per-supervisor data points from the current cluster summary and
     * hands them to every cluster metrics consumer executor.
     *
     * <p>Each executor first receives the cluster-level data points, then one call per supervisor.
     *
     * @throws Exception if the cluster summary cannot be built or a consumer fails
     */
    private void sendClusterMetricsToExecutors() throws Exception {
        final IClusterMetricsConsumer.ClusterInfo clusterInfo = mkClusterInfo();
        final ClusterSummary summary = getClusterInfoImpl();
        final List<DataPoint> clusterMetrics = extractClusterMetrics(summary);
        final Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> metricsBySupervisor = extractSupervisorMetrics(summary);
        for (ClusterMetricsConsumerExecutor consumer : this.clusterConsumerExceutors) {
            consumer.handleDataPoints(clusterInfo, clusterMetrics);
            for (Map.Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> perSupervisor : metricsBySupervisor.entrySet()) {
                consumer.handleDataPoints(perSupervisor.getKey(), perSupervisor.getValue());
            }
        }
    }

    /**
     * Gathers the pieces of topology state that several info calls share, after checking that the
     * request is authorized for the given operation.
     *
     * <p>The result holds the conf, name, system topology, task-to-component mapping, base,
     * launch time (0 when the base is missing or has none), assignment, executor beats (empty
     * when there is no assignment) and the set of all component ids.
     *
     * @param str  the topology id
     * @param str2 name of the operation used for the authorization check
     * @return the populated info holder
     */
    private CommonTopoInfo getCommonTopoInfo(String str, String str2) throws NotAliveException, AuthorizationException, IOException, InvalidTopologyException {
        CommonTopoInfo info = new CommonTopoInfo(null);
        info.topoConf = tryReadTopoConf(str, this.topoCache);
        info.topoName = (String) info.topoConf.get("topology.name");
        // Authorize before reading anything beyond the conf.
        checkAuthorization(info.topoName, info.topoConf, str2);

        StormTopology userTopology = tryReadTopology(str, this.topoCache);
        info.topology = StormCommon.systemTopology(info.topoConf, userTopology);
        info.taskToComponent = StormCommon.stormTaskInfo(userTopology, info.topoConf);

        IStormClusterState state = this.stormClusterState;
        info.base = state.stormBase(str, (Runnable) null);
        info.launchTimeSecs = (info.base != null && info.base.is_set_launch_time_secs())
                ? info.base.get_launch_time_secs()
                : 0;

        info.assignment = state.assignmentInfo(str, (Runnable) null);
        if (info.assignment != null) {
            info.beats = StatsUtil.convertExecutorBeats(state.executorBeats(str, info.assignment.get_executor_node_port()));
        } else {
            info.beats = Collections.emptyMap();
        }
        info.allComponents = new HashSet<>(info.taskToComponent.values());
        return info;
    }

    /**
     * Delegates to the leader elector's {@code awaitLeadership} with the given timeout.
     *
     * @param j        the timeout value
     * @param timeUnit the unit of {@code j}
     * @return whatever the leader elector reports
     * @throws InterruptedException if the wait is interrupted
     */
    @VisibleForTesting
    public boolean awaitLeadership(long j, TimeUnit timeUnit) throws InterruptedException {
        final boolean result = this.leaderElector.awaitLeadership(j, timeUnit);
        return result;
    }

    /**
     * Submits a topology with default options, i.e. an initial status of
     * {@code TopologyInitialStatus.ACTIVE}.
     *
     * <p>The call is counted on the {@code submitTopologyCalls} meter and then handed to
     * {@code submitTopologyWithOpts}.
     *
     * @param str           the topology name
     * @param str2          location of the uploaded jar
     * @param str3          the topology conf as a JSON string
     * @param stormTopology the topology to submit
     */
    public void submitTopology(String str, String str2, String str3, StormTopology stormTopology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException {
        this.submitTopologyCalls.mark();
        final SubmitOptions activeByDefault = new SubmitOptions(TopologyInitialStatus.ACTIVE);
        submitTopologyWithOpts(str, str2, str3, stormTopology, activeByDefault);
    }

    /**
     * Lets the worker token manager, when one is configured, upsert worker tokens into a
     * topology's credentials, then removes the topology's expired private worker keys from
     * cluster state.
     *
     * <p>The key removal happens whether or not a worker token manager is configured.
     *
     * @param map  the credentials map handed to the worker token manager
     * @param str  the principal the tokens are issued for
     * @param str2 the topology id
     */
    private void upsertWorkerTokensInCreds(Map<String, String> map, String str, String str2) {
        final boolean tokenManagerConfigured = this.workerTokenManager != null;
        if (tokenManagerConfigured) {
            this.workerTokenManager.upsertWorkerTokensInCredsForTopo(map, str, str2);
        }
        this.stormClusterState.removeExpiredPrivateWorkerKeys(str2);
    }

    public void submitTopologyWithOpts(String str, String str2, String str3, StormTopology stormTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException {
        TopologyStatus topologyStatus;
        Map map;
        try {
            this.submitTopologyWithOptsCalls.mark();
            assertIsLeader();
            if (!$assertionsDisabled && submitOptions == null) {
                throw new AssertionError();
            }
            validateTopologyName(str);
            checkAuthorization(str, null, "submitTopology");
            assertTopoActive(str, false);
            Map<String, Object> map2 = (Map) JSONValue.parse(str3);
            try {
                ConfigValidation.validateTopoConf(map2);
                this.validator.validate(str, map2, stormTopology);
                if (((Boolean) this.conf.getOrDefault("storm.disable.symlinks", false)).booleanValue() && (map = (Map) map2.get("topology.blobstore.map")) != null && !map.isEmpty()) {
                    throw new WrappedInvalidTopologyException("symlinks are disabled so blobs are not supported but topology.blobstore.map = " + map);
                }
                ServerUtils.validateTopologyWorkerMaxHeapSizeConfigs(map2, stormTopology, ObjectReader.getDouble(this.conf.get("topology.worker.max.heap.size.mb")).doubleValue());
                Utils.validateTopologyBlobStoreMap(map2, this.blobStore);
                String str4 = str + "-" + this.submittedCount.incrementAndGet() + "-" + Time.currentTimeSecs();
                Map<String, String> map3 = null;
                if (submitOptions.is_set_creds()) {
                    map3 = submitOptions.get_creds().get_creds();
                }
                map2.put("storm.id", str4);
                map2.put("topology.name", str);
                Map<String, Object> normalizeConf = normalizeConf(this.conf, map2, stormTopology);
                OciUtils.adjustImageConfigForTopo(this.conf, normalizeConf, str4);
                Principal principal = ReqContext.context().principal();
                String principal2 = principal == null ? null : principal.toString();
                HashSet hashSet = new HashSet(ObjectReader.getStrings(normalizeConf.get("topology.users")));
                hashSet.add(principal2);
                String local = this.principalToLocal.toLocal(principal);
                hashSet.add(local);
                String str5 = (String) Utils.OR(principal2, "");
                normalizeConf.put("topology.submitter.principal", str5);
                String str6 = (String) Utils.OR(local, System.getProperty("user.name"));
                normalizeConf.put("topology.submitter.user", str6);
                normalizeConf.put("topology.users", new ArrayList(hashSet));
                normalizeConf.put("storm.zookeeper.superACL", this.conf.get("storm.zookeeper.superACL"));
                if (!Utils.isZkAuthenticationConfiguredStormServer(this.conf)) {
                    normalizeConf.remove("storm.zookeeper.topology.auth.scheme");
                    normalizeConf.remove("storm.zookeeper.topology.auth.payload");
                }
                if (!((Boolean) this.conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)).booleanValue()) {
                    normalizeConf.remove("topology.classpath.beginning");
                }
                String str7 = stormTopology.get_storm_version();
                if (str7 == null) {
                    str7 = (String) this.conf.getOrDefault("supervisor.worker.default.version", VersionInfo.getVersion());
                }
                List list = (List) Utils.getCompatibleVersion(this.supervisorClasspaths, new SimpleVersion(str7), "classpath", (Object) null);
                if (list == null) {
                    throw new WrappedInvalidTopologyException("Topology submitted with storm version " + str7 + " but could not find a configured compatible version to use " + this.supervisorClasspaths.keySet());
                }
                Map<String, Object> merge = Utils.merge(Utils.getConfigFromClasspath(list, this.conf), normalizeConf);
                Map<String, Object> merge2 = Utils.merge(this.conf, merge);
                StormTopology normalizeTopology = normalizeTopology(merge2, stormTopology);
                if (ServerUtils.isRas(this.conf)) {
                    int estimatedWorkerCountForRasTopo = ServerUtils.getEstimatedWorkerCountForRasTopo(merge2, normalizeTopology);
                    setUpAckerExecutorConfigs(str, merge, merge2, estimatedWorkerCountForRasTopo);
                    ServerUtils.validateTopologyAckerBundleResource(merge, normalizeTopology, str);
                    int intValue = ObjectReader.getInt(merge2.get("topology.eventlogger.executors"), Integer.valueOf(estimatedWorkerCountForRasTopo)).intValue();
                    merge.put("topology.eventlogger.executors", Integer.valueOf(intValue));
                    LOG.debug("Config {} set to: {} for topology: {}", new Object[]{"topology.eventlogger.executors", Integer.valueOf(intValue), str});
                }
                merge.remove("storm.local.hostname");
                IStormClusterState iStormClusterState = this.stormClusterState;
                if (map3 == null && this.workerTokenManager != null) {
                    map3 = new HashMap();
                }
                if (map3 != null) {
                    Map unmodifiableMap = Collections.unmodifiableMap(normalizeConf);
                    Iterator<INimbusCredentialPlugin> it = this.nimbusAutocredPlugins.iterator();
                    while (it.hasNext()) {
                        it.next().populateCredentials(map3, unmodifiableMap);
                    }
                    upsertWorkerTokensInCreds(map3, str5, str4);
                }
                if (ObjectReader.getBoolean(this.conf.get("supervisor.run.worker.as.user"), false) && (local == null || local.isEmpty())) {
                    throw new WrappedAuthorizationException("Could not determine the user to run this topology as.");
                }
                StormCommon.systemTopology(merge2, normalizeTopology);
                validateTopologySize(normalizeConf, this.conf, normalizeTopology);
                if (Utils.isZkAuthenticationConfiguredStormServer(this.conf) && !Utils.isZkAuthenticationConfiguredTopology(normalizeConf)) {
                    throw new IllegalArgumentException("The cluster is configured for zookeeper authentication, but no payload was provided.");
                }
                LOG.info("Received topology submission for {} (storm-{} JDK-{}) with conf {}", new Object[]{str, str7, normalizeTopology.get_jdk_version(), ConfigUtils.maskPasswords(normalizeConf)});
                synchronized (this.submitLock) {
                    assertTopoActive(str, false);
                    if (map3 != null) {
                        iStormClusterState.setCredentials(str4, new Credentials(map3), normalizeConf);
                    }
                    LOG.info("uploadedJar {} for {}", str2, str);
                    setupStormCode(this.conf, str4, str2, merge, normalizeTopology);
                    waitForDesiredCodeReplication(merge2, str4);
                    iStormClusterState.setupHeatbeats(str4, normalizeConf);
                    iStormClusterState.setupErrors(str4, normalizeConf);
                    if (ObjectReader.getBoolean(merge2.get("topology.backpressure.enable"), false)) {
                        iStormClusterState.setupBackpressure(str4, normalizeConf);
                    }
                    notifyTopologyActionListener(str, "submitTopology");
                    switch (AnonymousClass1.$SwitchMap$org$apache$storm$generated$TopologyInitialStatus[submitOptions.get_initial_status().ordinal()]) {
                        case 1:
                            topologyStatus = TopologyStatus.INACTIVE;
                            break;
                        case 2:
                            topologyStatus = TopologyStatus.ACTIVE;
                            break;
                        default:
                            throw new IllegalArgumentException("Inital Status of " + submitOptions.get_initial_status() + " is not allowed.");
                    }
                    startTopology(str, str4, topologyStatus, str6, str5, merge, normalizeTopology);
                }
            } catch (IllegalArgumentException e) {
                throw new WrappedInvalidTopologyException(e.getMessage());
            }
        } catch (Exception e2) {
            LOG.warn("Topology submission exception. (topology name='{}')", str, e2);
            if (!(e2 instanceof TException)) {
                throw new RuntimeException((Throwable) e2);
            }
            throw e2;
        }
    }

    /**
     * Resolves the total acker executor count and the per-worker acker count for a topology and
     * writes both into the configuration that will be saved.
     *
     * <p>If {@code topology.acker.executors} is not set in {@code map2}, the total is derived as
     * {@code i * topology.ras.acker.executors.per.worker}. Otherwise the configured total is used
     * and the per-worker value is the total divided by {@code i}, rounded up (0 when {@code i} is 0).
     *
     * @param str  topology name, used only for logging
     * @param map  configuration to update with the two resolved values
     * @param map2 configuration the current values are read from
     * @param i    estimated number of workers for the topology
     */
    @VisibleForTesting
    public static void setUpAckerExecutorConfigs(String str, Map<String, Object> map, Map<String, Object> map2, int i) {
        final int ackerExecutors;
        final int ackersPerWorker;
        Object configuredAckers = map2.get("topology.acker.executors");
        if (configuredAckers == null) {
            ackersPerWorker = ObjectReader.getInt(map2.get("topology.ras.acker.executors.per.worker")).intValue();
            ackerExecutors = i * ackersPerWorker;
        } else {
            ackerExecutors = ObjectReader.getInt(configuredAckers).intValue();
            // Divide as doubles: an int/int division truncates before Math.ceil can round up,
            // which would under-provision ackers per worker (e.g. 5 ackers / 2 workers -> 2, not 3).
            ackersPerWorker = i == 0 ? 0 : (int) Math.ceil((double) ackerExecutors / (double) i);
        }
        map.put("topology.ras.acker.executors.per.worker", Integer.valueOf(ackersPerWorker));
        map.put("topology.acker.executors", Integer.valueOf(ackerExecutors));
        LOG.info("Config {} set to: {} for topology: {}", "topology.ras.acker.executors.per.worker", Integer.valueOf(ackersPerWorker), str);
        LOG.info("Config {} set to: {} for topology: {}", "topology.acker.executors", Integer.valueOf(ackerExecutors), str);
    }

    /**
     * Kills the named topology using a freshly constructed, unmodified {@link KillOptions}.
     *
     * @param str name of the topology to kill
     */
    public void killTopology(String str) throws NotAliveException, AuthorizationException, TException {
        this.killTopologyCalls.mark();
        KillOptions defaultOptions = new KillOptions();
        killTopologyWithOpts(str, defaultOptions);
    }

    /**
     * Kills the named topology, optionally honoring the wait time carried by {@code killOptions}.
     *
     * <p>The topology must be active. After authorization the KILL transition is issued, the
     * topology action listener is notified and the topology is added to the history log.
     *
     * @param str         name of the topology to kill
     * @param killOptions options; the wait seconds are forwarded only when explicitly set
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void killTopologyWithOpts(String str, KillOptions killOptions) throws NotAliveException, AuthorizationException, TException {
        this.killTopologyWithOptsCalls.mark();
        assertTopoActive(str, true);
        try {
            final String operation = "killTopology";
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConfFromName(str));
            checkAuthorization(str, topoConf, operation);
            // A null wait time means "not specified by the caller".
            Integer waitSecs = killOptions.is_set_wait_secs() ? Integer.valueOf(killOptions.get_wait_secs()) : null;
            transitionName(str, TopologyActions.KILL, waitSecs, true);
            notifyTopologyActionListener(str, operation);
            addTopoToHistoryLog((String) topoConf.get("storm.id"), topoConf);
        } catch (Exception e) {
            LOG.warn("Kill topology exception. (topology name='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Activates the named topology after checking that the caller is authorized to do so.
     *
     * @param str name of the topology to activate
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void activate(String str) throws NotAliveException, AuthorizationException, TException {
        this.activateCalls.mark();
        try {
            final String operation = "activate";
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConfFromName(str));
            checkAuthorization(str, topoConf, operation);
            transitionName(str, TopologyActions.ACTIVATE, null, true);
            notifyTopologyActionListener(str, operation);
        } catch (Exception e) {
            LOG.warn("Activate topology exception. (topology name='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Deactivates the named topology after checking that the caller is authorized to do so.
     *
     * @param str name of the topology to deactivate
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void deactivate(String str) throws NotAliveException, AuthorizationException, TException {
        this.deactivateCalls.mark();
        try {
            final String operation = "deactivate";
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConfFromName(str));
            checkAuthorization(str, topoConf, operation);
            transitionName(str, TopologyActions.INACTIVATE, null, true);
            notifyTopologyActionListener(str, operation);
        } catch (Exception e) {
            LOG.warn("Deactivate topology exception. (topology name='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Rebalances the named topology.
     *
     * <p>Steps, in order: authorize the caller, clear any caller-supplied principal, validate the
     * requested executor counts against the topology's spouts and bolts, strip a fixed set of keys
     * from the topology conf overrides, stamp the request with the name of the first principal of
     * the current subject (if any), and finally issue the REBALANCE transition.
     *
     * @param str              name of the topology to rebalance (must be active)
     * @param rebalanceOptions options; mutated in place (principal and conf overrides)
     * @throws TException       any Thrift exception raised while processing (including the invalid
     *                          topology errors raised by validation) is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void rebalance(String str, RebalanceOptions rebalanceOptions) throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
        this.rebalanceCalls.mark();
        assertTopoActive(str, true);
        try {
            final String operation = "rebalance";
            checkAuthorization(str, Utils.merge(this.conf, tryReadTopoConfFromName(str)), operation);
            // Whatever principal the caller supplied is discarded; it is re-derived from the subject below.
            rebalanceOptions.set_principal((String) null);

            StormTopology topology = tryReadTopologyFromName(str);
            Set<String> componentIds = new TreeSet<>();
            componentIds.addAll(topology.get_spouts().keySet());
            componentIds.addAll(topology.get_bolts().keySet());

            Map<String, Integer> executorOverrides = Collections.emptyMap();
            if (rebalanceOptions.is_set_num_executors()) {
                executorOverrides = rebalanceOptions.get_num_executors();
            }
            for (Map.Entry<String, Integer> override : executorOverrides.entrySet()) {
                String componentId = override.getKey();
                // Only non-system ids have to match a known spout or bolt.
                boolean knownComponent = Utils.isSystemId(componentId) || componentIds.contains(componentId);
                if (!knownComponent) {
                    throw new WrappedInvalidTopologyException(String.format("Invalid component %s for topology %s, valid values are %s", componentId, str, String.join(",", componentIds)));
                }
                Integer executorCount = override.getValue();
                if (executorCount == null || executorCount.intValue() <= 0) {
                    throw new WrappedInvalidTopologyException("Number of executors must be greater than 0");
                }
            }

            if (rebalanceOptions.is_set_topology_conf_overrides()) {
                Map<String, Object> confOverrides = Utils.parseJson(rebalanceOptions.get_topology_conf_overrides());
                // Keys that are dropped from the overrides unconditionally.
                confOverrides.remove("topology.submitter.principal");
                confOverrides.remove("topology.submitter.user");
                confOverrides.remove("storm.zookeeper.superACL");
                confOverrides.remove("storm.zookeeper.topology.auth.scheme");
                confOverrides.remove("storm.zookeeper.topology.auth.payload");
                Boolean classpathBeginningEnabled = (Boolean) this.conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false);
                if (classpathBeginningEnabled.booleanValue()) {
                    confOverrides.remove("topology.classpath.beginning");
                }
                confOverrides.remove("storm.local.hostname");
                rebalanceOptions.set_topology_conf_overrides(JSONValue.toJSONString(confOverrides));
            }

            Subject caller = getSubject();
            if (caller != null) {
                String principalName = caller.getPrincipals().iterator().next().getName();
                rebalanceOptions.set_principal(principalName);
            }
            transitionName(str, TopologyActions.REBALANCE, rebalanceOptions, true);
            notifyTopologyActionListener(str, operation);
        } catch (Exception e) {
            LOG.warn("rebalance topology exception. (topology name='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Merges the requested log level changes into the stored log config of a topology and saves it.
     *
     * <p>Every logger level already stored is first marked {@link LogLevelAction#UNCHANGED}. Each
     * entry of the incoming config is then applied according to its action: one action stores the
     * entry (after {@code setLoggerTimeouts}), another removes the named logger; any other action
     * leaves the stored config untouched.
     *
     * @param str       topology id
     * @param logConfig requested changes, keyed by logger name
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException if a logger name is empty, or wrapping any other failure
     */
    public void setLogConfig(String str, LogConfig logConfig) throws TException {
        try {
            this.setLogConfigCalls.mark();
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConf(str, this.topoCache));
            String topoName = (String) topoConf.get("topology.name");
            checkAuthorization(topoName, topoConf, "setLogConfig");

            IStormClusterState clusterState = this.stormClusterState;
            LogConfig merged = clusterState.topologyLogConfig(str, (Runnable) null);
            if (merged == null) {
                merged = new LogConfig();
            }
            if (merged.is_set_named_logger_level()) {
                for (LogLevel existing : merged.get_named_logger_level().values()) {
                    existing.set_action(LogLevelAction.UNCHANGED);
                }
            }

            if (logConfig.is_set_named_logger_level()) {
                for (Map.Entry<String, LogLevel> requested : logConfig.get_named_logger_level().entrySet()) {
                    LogLevel requestedLevel = requested.getValue();
                    String loggerName = requested.getKey();
                    LogLevelAction action = requestedLevel.get_action();
                    if (loggerName.isEmpty()) {
                        throw new RuntimeException("Named loggers need a valid name. Use ROOT for the root logger");
                    }
                    // Compiler-generated enum switch table; NOTE(review): 1 and 2 presumably map to
                    // the UPDATE and REMOVE actions respectively - confirm against the original source.
                    int actionCase = AnonymousClass1.$SwitchMap$org$apache$storm$generated$LogLevelAction[action.ordinal()];
                    if (actionCase == 1) {
                        setLoggerTimeouts(requestedLevel);
                        merged.put_to_named_logger_level(loggerName, requestedLevel);
                    } else if (actionCase == 2) {
                        Map<String, LogLevel> storedLevels = merged.get_named_logger_level();
                        if (storedLevels != null) {
                            storedLevels.remove(loggerName);
                        }
                    }
                }
            }
            LOG.info("Setting log config for {}:{}", topoName, merged);
            clusterState.setTopologyLogConfig(str, merged, topoConf);
        } catch (Exception e) {
            LOG.warn("set log config topology exception. (topology id='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the stored log config of a topology, or an empty {@link LogConfig} when none is stored.
     *
     * @param str topology id
     * @return the stored log config; never {@code null}
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public LogConfig getLogConfig(String str) throws TException {
        try {
            this.getLogConfigCalls.mark();
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConf(str, this.topoCache));
            String topoName = (String) topoConf.get("topology.name");
            checkAuthorization(topoName, topoConf, "getLogConfig");
            LogConfig stored = this.stormClusterState.topologyLogConfig(str, (Runnable) null);
            return stored != null ? stored : new LogConfig();
        } catch (Exception e) {
            LOG.warn("get log conf topology exception. (topology id='{}')", str, e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Enables or disables debug sampling for a topology, or for one of its components.
     *
     * <p>The sampling percentage is clamped to the range [0, 100] and is only written to the debug
     * options when debugging is being enabled. The options are stored under the component id when
     * one is given, otherwise under the topology id.
     *
     * @param str  topology name
     * @param str2 component id; {@code null} or empty targets the whole topology
     * @param z    {@code true} to enable debugging, {@code false} to disable it
     * @param d    requested sampling percentage
     * @throws TException       any Thrift exception raised while processing (including the
     *                          not-alive error when the name resolves to no id) is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void debug(String str, String str2, boolean z, double d) throws NotAliveException, AuthorizationException, TException {
        this.debugCalls.mark();
        try {
            IStormClusterState clusterState = this.stormClusterState;
            String topoId = toTopoId(str);
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConf(topoId, this.topoCache));
            double samplingPct = Math.max(Math.min(d, 100.0d), 0.0d);
            checkAuthorization(str, topoConf, "debug");
            if (topoId == null) {
                throw new WrappedNotAliveException(str);
            }

            boolean hasComponentId = str2 != null && !str2.isEmpty();
            String debugKey = hasComponentId ? str2 : topoId;

            DebugOptions options = new DebugOptions();
            options.set_enable(z);
            if (z) {
                options.set_samplingpct(samplingPct);
            }

            StormBase updates = new StormBase();
            updates.set_component_executors(Collections.emptyMap());
            updates.put_to_component_debug(debugKey, options);

            String componentSuffix = hasComponentId ? " component-id '" + str2 + "'" : "";
            LOG.info("Nimbus setting debug to {} for storm-name '{}' storm-id '{}' sanpling pct '{}'" + componentSuffix, Boolean.valueOf(z), str, topoId, Double.valueOf(samplingPct));
            synchronized (this.submitLock) {
                clusterState.updateStorm(topoId, updates);
            }
        } catch (Exception e) {
            LOG.warn("debug topology exception. (topology name='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Stores a worker profile request for a topology after authorizing the caller.
     *
     * @param str            topology id
     * @param profileRequest the profile request to record in the cluster state
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void setWorkerProfiler(String str, ProfileRequest profileRequest) throws TException {
        try {
            this.setWorkerProfilerCalls.mark();
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConf(str, this.topoCache));
            String topoName = (String) topoConf.get("topology.name");
            checkAuthorization(topoName, topoConf, "setWorkerProfiler");
            this.stormClusterState.setWorkerProfileRequest(str, profileRequest);
        } catch (Exception e) {
            LOG.warn("set worker profiler topology exception. (topology id='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Collects, for every host/port running the given component, the most recent pending profile
     * request of the requested action type.
     *
     * <p>"Most recent" is decided by the request time stamp; a host/port with no matching request
     * contributes nothing to the result.
     *
     * @param str           topology id
     * @param str2          component id
     * @param profileAction the profile action to look for
     * @return the latest matching request per host/port (possibly empty)
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public List<ProfileRequest> getComponentPendingProfileActions(String str, String str2, ProfileAction profileAction) throws TException {
        try {
            this.getComponentPendingProfileActionsCalls.mark();
            CommonTopoInfo topoInfo = getCommonTopoInfo(str, "getComponentPendingProfileActions");

            // executor -> [host, port], built from the current assignment (if there is one)
            Map<List<? extends Number>, List<Object>> execToHostPort = new HashMap<>();
            if (topoInfo.assignment != null) {
                Map<String, String> nodeToHost = topoInfo.assignment.get_node_host();
                for (Map.Entry<List<Long>, NodeInfo> execEntry : topoInfo.assignment.get_executor_node_port().entrySet()) {
                    NodeInfo slot = execEntry.getValue();
                    Object host = nodeToHost.get(slot.get_node());
                    Object port = Integer.valueOf(slot.get_port_iterator().next().intValue());
                    List<Object> hostPort = Arrays.asList(host, port);
                    execToHostPort.put(execEntry.getKey(), hostPort);
                }
            }

            List<Map<String, Object>> componentNodes = StatsUtil.extractNodeInfosFromHbForComp(execToHostPort, topoInfo.taskToComponent, false, str2);
            List<ProfileRequest> latestActions = new ArrayList<>();
            for (Map<String, Object> node : componentNodes) {
                String host = (String) node.get("host");
                int port = ((Integer) node.get("port")).intValue();
                ProfileRequest latest = null;
                long latestTime = -1;
                for (ProfileRequest candidate : this.stormClusterState.getTopologyProfileRequests(str)) {
                    String candidateHost = candidate.get_nodeInfo().get_node();
                    int candidatePort = candidate.get_nodeInfo().get_port_iterator().next().intValue();
                    ProfileAction candidateAction = candidate.get_action();
                    boolean matches = host.equals(candidateHost) && port == candidatePort && profileAction == candidateAction;
                    if (!matches) {
                        continue;
                    }
                    long candidateTime = candidate.get_time_stamp();
                    if (candidateTime > latestTime) {
                        latestTime = candidateTime;
                        latest = candidate;
                    }
                }
                if (latest != null) {
                    latestActions.add(latest);
                }
            }
            LOG.info("Latest profile actions for topology {} component {} {}", str, str2, latestActions);
            return latestActions;
        } catch (Exception e) {
            LOG.warn("Get comp actions topology exception. (topology id='{}')", str, e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Uploads new credentials for a running topology, merging them over the stored ones.
     *
     * <p>The expected owner is taken from the credentials' topology owner when set, otherwise from
     * the principal of the current request context. When an expected owner is known it must equal
     * either the topology's submitter principal or its submitter user. Previously stored
     * credentials are kept and overlaid with the uploaded ones.
     *
     * @param str         topology name
     * @param credentials credentials to upload; {@code null} is treated as an empty set
     * @throws NotAliveException      if the name does not resolve to a topology id
     * @throws AuthorizationException if the expected owner matches neither the submitter principal
     *                                nor the submitter user
     * @throws TException             any other Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException       wrapping any other failure
     */
    public void uploadNewCredentials(String str, Credentials credentials) throws NotAliveException, InvalidTopologyException, AuthorizationException, TException {
        try {
            this.uploadNewCredentialsCalls.mark();
            IStormClusterState clusterState = this.stormClusterState;
            String topoId = toTopoId(str);
            if (topoId == null) {
                throw new WrappedNotAliveException(str + " is not alive");
            }
            Map<String, Object> topoConf = Utils.merge(this.conf, tryReadTopoConf(topoId, this.topoCache));
            if (credentials == null) {
                credentials = new Credentials(Collections.emptyMap());
            }
            checkAuthorization(str, topoConf, "uploadNewCredentials");

            String realPrincipal = (String) topoConf.get("topology.submitter.principal");
            String realUser = (String) topoConf.get("topology.submitter.user");
            String expectedOwner = null;
            if (credentials.is_set_topoOwner()) {
                expectedOwner = credentials.get_topoOwner();
            } else {
                Principal requester = ReqContext.context().principal();
                if (requester != null) {
                    expectedOwner = requester.getName();
                }
            }

            if (expectedOwner == null) {
                LOG.warn("Please check you settings. Credentials are being uploaded to {} with security disabled.", topoId);
            } else if (!expectedOwner.equals(realPrincipal) && !expectedOwner.equals(realUser)) {
                // Compare from expectedOwner (known non-null here): the submitter principal/user may
                // be missing from the conf, and calling equals on them would raise an NPE instead
                // of the intended AuthorizationException.
                throw new AuthorizationException(topoId + " is expected to be owned by " + expectedOwner + " but is actually owned by " + realPrincipal);
            }

            synchronized (this.credUpdateLock) {
                // Overlay the upload on what is already stored so existing entries are not lost.
                Credentials stored = clusterState.credentials(topoId, (Runnable) null);
                if (stored != null) {
                    Map<String, String> mergedCreds = stored.get_creds();
                    mergedCreds.putAll(credentials.get_creds());
                    credentials.set_creds(mergedCreds);
                }
                clusterState.setCredentials(topoId, credentials, topoConf);
            }
        } catch (Exception e) {
            LOG.warn("Upload Creds topology exception. (topology name='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the creation of a new blob and registers an upload session for it.
     *
     * @param str              blob key
     * @param settableBlobMeta metadata for the new blob
     * @return the id of the newly registered upload session
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public String beginCreateBlob(String str, SettableBlobMeta settableBlobMeta) throws AuthorizationException, KeyAlreadyExistsException, TException {
        try {
            String sessionId = Utils.uuid();
            this.blobUploaders.put(sessionId, this.blobStore.createBlob(str, settableBlobMeta, getSubject()));
            LOG.info("Created blob {} for session {}", str, sessionId);
            return sessionId;
        } catch (Exception e) {
            LOG.warn("begin create blob exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Starts an update of an existing blob and registers an upload session for it.
     *
     * @param str blob key
     * @return the id of the newly registered upload session
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public String beginUpdateBlob(String str) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            String sessionId = Utils.uuid();
            this.blobUploaders.put(sessionId, this.blobStore.updateBlob(str, getSubject()));
            LOG.info("Created upload session for {}", str);
            return sessionId;
        } catch (Exception e) {
            LOG.warn("begin update blob exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Appends the remaining bytes of a chunk to the blob being uploaded in the given session.
     *
     * @param str        upload session id
     * @param byteBuffer chunk to write; read through its backing array from the current position
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException if the session is unknown, or wrapping any other failure
     */
    public void uploadBlobChunk(String str, ByteBuffer byteBuffer) throws AuthorizationException, TException {
        try {
            OutputStream blobStream = (OutputStream) this.blobUploaders.get(str);
            if (blobStream == null) {
                throw new RuntimeException("Blob for session " + str + " does not exist (or timed out)");
            }
            byte[] backing = byteBuffer.array();
            int start = byteBuffer.arrayOffset() + byteBuffer.position();
            int length = byteBuffer.remaining();
            blobStream.write(backing, start, length);
            // Re-register the stream under the same session id after a successful write.
            this.blobUploaders.put(str, blobStream);
        } catch (Exception e) {
            LOG.warn("upload blob chunk exception.", e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Completes a blob upload: closes the session's stream, drops the session and records the
     * blob update time on the blob store.
     *
     * @param str upload session id
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException if the session is unknown, or wrapping any other failure
     */
    public void finishBlobUpload(String str) throws AuthorizationException, TException {
        try {
            OutputStream blobStream = (OutputStream) this.blobUploaders.get(str);
            if (blobStream == null) {
                throw new RuntimeException("Blob for session " + str + " does not exist (or timed out)");
            }
            blobStream.close();
            LOG.info("Finished uploading blob for session {}. Closing session.", str);
            this.blobUploaders.remove(str);
            this.blobStore.updateLastBlobUpdateTime();
        } catch (Exception e) {
            LOG.warn("finish blob upload exception.", e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Cancels an in-progress blob upload and drops its session.
     *
     * @param str upload session id
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException if the session is unknown, or wrapping any other failure
     */
    public void cancelBlobUpload(String str) throws AuthorizationException, TException {
        try {
            AtomicOutputStream blobStream = (AtomicOutputStream) this.blobUploaders.get(str);
            if (blobStream == null) {
                throw new RuntimeException("Blob for session " + str + " does not exist (or timed out)");
            }
            blobStream.cancel();
            LOG.info("Canceled uploading blob for session {}. Closing session.", str);
            this.blobUploaders.remove(str);
        } catch (Exception e) {
            // The message names the cancel operation so failures here are not mistaken for
            // failures of finishBlobUpload in the logs.
            LOG.warn("cancel blob upload exception.", e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the metadata of a blob on behalf of the current subject.
     *
     * @param str blob key
     * @return the blob's readable metadata as returned by the blob store
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public ReadableBlobMeta getBlobMeta(String str) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            ReadableBlobMeta meta = this.blobStore.getBlobMeta(str, getSubject());
            return meta;
        } catch (Exception e) {
            LOG.warn("get blob meta exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Replaces the settable metadata of a blob and records the blob update time on the blob store.
     *
     * @param str              blob key
     * @param settableBlobMeta new metadata
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void setBlobMeta(String str, SettableBlobMeta settableBlobMeta) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            this.blobStore.setBlobMeta(str, settableBlobMeta, getSubject());
            this.blobStore.updateLastBlobUpdateTime();
        } catch (Exception e) {
            LOG.warn("set blob meta exception.", e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Opens a blob for download and registers a download session for it.
     *
     * <p>The session reads the blob through a {@code BufferInputStream} whose buffer size comes
     * from {@code storm.blobstore.inputstream.buffer.size.bytes} (65536 when not configured).
     *
     * @param str blob key
     * @return the blob version, the new session id and the blob's data size
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public BeginDownloadResult beginBlobDownload(String str) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            InputStreamWithMeta blobStream = this.blobStore.getBlob(str, getSubject());
            String sessionId = Utils.uuid();
            BeginDownloadResult result = new BeginDownloadResult(blobStream.getVersion(), sessionId);
            result.set_data_size(blobStream.getFileLength());
            int bufferSize = ((Integer) this.conf.getOrDefault("storm.blobstore.inputstream.buffer.size.bytes", 65536)).intValue();
            this.blobDownloaders.put(sessionId, new BufferInputStream(blobStream, bufferSize));
            LOG.info("Created download session {} for {}", sessionId, str);
            return result;
        } catch (Exception e) {
            LOG.warn("begin blob download exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Reads the next chunk of a blob download session.
     *
     * <p>An empty chunk ends the download: the stream is closed and the session removed.
     * Otherwise the stream is re-registered under the same session id.
     *
     * @param str download session id
     * @return the bytes read, wrapped in a buffer (empty when the blob is exhausted)
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException if the session is unknown, or wrapping any other failure
     */
    public ByteBuffer downloadBlobChunk(String str) throws AuthorizationException, TException {
        try {
            BufferInputStream blobStream = (BufferInputStream) this.blobDownloaders.get(str);
            if (blobStream == null) {
                throw new RuntimeException("Blob for session " + str + " does not exist (or timed out)");
            }
            byte[] chunk = blobStream.read();
            if (chunk.length != 0) {
                this.blobDownloaders.put(str, blobStream);
            } else {
                blobStream.close();
                this.blobDownloaders.remove(str);
            }
            LOG.debug("Sending {} bytes", Integer.valueOf(chunk.length));
            return ByteBuffer.wrap(chunk);
        } catch (Exception e) {
            LOG.warn("download blob chunk exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Deletes a blob unless it is still needed by a topology.
     *
     * <p>The delete is refused when the key maps to a topology id that is active or activating,
     * or when {@code topologyUsingThisBlob} reports a topology using the blob.
     *
     * @param str blob key
     * @throws IllegalStateException if the blob is still in use (raised as the wrapped variant)
     * @throws TException            any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException      wrapping any other failure
     */
    public void deleteBlob(String str) throws AuthorizationException, KeyNotFoundException, IllegalStateException, TException {
        try {
            String owningTopoId = ConfigUtils.getIdFromBlobKey(str);
            boolean belongsToLiveTopology = owningTopoId != null && isTopologyActiveOrActivating(this.stormClusterState, owningTopoId);
            if (belongsToLiveTopology) {
                String message = "Attempting to delete blob " + str + " from under active topology " + owningTopoId;
                LOG.warn(message);
                throw new WrappedIllegalStateException(message);
            }

            String usingTopoId = topologyUsingThisBlob(this.stormClusterState, this.topoCache, str);
            if (usingTopoId != null) {
                String message = "Attempting to delete active blob " + str + " used by topology " + usingTopoId;
                LOG.warn(message);
                throw new WrappedIllegalStateException(message);
            }

            this.blobStore.deleteBlob(str, getSubject());
            LOG.info("Deleted blob for key {} with {}", str, ReqContext.context());
        } catch (Exception e) {
            LOG.warn("delete blob exception.", e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:6:0x002a A[Catch: Exception -> 0x00c7, TryCatch #0 {Exception -> 0x00c7, blocks: (B:24:0x0004, B:26:0x001a, B:6:0x002a, B:7:0x004b, B:9:0x004c, B:11:0x0055, B:13:0x0076, B:16:0x0088, B:18:0x0091, B:20:0x00a2, B:3:0x000b), top: B:23:0x0004 }] */
    /* JADX WARN: Removed duplicated region for block: B:9:0x004c A[Catch: Exception -> 0x00c7, TryCatch #0 {Exception -> 0x00c7, blocks: (B:24:0x0004, B:26:0x001a, B:6:0x002a, B:7:0x004b, B:9:0x004c, B:11:0x0055, B:13:0x0076, B:16:0x0088, B:18:0x0091, B:20:0x00a2, B:3:0x000b), top: B:23:0x0004 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder left by the decompiler: the body of this method could not be reconstructed, so
     * in this form it unconditionally throws {@link UnsupportedOperationException} and lists nothing.
     * The real implementation must be restored from the original sources or the bytecode dump
     * before this method can be used.
     *
     * @param r6 decompiler-assigned name of the single String argument
     *           (NOTE(review): presumably a blob-listing session id - confirm against the Thrift interface)
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    public org.apache.storm.generated.ListBlobsResult listBlobs(java.lang.String r6) throws org.apache.storm.thrift.TException {
        /*
            Method dump skipped, instructions count: 233
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.storm.daemon.nimbus.Nimbus.listBlobs(java.lang.String):org.apache.storm.generated.ListBlobsResult");
    }

    /**
     * Returns the replication value the blob store reports for a blob.
     *
     * @param str blob key
     * @return the value returned by the blob store's {@code getBlobReplication}
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public int getBlobReplication(String str) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            int replication = this.blobStore.getBlobReplication(str, getSubject());
            return replication;
        } catch (Exception e) {
            LOG.warn("get blob replication exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Asks the blob store to change a blob's replication and records the blob update time.
     *
     * @param str blob key
     * @param i   requested replication value
     * @return the value returned by the blob store's {@code updateBlobReplication}
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public int updateBlobReplication(String str, int i) throws AuthorizationException, KeyNotFoundException, TException {
        try {
            int appliedReplication = this.blobStore.updateBlobReplication(str, i, getSubject());
            this.blobStore.updateLastBlobUpdateTime();
            return appliedReplication;
        } catch (Exception e) {
            LOG.warn("update blob replication exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Registers a blob key in the cluster state for this Nimbus, but only when the configured
     * blob store is a {@link LocalFsBlobStore}; for any other blob store nothing is written.
     *
     * @param str blob key
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public void createStateInZookeeper(String str) throws TException {
        try {
            IStormClusterState clusterState = this.stormClusterState;
            BlobStore store = this.blobStore;
            NimbusInfo self = this.nimbusHostPortInfo;
            if (store instanceof LocalFsBlobStore) {
                int keyVersion = getVersionForKey(str, self, this.zkClient);
                clusterState.setupBlob(str, self, Integer.valueOf(keyVersion));
            }
            LOG.debug("Created state in zookeeper {} {} {}", clusterState, store, self);
        } catch (Exception e) {
            LOG.warn("Exception while creating state in zookeeper - key: " + str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts a jar upload: picks a fresh file location in the inbox, opens it for writing and
     * registers the channel as an upload session keyed by that location.
     *
     * @return the file location, which doubles as the upload session key
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException wrapping any other failure
     */
    public String beginFileUpload() throws AuthorizationException, TException {
        try {
            this.beginFileUploadCalls.mark();
            assertIsLeader();
            checkAuthorization(null, null, "fileUpload");
            String fileLocation = getInbox() + "/stormjar-" + Utils.uuid() + ".jar";
            WritableByteChannel fileChannel = Channels.newChannel(new FileOutputStream(fileLocation));
            this.uploaders.put(fileLocation, new TimedWritableByteChannel(fileChannel, this.fileUploadDuration));
            LOG.info("Uploading file from client to {}", fileLocation);
            return fileLocation;
        } catch (Exception e) {
            LOG.warn("Begin file upload exception", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Writes a chunk to the file being uploaded at the given location.
     *
     * @param str        file location returned when the upload was started
     * @param byteBuffer chunk to write
     * @throws TException       any Thrift exception raised while processing is rethrown unchanged
     * @throws RuntimeException if no upload is registered for the location, or wrapping any other failure
     */
    public void uploadChunk(String str, ByteBuffer byteBuffer) throws AuthorizationException, TException {
        try {
            this.uploadChunkCalls.mark();
            checkAuthorization(null, null, "fileUpload");
            WritableByteChannel fileChannel = (WritableByteChannel) this.uploaders.get(str);
            if (fileChannel == null) {
                throw new RuntimeException("File for that location does not exist (or timed out)");
            }
            fileChannel.write(byteBuffer);
            // Re-register the channel under the same location after a successful write.
            this.uploaders.put(str, fileChannel);
        } catch (Exception e) {
            LOG.warn("uploadChunk exception.", e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Completes an upload: closes the channel registered under {@code str}, removes it from
     * {@code uploaders} and calls {@code blobStore.updateLastBlobUpdateTime()}.
     *
     * <p>If closing the channel fails, the entry is not removed here.
     *
     * @param str key of the upload in {@code uploaders}
     * @throws AuthorizationException propagated from the "fileUpload" authorization check
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public void finishFileUpload(String str) throws AuthorizationException, TException {
        try {
            this.finishFileUploadCalls.mark();
            checkAuthorization(null, null, "fileUpload");
            WritableByteChannel writableByteChannel = (WritableByteChannel) this.uploaders.get(str);
            if (writableByteChannel == null) {
                throw new RuntimeException("File for that location does not exist (or timed out)");
            }
            writableByteChannel.close();
            LOG.info("Finished uploading file from client: {}", str);
            this.uploaders.remove(str);
            this.blobStore.updateLastBlobUpdateTime();
        } catch (Exception e) {
            LOG.warn("finish file upload exception.", e);
            if (e instanceof TException) {
                // The cast is required: close() can raise IOException, so a bare "throw e"
                // would rethrow a checked type this method does not declare.
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the next chunk of the download registered under {@code str}.
     *
     * <p>When the stream yields an empty chunk it is closed and dropped from
     * {@code downloaders}; the empty chunk is still returned to the caller.
     *
     * @param str key of the download in {@code downloaders}
     * @return the chunk that was read, wrapped in a {@link ByteBuffer}
     * @throws AuthorizationException propagated from the "fileDownload" authorization check
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public ByteBuffer downloadChunk(String str) throws AuthorizationException, TException {
        try {
            this.downloadChunkCalls.mark();
            checkAuthorization(null, null, "fileDownload");
            BufferInputStream source = (BufferInputStream) this.downloaders.get(str);
            if (source == null) {
                throw new RuntimeException("Could not find input stream for id " + str);
            }
            byte[] chunk = source.read();
            boolean exhausted = chunk.length == 0;
            if (exhausted) {
                source.close();
                this.downloaders.remove(str);
            }
            return ByteBuffer.wrap(chunk);
        } catch (Exception e) {
            LOG.warn("download chunk exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Serializes this daemon's {@code conf} map to a JSON string.
     *
     * @return {@code conf} rendered by {@code JSONValue.toJSONString}
     * @throws AuthorizationException propagated from the "getNimbusConf" authorization check
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public String getNimbusConf() throws AuthorizationException, TException {
        try {
            this.getNimbusConfCalls.mark();
            checkAuthorization(null, null, "getNimbusConf");
            String json = JSONValue.toJSONString(this.conf);
            return json;
        } catch (Exception e) {
            LOG.warn("get nimbus conf exception.", e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Returns the {@link TopologyInfo} for a topology id, requesting all errors
     * ({@code NumErrorsChoice.ALL}) for every component.
     *
     * @param str topology id
     * @return the result of {@code getTopologyInfoWithOptsImpl} for that id
     * @throws NotAliveException propagated from the implementation method
     * @throws AuthorizationException propagated from the implementation method
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public TopologyInfo getTopologyInfo(String str) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getTopologyInfoCalls.mark();
            GetInfoOptions getInfoOptions = new GetInfoOptions();
            getInfoOptions.set_num_err_choice(NumErrorsChoice.ALL);
            return getTopologyInfoWithOptsImpl(str, getInfoOptions);
        } catch (Exception e) {
            // Message previously read "get topology ino exception" (typo).
            LOG.warn("get topology info exception. (topology id={})", str, e);
            if (e instanceof TException) {
                // The cast is required: the implementation method declares "throws Exception",
                // so a bare "throw e" would rethrow a checked type this method does not declare.
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the {@link TopologyInfo} for a topology name, requesting all errors
     * ({@code NumErrorsChoice.ALL}) for every component.
     *
     * @param str topology name
     * @return the result of {@code getTopologyInfoByNameImpl} for that name
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public TopologyInfo getTopologyInfoByName(String str) throws TException {
        try {
            this.getTopologyInfoByNameCalls.mark();
            GetInfoOptions getInfoOptions = new GetInfoOptions();
            getInfoOptions.set_num_err_choice(NumErrorsChoice.ALL);
            return getTopologyInfoByNameImpl(str, getInfoOptions);
        } catch (Exception e) {
            LOG.warn("get topology info exception. (topology name={})", str, e);
            if (e instanceof TException) {
                // The cast is required: the implementation method declares "throws Exception",
                // so a bare "throw e" would rethrow a checked type this method does not declare.
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Resolves a topology name to its id through {@code stormClusterState.getTopoId} and
     * delegates to {@code getTopologyInfoWithOptsImpl}.
     *
     * @param str topology name
     * @param getInfoOptions options forwarded unchanged to the id-based implementation
     * @return the topology info for the resolved id
     * @throws Exception a {@code WrappedNotAliveException} when no id is found for the name,
     *     or whatever the id-based implementation throws
     */
    private TopologyInfo getTopologyInfoByNameImpl(String str, GetInfoOptions getInfoOptions) throws NotAliveException, AuthorizationException, Exception {
        String topoId = (String) this.stormClusterState.getTopoId(str)
                .orElseThrow(() -> new WrappedNotAliveException(str + " is not alive"));
        return getTopologyInfoWithOptsImpl(topoId, getInfoOptions);
    }

    /**
     * Returns the {@link TopologyInfo} for a topology name using caller-supplied options.
     *
     * @param str topology name
     * @param getInfoOptions options forwarded to {@code getTopologyInfoByNameImpl}
     * @return the result of {@code getTopologyInfoByNameImpl}
     * @throws NotAliveException propagated from the implementation method
     * @throws AuthorizationException propagated from the implementation method
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public TopologyInfo getTopologyInfoByNameWithOpts(String str, GetInfoOptions getInfoOptions) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getTopologyInfoByNameWithOptsCalls.mark();
            return getTopologyInfoByNameImpl(str, getInfoOptions);
        } catch (Exception e) {
            LOG.warn("get topology info withOpts by name exception. (topology name={})", str, e);
            if (e instanceof TException) {
                // The cast is required: the implementation method declares "throws Exception",
                // so a bare "throw e" would rethrow a checked type this method does not declare.
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the {@link TopologyInfo} for a topology id using caller-supplied options.
     *
     * <p>Unlike the sibling endpoints, the call meter is marked before the try block.
     *
     * @param str topology id
     * @param getInfoOptions options forwarded to {@code getTopologyInfoWithOptsImpl}
     * @return the result of {@code getTopologyInfoWithOptsImpl}
     * @throws NotAliveException propagated from the implementation method
     * @throws AuthorizationException propagated from the implementation method
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public TopologyInfo getTopologyInfoWithOpts(String str, GetInfoOptions getInfoOptions) throws NotAliveException, AuthorizationException, TException {
        this.getTopologyInfoWithOptsCalls.mark();
        try {
            return getTopologyInfoWithOptsImpl(str, getInfoOptions);
        } catch (Exception e) {
            LOG.warn("Get topo info exception. (topology id='{}')", str, e);
            if (e instanceof TException) {
                // The cast is required: the implementation method declares "throws Exception",
                // so a bare "throw e" would rethrow a checked type this method does not declare.
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Assembles the {@link TopologyInfo} for a topology id.
     *
     * <p>The result carries a per-component error list chosen by the requested
     * {@link NumErrorsChoice}, one {@link ExecutorSummary} per entry of the assignment's
     * executor-to-node map, and the optional storm version, owner, scheduler status,
     * resource figures, component debug map and blob replication count.
     *
     * @param str topology id
     * @param getInfoOptions supplies the error choice; {@code Utils.OR} is given
     *     {@code NumErrorsChoice.ALL} as the alternative (presumably used when the choice is null)
     * @return the populated topology info
     * @throws Exception a {@code WrappedNotAliveException} when the common info has no base,
     *     or whatever the helpers called here throw
     */
    private TopologyInfo getTopologyInfoWithOptsImpl(String str, GetInfoOptions getInfoOptions) throws NotAliveException, AuthorizationException, InvalidTopologyException, Exception {
        CommonTopoInfo common = getCommonTopoInfo(str, "getTopologyInfo");
        if (common.base == null) {
            throw new WrappedNotAliveException(str);
        }
        IStormClusterState state = this.stormClusterState;
        NumErrorsChoice errorChoice = (NumErrorsChoice) Utils.OR(getInfoOptions.get_num_err_choice(), NumErrorsChoice.ALL);

        // Component id -> errors to report for it.
        HashMap errorsByComponent = new HashMap();
        for (String componentId : common.allComponents) {
            // NOTE(review): compiler-generated switch table; the case numbers are assigned in
            // AnonymousClass1, which is not visible from here - confirm the mapping there.
            switch (AnonymousClass1.$SwitchMap$org$apache$storm$generated$NumErrorsChoice[errorChoice.ordinal()]) {
                case 1:
                    errorsByComponent.put(componentId, Collections.emptyList());
                    break;
                case 2:
                    ArrayList latestOnly = new ArrayList();
                    ErrorInfo latest = state.lastError(str, componentId);
                    if (latest != null) {
                        latestOnly.add(latest);
                    }
                    errorsByComponent.put(componentId, latestOnly);
                    break;
                case 3:
                    errorsByComponent.put(componentId, state.errors(str, componentId));
                    break;
                default:
                    LOG.warn("Got invalid NumErrorsChoice '{}'", errorChoice);
                    errorsByComponent.put(componentId, state.errors(str, componentId));
                    break;
            }
        }

        ArrayList executorSummaries = new ArrayList();
        if (common.assignment != null) {
            for (Map.Entry execToNode : common.assignment.get_executor_node_port().entrySet()) {
                NodeInfo nodeInfo = (NodeInfo) execToNode.getValue();
                ExecutorInfo execInfo = toExecInfo((List) execToNode.getKey());
                Map nodeToHost = common.assignment.get_node_host();
                Map<String, Object> heartbeat = common.beats.get(ClientStatsUtil.convertExecutor((List) execToNode.getKey()));
                if (heartbeat == null) {
                    // No heartbeat for this executor: report zero uptime and no stats.
                    heartbeat = Collections.emptyMap();
                }
                String component = common.taskToComponent.get(Integer.valueOf(execInfo.get_task_start()));
                String host = (String) nodeToHost.get(nodeInfo.get_node());
                int port = ((Long) nodeInfo.get_port_iterator().next()).intValue();
                int uptime = ((Integer) heartbeat.getOrDefault("uptime", 0)).intValue();
                ExecutorSummary summary = new ExecutorSummary(execInfo, component, host, port, uptime);
                Map stats = (Map) heartbeat.get("stats");
                if (stats != null) {
                    summary.set_stats(StatsUtil.thriftifyExecutorStats(stats));
                }
                executorSummaries.add(summary);
            }
        }

        TopologyInfo info = new TopologyInfo(str, common.topoName, Time.deltaSecs(common.launchTimeSecs), executorSummaries, extractStatusStr(common.base), errorsByComponent);
        if (common.topology.is_set_storm_version()) {
            info.set_storm_version(common.topology.get_storm_version());
        }
        if (common.base.is_set_owner()) {
            info.set_owner(common.base.get_owner());
        }
        String schedStatus = this.idToSchedStatus.get().get(str);
        if (schedStatus != null) {
            info.set_sched_status(schedStatus);
        }
        TopologyResources resources = getResourcesForTopology(str, common.base);
        // Resource figures are only filled in when the resource-aware scheduler is in use.
        if (resources != null && (this.underlyingScheduler instanceof ResourceAwareScheduler)) {
            info.set_requested_memonheap(resources.getRequestedMemOnHeap());
            info.set_requested_memoffheap(resources.getRequestedMemOffHeap());
            info.set_requested_cpu(resources.getRequestedCpu());
            info.set_assigned_memonheap(resources.getAssignedMemOnHeap());
            info.set_assigned_memoffheap(resources.getAssignedMemOffHeap());
            info.set_assigned_cpu(resources.getAssignedCpu());
        }
        if (common.base.is_set_component_debug()) {
            info.set_component_debug(common.base.get_component_debug());
        }
        info.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(str)).intValue());
        return info;
    }

    /**
     * Builds the {@link TopologyPageInfo} for a topology id.
     *
     * <p>Executor statistics are aggregated by {@code StatsUtil.aggTopoExecsStats}; spout and
     * bolt entries then get their resource maps (or placeholders), followed by the worker
     * summaries, owner, topology version, scheduler status, resource figures, name, status,
     * uptime, merged configuration as JSON, replication count and debug options.
     *
     * @param str topology id
     * @param str2 forwarded to {@code StatsUtil.aggTopoExecsStats} (presumably the stats window)
     * @param z forwarded to the aggregation helpers (presumably "include system components")
     * @return the populated page info
     * @throws NotAliveException a {@code WrappedNotAliveException} when the topology has no base
     * @throws AuthorizationException propagated from the helpers called here
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public TopologyPageInfo getTopologyPageInfo(String str, String str2, boolean z) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = getCommonTopoInfo(str, "getTopologyPageInfo");
            String topoName = common.topoName;
            IStormClusterState state = this.stormClusterState;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComponent = common.taskToComponent;
            StormTopology topology = common.topology;
            StormBase base = common.base;
            if (base == null) {
                throw new WrappedNotAliveException(str);
            }
            String owner = base.get_owner();
            Map<WorkerSlot, WorkerResources> workerResources = getWorkerResourcesForTopology(str);

            // Executor -> [node id, port]; stays empty when the topology has no assignment.
            List<WorkerSummary> workerSummaries = null;
            HashMap execToNodePort = new HashMap();
            if (assignment != null) {
                Map<?, ?> execToNodeInfo = assignment.get_executor_node_port();
                Map nodeToHost = assignment.get_node_host();
                for (Map.Entry<?, ?> execEntry : execToNodeInfo.entrySet()) {
                    NodeInfo nodeInfo = (NodeInfo) execEntry.getValue();
                    execToNodePort.put(execEntry.getKey(), Arrays.asList(nodeInfo.get_node(), nodeInfo.get_port_iterator().next()));
                }
                workerSummaries = StatsUtil.aggWorkerStats(str, topoName, taskToComponent, beats, execToNodePort, nodeToHost, workerResources, z, true, null, owner);
            }

            TopologyPageInfo page = StatsUtil.aggTopoExecsStats(str, execToNodePort, taskToComponent, beats, topology, str2, z, state);
            if (topology.is_set_storm_version()) {
                page.set_storm_version(topology.get_storm_version());
            }
            Map<String, Object> mergedConf = Utils.merge(this.conf, common.topoConf);
            addSpoutAggStats(page, topology, mergedConf);
            addBoltAggStats(page, topology, mergedConf, z);
            if (workerSummaries != null) {
                page.set_workers(workerSummaries);
            }
            if (base.is_set_owner()) {
                page.set_owner(base.get_owner());
            }
            if (base.is_set_topology_version()) {
                page.set_topology_version(base.get_topology_version());
            }
            String schedStatus = this.idToSchedStatus.get().get(str);
            if (schedStatus != null) {
                page.set_sched_status(schedStatus);
            }

            TopologyResources resources = getResourcesForTopology(str, base);
            // Resource figures are only filled in when the resource-aware scheduler is in use.
            if (resources != null && (this.underlyingScheduler instanceof ResourceAwareScheduler)) {
                page.set_requested_memonheap(resources.getRequestedMemOnHeap());
                page.set_requested_memoffheap(resources.getRequestedMemOffHeap());
                page.set_requested_cpu(resources.getRequestedCpu());
                page.set_assigned_memonheap(resources.getAssignedMemOnHeap());
                page.set_assigned_memoffheap(resources.getAssignedMemOffHeap());
                page.set_assigned_cpu(resources.getAssignedCpu());
                page.set_requested_shared_off_heap_memory(resources.getRequestedSharedMemOffHeap());
                page.set_requested_regular_off_heap_memory(resources.getRequestedNonSharedMemOffHeap());
                page.set_requested_shared_on_heap_memory(resources.getRequestedSharedMemOnHeap());
                page.set_requested_regular_on_heap_memory(resources.getRequestedNonSharedMemOnHeap());
                page.set_assigned_shared_off_heap_memory(resources.getAssignedSharedMemOffHeap());
                page.set_assigned_regular_off_heap_memory(resources.getAssignedNonSharedMemOffHeap());
                page.set_assigned_shared_on_heap_memory(resources.getAssignedSharedMemOnHeap());
                page.set_assigned_regular_on_heap_memory(resources.getAssignedNonSharedMemOnHeap());
                page.set_assigned_generic_resources(resources.getAssignedGenericResources());
                page.set_requested_generic_resources(resources.getRequestedGenericResources());
            }

            int launchTimeSecs = common.launchTimeSecs;
            page.set_name(topoName);
            page.set_status(extractStatusStr(base));
            page.set_uptime_secs(Time.deltaSecs(launchTimeSecs));
            page.set_topology_conf(JSONValue.toJSONString(mergedConf));
            page.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(str)).intValue());
            if (base.is_set_component_debug()) {
                // The debug entry is looked up under the topology id itself.
                DebugOptions debugOptions = (DebugOptions) base.get_component_debug().get(str);
                if (debugOptions != null) {
                    page.set_debug_options(debugOptions);
                }
            }
            return page;
        } catch (Exception e) {
            LOG.warn("Get topo page info exception. (topology id='{}')", str, e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Fills in the resource map of every spout entry on the page.
     *
     * <p>When the page already has spout stats, each existing entry gets its resource map set
     * (after {@code setResourcesDefaultIfNotSet}). When it has none, a placeholder entry with
     * zeroed stats is added for every spout in the topology.
     *
     * @param topologyPageInfo page being populated; its spout stats map is modified in place
     * @param stormTopology topology whose spouts are described
     * @param map configuration passed to {@code ResourceUtils.getSpoutsResources}
     */
    private void addSpoutAggStats(TopologyPageInfo topologyPageInfo, StormTopology stormTopology, Map<String, Object> map) {
        Map<String, NormalizedResourceRequest> resourcesBySpout = ResourceUtils.getSpoutsResources(stormTopology, map);
        if (topologyPageInfo.get_id_to_spout_agg_stats().isEmpty()) {
            // Nothing aggregated yet: synthesize a zeroed entry per declared spout.
            for (Map.Entry spoutEntry : stormTopology.get_spouts().entrySet()) {
                String spoutId = (String) spoutEntry.getKey();
                SpoutSpec spoutSpec = (SpoutSpec) spoutEntry.getValue();

                ComponentAggregateStats placeholder = new ComponentAggregateStats();
                placeholder.set_type(ComponentType.SPOUT);

                CommonAggregateStats common = getPlaceholderCommonAggregateStats(spoutSpec);
                common.set_resources_map(resourcesBySpout.getOrDefault(spoutId, new NormalizedResourceRequest()).toNormalizedMap());
                placeholder.set_common_stats(common);

                SpoutAggregateStats spoutStats = new SpoutAggregateStats();
                spoutStats.set_complete_latency_ms(0.0d);
                SpecificAggregateStats specific = new SpecificAggregateStats();
                specific.set_spout(spoutStats);
                placeholder.set_specific_stats(specific);

                topologyPageInfo.get_id_to_spout_agg_stats().put(spoutId, placeholder);
            }
        } else {
            for (Map.Entry statsEntry : topologyPageInfo.get_id_to_spout_agg_stats().entrySet()) {
                CommonAggregateStats common = ((ComponentAggregateStats) statsEntry.getValue()).get_common_stats();
                setResourcesDefaultIfNotSet(resourcesBySpout, (String) statsEntry.getKey(), map);
                common.set_resources_map(resourcesBySpout.get(statsEntry.getKey()).toNormalizedMap());
            }
        }
    }

    /**
     * Fills in the resource map of every bolt entry on the page.
     *
     * <p>When the page already has bolt stats, each existing entry gets its resource map set
     * (after {@code setResourcesDefaultIfNotSet}). When it has none, a placeholder entry with
     * zeroed stats is added for each bolt in the topology, skipping the {@code "__system"}
     * bolt always and any bolt for which {@code Utils.isSystemId} is true unless {@code z} is set.
     *
     * @param topologyPageInfo page being populated; its bolt stats map is modified in place
     * @param stormTopology topology whose bolts are described
     * @param map configuration passed to {@code ResourceUtils.getBoltsResources}
     * @param z when true, bolts with system ids are not filtered out
     */
    private void addBoltAggStats(TopologyPageInfo topologyPageInfo, StormTopology stormTopology, Map<String, Object> map, boolean z) {
        Map<String, NormalizedResourceRequest> resourcesByBolt = ResourceUtils.getBoltsResources(stormTopology, map);
        if (topologyPageInfo.get_id_to_bolt_agg_stats().isEmpty()) {
            // Nothing aggregated yet: synthesize a zeroed entry per eligible bolt.
            for (Map.Entry boltEntry : stormTopology.get_bolts().entrySet()) {
                String boltId = (String) boltEntry.getKey();
                Bolt bolt = (Bolt) boltEntry.getValue();
                if ((!z && Utils.isSystemId(boltId)) || boltId.equals("__system")) {
                    continue;
                }

                ComponentAggregateStats placeholder = new ComponentAggregateStats();
                placeholder.set_type(ComponentType.BOLT);

                CommonAggregateStats common = getPlaceholderCommonAggregateStats(bolt);
                common.set_resources_map(resourcesByBolt.getOrDefault(boltId, new NormalizedResourceRequest()).toNormalizedMap());
                placeholder.set_common_stats(common);

                BoltAggregateStats boltStats = new BoltAggregateStats();
                boltStats.set_execute_latency_ms(0.0d);
                boltStats.set_process_latency_ms(0.0d);
                boltStats.set_executed(0L);
                boltStats.set_capacity(0.0d);
                SpecificAggregateStats specific = new SpecificAggregateStats();
                specific.set_bolt(boltStats);
                placeholder.set_specific_stats(specific);

                topologyPageInfo.get_id_to_bolt_agg_stats().put(boltId, placeholder);
            }
        } else {
            for (Map.Entry statsEntry : topologyPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
                CommonAggregateStats common = ((ComponentAggregateStats) statsEntry.getValue()).get_common_stats();
                setResourcesDefaultIfNotSet(resourcesByBolt, (String) statsEntry.getKey(), map);
                common.set_resources_map(resourcesByBolt.get(statsEntry.getKey()).toNormalizedMap());
            }
        }
    }

    /**
     * Creates zeroed common stats for a component that has no aggregated statistics.
     *
     * <p>The executor count comes from {@code StormCommon.numStartExecutors}; the task count is
     * the component's {@code "topology.tasks"} setting, defaulting to the executor count.
     *
     * @param obj the component (callers pass a spout spec or a bolt)
     * @return stats with executor and task counts set and emitted/transferred/acked at zero
     */
    private CommonAggregateStats getPlaceholderCommonAggregateStats(Object obj) {
        int numExecutors;
        try {
            numExecutors = StormCommon.numStartExecutors(obj);
        } catch (InvalidTopologyException ignored) {
            // Deliberately best effort: an invalid component is reported with zero executors.
            numExecutors = 0;
        }
        int numTasks = ObjectReader.getInt(StormCommon.componentConf(obj).getOrDefault("topology.tasks", Integer.valueOf(numExecutors))).intValue();

        CommonAggregateStats placeholder = new CommonAggregateStats();
        placeholder.set_num_executors(numExecutors);
        placeholder.set_num_tasks(numTasks);
        placeholder.set_emitted(0L);
        placeholder.set_transferred(0L);
        placeholder.set_acked(0L);
        return placeholder;
    }

    /**
     * Builds the {@link SupervisorPageInfo} for one supervisor id, or for every supervisor
     * registered under a hostname when no id is given.
     *
     * <p>For each selected supervisor a summary is added, followed by the worker summaries of
     * every topology found on it by {@code topologiesOnSupervisor}. Whether a topology is in
     * the {@code filterAuthorized("getTopology", ...)} result is passed on to
     * {@code StatsUtil.aggWorkerStats}.
     *
     * <p>A hostname with no registered supervisors yields a page without summaries. Previously
     * that lookup returned null and the loop failed with a {@link NullPointerException}.
     *
     * @param str supervisor id; when null, {@code str2} selects the supervisors instead
     * @param str2 hostname used to look up supervisor ids when {@code str} is null
     * @param z forwarded to {@code StatsUtil.aggWorkerStats} (presumably "include system components")
     * @return the populated page info
     * @throws NotAliveException propagated from the helpers called here
     * @throws AuthorizationException propagated from the helpers called here
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public SupervisorPageInfo getSupervisorPageInfo(String str, String str2, boolean z) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getSupervisorPageInfoCalls.mark();
            IStormClusterState state = this.stormClusterState;
            Map<?, ?> allSupervisorInfo = state.allSupervisorInfo();

            // Index supervisor ids by the hostname they report.
            Map<String, List<String>> hostToSupervisorIds = new HashMap<>();
            for (Map.Entry<?, ?> entry : allSupervisorInfo.entrySet()) {
                String hostname = ((SupervisorInfo) entry.getValue()).get_hostname();
                hostToSupervisorIds.computeIfAbsent(hostname, k -> new ArrayList<>()).add((String) entry.getKey());
            }

            List<String> supervisorIds;
            if (str != null) {
                supervisorIds = Arrays.asList(str);
            } else {
                // Unknown host: iterate over nothing instead of dereferencing a null list.
                supervisorIds = hostToSupervisorIds.getOrDefault(str2, Collections.emptyList());
            }

            SupervisorPageInfo supervisorPageInfo = new SupervisorPageInfo();
            Map assignmentsInfo = state.assignmentsInfo();
            for (String supervisorId : supervisorIds) {
                SupervisorInfo supervisorInfo = (SupervisorInfo) allSupervisorInfo.get(supervisorId);
                LOG.info("SIDL {} SI: {} ALL: {}", new Object[]{supervisorId, supervisorInfo, allSupervisorInfo});
                supervisorPageInfo.add_to_supervisor_summaries(makeSupervisorSummary(supervisorId, supervisorInfo));
                List<String> topoIds = topologiesOnSupervisor(assignmentsInfo, supervisorId);
                Set<String> authorizedTopoIds = filterAuthorized("getTopology", topoIds);
                for (String topoId : topoIds) {
                    CommonTopoInfo common = getCommonTopoInfo(topoId, "getSupervisorPageInfo");
                    String topoName = common.topoName;
                    Assignment assignment = common.assignment;
                    Map<List<Integer>, Map<String, Object>> beats = common.beats;
                    Map<Integer, String> taskToComponent = common.taskToComponent;

                    // Executor -> [node id, port]; empty when the topology has no assignment.
                    HashMap execToNodePort = new HashMap();
                    Map nodeToHost;
                    if (assignment != null) {
                        Map<?, ?> execToNodeInfo = assignment.get_executor_node_port();
                        for (Map.Entry<?, ?> execEntry : execToNodeInfo.entrySet()) {
                            NodeInfo nodeInfo = (NodeInfo) execEntry.getValue();
                            execToNodePort.put(execEntry.getKey(), Arrays.asList(nodeInfo.get_node(), nodeInfo.get_port_iterator().next()));
                        }
                        nodeToHost = assignment.get_node_host();
                    } else {
                        nodeToHost = Collections.emptyMap();
                    }

                    Iterator<WorkerSummary> workers = StatsUtil.aggWorkerStats(topoId, topoName, taskToComponent, beats, execToNodePort, nodeToHost, getWorkerResourcesForTopology(topoId), z, authorizedTopoIds.contains(topoId), supervisorId, common.base == null ? null : common.base.get_owner()).iterator();
                    while (workers.hasNext()) {
                        supervisorPageInfo.add_to_worker_summaries(workers.next());
                    }
                }
            }
            return supervisorPageInfo;
        } catch (Exception e) {
            LOG.warn("Get super page info exception. (super id='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the {@link ComponentPageInfo} for one component of a topology.
     *
     * <p>Executor statistics are aggregated by {@code StatsUtil.aggCompExecsStats}; the result
     * is then given the component's resource map, the topology name and status, the component's
     * errors, its debug options (if any) and, when the topology has {@code "__eventlogger"}
     * tasks, the host and port of the executor that owns the event logger task chosen for this
     * component.
     *
     * @param str topology id
     * @param str2 component id
     * @param str3 forwarded to {@code StatsUtil.aggCompExecsStats} (presumably the stats window)
     * @param z forwarded to {@code StatsUtil.aggCompExecsStats} (presumably "include system components")
     * @return the populated page info
     * @throws NotAliveException a {@code WrappedNotAliveException} when the topology has no base
     * @throws AuthorizationException propagated from the helpers called here
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public ComponentPageInfo getComponentPageInfo(String str, String str2, String str3, boolean z) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getComponentPageInfoCalls.mark();
            CommonTopoInfo commonTopoInfo = getCommonTopoInfo(str, "getComponentPageInfo");
            if (commonTopoInfo.base == null) {
                throw new WrappedNotAliveException(str);
            }
            StormTopology stormTopology = commonTopoInfo.topology;
            Map merge = Utils.merge(this.conf, commonTopoInfo.topoConf);
            Assignment assignment = commonTopoInfo.assignment;

            // Executor -> [host, port]; stays empty when the topology has no assignment.
            // (The former executor -> [node id, port] map was filled but never read.)
            HashMap execToHostPort = new HashMap();
            if (assignment != null) {
                Map<?, ?> execToNodeInfo = assignment.get_executor_node_port();
                Map<?, ?> nodeToHost = assignment.get_node_host();
                for (Map.Entry<?, ?> entry : execToNodeInfo.entrySet()) {
                    NodeInfo nodeInfo = (NodeInfo) entry.getValue();
                    execToHostPort.put(entry.getKey(), Arrays.asList(nodeToHost.get(nodeInfo.get_node()), nodeInfo.get_port_iterator().next()));
                }
            }

            ComponentPageInfo aggCompExecsStats = StatsUtil.aggCompExecsStats(execToHostPort, commonTopoInfo.taskToComponent, commonTopoInfo.beats, str3, z, str, stormTopology, str2);
            if (aggCompExecsStats.get_component_type() == ComponentType.SPOUT) {
                NormalizedResourceRequest spoutResources = ResourceUtils.getSpoutResources(stormTopology, merge, str2);
                if (spoutResources == null) {
                    spoutResources = new NormalizedResourceRequest((Map<String, Object>) merge, str2);
                }
                aggCompExecsStats.set_resources_map(spoutResources.toNormalizedMap());
            } else {
                NormalizedResourceRequest boltResources = ResourceUtils.getBoltResources(stormTopology, merge, str2);
                if (boltResources == null) {
                    boltResources = new NormalizedResourceRequest((Map<String, Object>) merge, str2);
                }
                aggCompExecsStats.set_resources_map(boltResources.toNormalizedMap());
            }
            aggCompExecsStats.set_topology_name(commonTopoInfo.topoName);
            aggCompExecsStats.set_errors(this.stormClusterState.errors(str, str2));
            aggCompExecsStats.set_topology_status(extractStatusStr(commonTopoInfo.base));
            if (commonTopoInfo.base.is_set_component_debug()) {
                DebugOptions debugOptions = (DebugOptions) commonTopoInfo.base.get_component_debug().get(str2);
                if (debugOptions != null) {
                    aggCompExecsStats.set_debug_options(debugOptions);
                }
            }

            HashMap componentToTasks = Utils.reverseMap(commonTopoInfo.taskToComponent);
            if (componentToTasks.containsKey("__eventlogger")) {
                List eventLoggerTasks = (List) componentToTasks.get("__eventlogger");
                eventLoggerTasks.sort(null);
                int eventLoggerTask = ((Integer) eventLoggerTasks.get(TupleUtils.chooseTaskIndex(Collections.singletonList(str2), eventLoggerTasks.size()))).intValue();
                String eventLogHost = null;
                Integer eventLogPort = null;
                // Find the executor whose [first task, last task] range contains the chosen task.
                for (Object o : execToHostPort.entrySet()) {
                    Map.Entry execEntry = (Map.Entry) o;
                    List executor = (List) execEntry.getKey();
                    int firstTask = ((Long) executor.get(0)).intValue();
                    int lastTask = ((Long) executor.get(1)).intValue();
                    if (eventLoggerTask >= firstTask && eventLoggerTask <= lastTask) {
                        List hostPort = (List) execEntry.getValue();
                        eventLogHost = (String) hostPort.get(0);
                        eventLogPort = Integer.valueOf(((Number) hostPort.get(1)).intValue());
                        break;
                    }
                }
                if (eventLogHost != null && eventLogPort != null) {
                    aggCompExecsStats.set_eventlog_host(eventLogHost);
                    aggCompExecsStats.set_eventlog_port(eventLogPort.intValue());
                }
            }
            return aggCompExecsStats;
        } catch (Exception e) {
            LOG.warn("getComponentPageInfo exception. (topo id='{}')", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a topology's own configuration as a JSON string.
     *
     * <p>The authorization check runs against the topology configuration merged over this
     * daemon's {@code conf}; the returned JSON contains only the topology configuration.
     *
     * @param str topology id
     * @return the topology configuration rendered by {@code JSONValue.toJSONString}
     * @throws NotAliveException propagated from the helpers called here
     * @throws AuthorizationException propagated from the "getTopologyConf" authorization check
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public String getTopologyConf(String str) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getTopologyConfCalls.mark();
            Map<String, Object> topoConf = tryReadTopoConf(str, this.topoCache);
            Map<String, Object> mergedConf = Utils.merge(this.conf, topoConf);
            String topoName = (String) mergedConf.get("topology.name");
            checkAuthorization(topoName, mergedConf, "getTopologyConf");
            return JSONValue.toJSONString(topoConf);
        } catch (Exception e) {
            LOG.warn("Get topo conf exception. (topology id='{}')", str, e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Returns the topology after it has been passed through {@code StormCommon.systemTopology}
     * together with the merged configuration.
     *
     * @param str topology id
     * @return the result of {@code StormCommon.systemTopology}
     * @throws NotAliveException propagated from the helpers called here
     * @throws AuthorizationException propagated from the "getTopology" authorization check
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public StormTopology getTopology(String str) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getTopologyCalls.mark();
            Map<String, Object> mergedConf = Utils.merge(this.conf, tryReadTopoConf(str, this.topoCache));
            String topoName = (String) mergedConf.get("topology.name");
            checkAuthorization(topoName, mergedConf, "getTopology");
            return StormCommon.systemTopology(mergedConf, tryReadTopology(str, this.topoCache));
        } catch (Exception e) {
            LOG.warn("Get topology exception. (topology id='{}')", str, e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Returns the topology exactly as read by {@code tryReadTopology}, without the
     * {@code StormCommon.systemTopology} step that {@code getTopology} applies.
     *
     * @param str topology id
     * @return the stored topology
     * @throws NotAliveException propagated from the helpers called here
     * @throws AuthorizationException propagated from the "getUserTopology" authorization check
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public StormTopology getUserTopology(String str) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getUserTopologyCalls.mark();
            Map<String, Object> mergedConf = Utils.merge(this.conf, tryReadTopoConf(str, this.topoCache));
            String topoName = (String) mergedConf.get("topology.name");
            checkAuthorization(topoName, mergedConf, "getUserTopology");
            return tryReadTopology(str, this.topoCache);
        } catch (Exception e) {
            LOG.warn("Get user topology exception. (topology id='{}')", str, e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Lists the topology ids a user may see: currently assigned topologies the user has access
     * to, plus whatever {@code readTopologyHistory} returns for that user.
     *
     * <p>An assigned topology is included when the user is null, is listed in
     * {@code nimbus.admins}, passes {@code isUserPartOf} for the topology's log groups or for
     * {@code nimbus.admins.groups}, or is listed in the topology's log users.
     *
     * @param str user name; null selects every assigned topology
     * @return history info wrapping the de-duplicated topology ids
     * @throws AuthorizationException propagated from the helpers called here
     * @throws TException any Thrift exception raised while handling the call; every other
     *     failure is logged and rethrown wrapped in a {@link RuntimeException}
     */
    public TopologyHistoryInfo getTopologyHistory(String str) throws AuthorizationException, TException {
        try {
            List adminUsers = (List) this.conf.getOrDefault("nimbus.admins", Collections.emptyList());
            Collection<String> adminGroups = (List) this.conf.getOrDefault("nimbus.admins.groups", Collections.emptyList());
            List<String> assignedTopoIds = this.stormClusterState.assignments((Runnable) null);
            HashSet visibleTopoIds = new HashSet();
            boolean userIsAdmin = adminUsers.contains(str);
            for (String topoId : assignedTopoIds) {
                Map mergedConf = Utils.merge(this.conf, tryReadTopoConf(topoId, this.topoCache));
                Collection<String> logGroups = ServerConfigUtils.getTopoLogsGroups(mergedConf);
                List<String> logUsers = ServerConfigUtils.getTopoLogsUsers(mergedConf);
                boolean visible = str == null
                        || userIsAdmin
                        || isUserPartOf(str, logGroups)
                        || isUserPartOf(str, adminGroups)
                        || logUsers.contains(str);
                if (visible) {
                    visibleTopoIds.add(topoId);
                }
            }
            visibleTopoIds.addAll(readTopologyHistory(str, adminUsers));
            return new TopologyHistoryInfo(new ArrayList(visibleTopoIds));
        } catch (Exception e) {
            LOG.warn("Get topology history. (user='{}')", str, e);
            if (!(e instanceof TException)) {
                throw new RuntimeException(e);
            }
            throw (TException) e;
        }
    }

    /**
     * Returns the cluster summary after a {@code getClusterInfo} authorization check.
     *
     * @return the value produced by {@code getClusterInfoImpl()}
     * @throws TException any Thrift exception raised while serving the call, rethrown unchanged;
     *     every other failure is wrapped in a {@link RuntimeException}
     */
    public ClusterSummary getClusterInfo() throws AuthorizationException, TException {
        try {
            this.getClusterInfoCalls.mark();
            checkAuthorization(null, null, "getClusterInfo");
            ClusterSummary summary = getClusterInfoImpl();
            return summary;
        } catch (Exception ex) {
            LOG.warn("Get cluster info exception.", ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Returns the topology summaries after a {@code getTopologySummaries} authorization check.
     *
     * @return the value produced by {@code getTopologySummariesImpl()}
     * @throws TException any Thrift exception raised while serving the call, rethrown unchanged;
     *     every other failure is wrapped in a {@link RuntimeException}
     */
    public List<TopologySummary> getTopologySummaries() throws AuthorizationException, TException {
        try {
            this.getTopologySummariesCalls.mark();
            checkAuthorization(null, null, "getTopologySummaries");
            List<TopologySummary> summaries = getTopologySummariesImpl();
            return summaries;
        } catch (Exception ex) {
            LOG.warn("Get TopologySummary info exception.", ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Looks up a topology by name and returns its summary.
     *
     * <p>The caller is authorized for {@code getTopologySummaryByName} before the name is resolved
     * to a topology id through the cluster state.
     *
     * @param str the topology name
     * @return the summary built by {@code getTopologySummaryImpl} for the resolved id
     * @throws NotAliveException (as {@code WrappedNotAliveException}) when no id exists for the name
     * @throws TException any Thrift exception raised while serving the call, rethrown unchanged;
     *     every other failure is wrapped in a {@link RuntimeException}
     */
    public TopologySummary getTopologySummaryByName(String str) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getTopologySummaryByNameCalls.mark();
            checkAuthorization(str, null, "getTopologySummaryByName");
            IStormClusterState state = this.stormClusterState;
            String topoId = (String) state.getTopoId(str)
                    .orElseThrow(() -> new WrappedNotAliveException(str + " is not alive"));
            StormBase base = (StormBase) state.topologyBases().get(topoId);
            return getTopologySummaryImpl(topoId, base);
        } catch (Exception ex) {
            LOG.warn("Get TopologySummaryByName info exception.", ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Returns the summary of the topology with the given id.
     *
     * <p>The topology base is looked up first; its name is then used for the
     * {@code getTopologySummary} authorization check.
     *
     * @param str the topology id
     * @return the summary built by {@code getTopologySummaryImpl}
     * @throws NotAliveException (as {@code WrappedNotAliveException}) when no base exists for the id
     * @throws TException any Thrift exception raised while serving the call, rethrown unchanged;
     *     every other failure is wrapped in a {@link RuntimeException}
     */
    public TopologySummary getTopologySummary(String str) throws NotAliveException, AuthorizationException, TException {
        try {
            this.getTopologySummaryCalls.mark();
            StormBase base = (StormBase) this.stormClusterState.topologyBases().get(str);
            if (base != null) {
                checkAuthorization(base.get_name(), null, "getTopologySummary");
                return getTopologySummaryImpl(str, base);
            }
            throw new WrappedNotAliveException(str + " is not alive");
        } catch (Exception ex) {
            LOG.warn("Get TopologySummaryById info exception.", ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /**
     * Finds the nimbus summary that matches the currently elected leader.
     *
     * <p>The match is made on host and port. The matching summary has its {@code uptime_secs}
     * replaced by {@code Time.deltaSecs} of the stored value and is flagged as leader before
     * being returned.
     *
     * @return the leader's summary, or {@code null} when no registered nimbus matches the leader
     */
    public NimbusSummary getLeader() throws AuthorizationException, TException {
        this.getLeaderCalls.mark();
        checkAuthorization(null, null, "getLeader");
        List<NimbusSummary> registered = this.stormClusterState.nimbuses();
        NimbusInfo elected = this.leaderElector.getLeader();
        for (NimbusSummary candidate : registered) {
            boolean sameHost = elected.getHost().equals(candidate.get_host());
            if (!sameHost || elected.getPort() != candidate.get_port()) {
                continue;
            }
            int storedUptime = candidate.get_uptime_secs();
            candidate.set_uptime_secs(Time.deltaSecs(storedUptime));
            candidate.set_isLeader(true);
            return candidate;
        }
        return null;
    }

    /**
     * Reports whether a topology may be submitted under the given name.
     *
     * <p>The name is accepted when authorization, {@code validateTopologyName} and
     * {@code assertTopoActive(name, false)} all complete without raising
     * {@code InvalidTopologyException} or {@code AlreadyAliveException}.
     *
     * @param str the candidate topology name
     * @return {@code true} when all checks pass, {@code false} when the name is invalid or taken
     * @throws AuthorizationException propagated from the authorization check
     */
    public boolean isTopologyNameAllowed(String str) throws AuthorizationException, TException {
        this.isTopologyNameAllowedCalls.mark();
        boolean allowed;
        try {
            checkAuthorization(str, null, "isTopologyNameAllowed");
            validateTopologyName(str);
            assertTopoActive(str, false);
            allowed = true;
        } catch (InvalidTopologyException | AlreadyAliveException rejected) {
            // Either failure simply means the name cannot be used.
            allowed = false;
        }
        return allowed;
    }

    /**
     * Builds one {@link OwnerResourceSummary} per topology owner.
     *
     * <p>When {@code str} is {@code null}, every owner of a known topology base is summarized, plus
     * every key of the scheduler config that owns no topology (with zeroed counters). Otherwise only
     * the given owner is summarized. Executor, worker and task counts come from the current
     * assignments; resource figures come from {@code getResourcesForTopology}.
     *
     * <p>Scheduler guarantees are read as {@link Number}s, so integer-valued or missing
     * {@code memory}/{@code cpu} entries no longer cause a {@link ClassCastException}; a missing
     * entry counts as zero.
     *
     * @param str the owner to summarize, or {@code null} for all owners
     * @return the list of summaries, one per owner
     * @throws TException any Thrift exception raised while serving the call, rethrown unchanged;
     *     every other failure is wrapped in a {@link RuntimeException}
     */
    public List<OwnerResourceSummary> getOwnerResourceSummaries(String str) throws AuthorizationException, TException {
        try {
            this.getOwnerResourceSummariesCalls.mark();
            checkAuthorization(null, null, "getOwnerResourceSummaries");
            IStormClusterState state = this.stormClusterState;
            Map<String, Assignment> assignmentsByTopoId = state.assignmentsInfo();
            Map<String, StormBase> basesByTopoId = state.topologyBases();
            Map<?, ?> schedulerConfig = this.scheduler.config();

            // Group the topology bases by owner.
            Map<String, List<StormBase>> basesByOwner = new HashMap<>();
            if (str == null) {
                for (StormBase base : basesByTopoId.values()) {
                    basesByOwner.computeIfAbsent(base.get_owner(), k -> new ArrayList<>()).add(base);
                }
                // Owners that appear only in the scheduler config still get an (empty) summary.
                for (Object configuredOwner : schedulerConfig.keySet()) {
                    basesByOwner.putIfAbsent((String) configuredOwner, new ArrayList<>());
                }
            } else {
                List<StormBase> owned = new ArrayList<>();
                for (StormBase base : basesByTopoId.values()) {
                    if (str.equals(base.get_owner())) {
                        owned.add(base);
                    }
                }
                basesByOwner.put(str, owned);
            }

            List<OwnerResourceSummary> summaries = new ArrayList<>();
            for (Map.Entry<String, List<StormBase>> entry : basesByOwner.entrySet()) {
                String owner = entry.getKey();
                List<StormBase> ownedBases = entry.getValue();
                TopologyResources totalResources = new TopologyResources();
                int totalExecutors = 0;
                int totalWorkers = 0;
                int totalTasks = 0;
                for (StormBase base : ownedBases) {
                    try {
                        String topoId = toTopoId(base.get_name());
                        totalResources = totalResources.add(getResourcesForTopology(topoId, base));
                        Assignment assignment = assignmentsByTopoId.get(topoId);
                        if (assignment != null && assignment.get_executor_node_port() != null) {
                            Map<List<Long>, ?> executorToNodePort = assignment.get_executor_node_port();
                            totalExecutors += executorToNodePort.size();
                            // Distinct node/port values give the worker count.
                            totalWorkers += new HashSet<Object>(executorToNodePort.values()).size();
                            for (List<Long> executorId : executorToNodePort.keySet()) {
                                totalTasks += StormCommon.executorIdToTasks(executorId).size();
                            }
                        }
                    } catch (NotAliveException e) {
                        // The topology went away while summarizing; skip it and keep going.
                        LOG.warn("{} is not alive.", base.get_name());
                    }
                }

                double requestedTotalMemory = totalResources.getRequestedMemOnHeap() + totalResources.getRequestedMemOffHeap();
                double assignedTotalMemory = totalResources.getAssignedMemOnHeap() + totalResources.getAssignedMemOffHeap();
                OwnerResourceSummary summary = new OwnerResourceSummary(owner);
                summary.set_total_topologies(ownedBases.size());
                summary.set_total_executors(totalExecutors);
                summary.set_total_workers(totalWorkers);
                summary.set_total_tasks(totalTasks);
                summary.set_memory_usage(assignedTotalMemory);
                summary.set_cpu_usage(totalResources.getAssignedCpu());
                summary.set_requested_on_heap_memory(totalResources.getRequestedMemOnHeap());
                summary.set_requested_off_heap_memory(totalResources.getRequestedMemOffHeap());
                summary.set_requested_total_memory(requestedTotalMemory);
                summary.set_requested_cpu(totalResources.getRequestedCpu());
                summary.set_assigned_on_heap_memory(totalResources.getAssignedMemOnHeap());
                summary.set_assigned_off_heap_memory(totalResources.getAssignedMemOffHeap());

                if (schedulerConfig.containsKey(owner)) {
                    if (this.underlyingScheduler instanceof ResourceAwareScheduler) {
                        Map<?, ?> userPool = (Map<?, ?>) schedulerConfig.get(owner);
                        if (userPool != null) {
                            summary.set_memory_guarantee(asDoubleOrZero(userPool.get("memory")));
                            summary.set_cpu_guarantee(asDoubleOrZero(userPool.get("cpu")));
                            summary.set_memory_guarantee_remaining(summary.get_memory_guarantee() - summary.get_memory_usage());
                            summary.set_cpu_guarantee_remaining(summary.get_cpu_guarantee() - summary.get_cpu_usage());
                        }
                    } else if (this.underlyingScheduler instanceof MultitenantScheduler) {
                        Object isolatedNodes = schedulerConfig.get(owner);
                        summary.set_isolated_node_guarantee(isolatedNodes == null ? 0 : ((Number) isolatedNodes).intValue());
                    }
                }
                LOG.debug("{}", summary);
                summaries.add(summary);
            }
            return summaries;
        } catch (Exception ex) {
            // Pass the exception to the logger so the stack trace is not lost.
            LOG.warn("Get owner resource summaries exception. (owner = '{}')", str, ex);
            if (!(ex instanceof TException)) {
                throw new RuntimeException(ex);
            }
            throw (TException) ex;
        }
    }

    /** Converts a scheduler-config value to a double, treating a missing ({@code null}) value as zero. */
    private static double asDoubleOrZero(Object value) {
        return value == null ? 0.0d : ((Number) value).doubleValue();
    }

    /**
     * Returns the assignments for the given node.
     *
     * <p>Assignments are only served when this nimbus is the leader and its assignments have been
     * recovered; otherwise {@code null} is returned. Non-Thrift failures are logged at debug level
     * (with the exception attached) and also result in {@code null} rather than being propagated.
     *
     * @param str the node id whose assignments are requested
     * @return the node's assignments, or {@code null} when they cannot be served
     * @throws AuthorizationException when the caller is not authorized
     * @throws TException any Thrift exception raised while building the response
     */
    public SupervisorAssignments getSupervisorAssignments(String str) throws AuthorizationException, TException {
        checkAuthorization(null, null, "getSupervisorAssignments");
        try {
            if (!isLeader() || !isAssignmentsRecovered()) {
                return null;
            }
            SupervisorAssignments supervisorAssignments = new SupervisorAssignments();
            supervisorAssignments.set_storm_assignment(assignmentsForNodeId(this.stormClusterState.assignmentsInfo(), str));
            return supervisorAssignments;
        } catch (Exception e) {
            // Log once, with the cause, instead of twice without it.
            LOG.debug("Exception when node {} fetching assignments", str, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            // Best effort: any other failure is reported to the caller as "no assignments".
            return null;
        }
    }

    /**
     * Accepts a batch of worker heartbeats reported by a supervisor.
     *
     * <p>The cached heartbeats are only updated when this nimbus is the leader. Non-Thrift
     * failures are logged at debug level (with the exception attached) and otherwise ignored.
     *
     * @param supervisorWorkerHeartbeats the heartbeats reported by one supervisor
     * @throws AuthorizationException when the caller is not authorized
     * @throws TException any Thrift exception raised while updating the heartbeats
     */
    public void sendSupervisorWorkerHeartbeats(SupervisorWorkerHeartbeats supervisorWorkerHeartbeats) throws AuthorizationException, TException {
        checkAuthorization(null, null, "sendSupervisorWorkerHeartbeats");
        try {
            if (isLeader()) {
                updateCachedHeartbeatsFromSupervisor(supervisorWorkerHeartbeats);
            }
        } catch (Exception e) {
            // Attach the cause so a dropped heartbeat report can still be diagnosed.
            LOG.debug("Exception when update heartbeats for node {} heartbeats report.", supervisorWorkerHeartbeats.get_supervisor_id(), e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            // Best effort: other failures are deliberately not propagated to the supervisor.
        }
    }

    /**
     * Accepts a single worker heartbeat.
     *
     * <p>The heartbeat's topology conf is read and used for the
     * {@code sendSupervisorWorkerHeartbeat} authorization check. The cached heartbeat is only
     * updated when this nimbus is the leader.
     *
     * @param supervisorWorkerHeartbeat the heartbeat of one worker
     * @throws TException any Thrift exception raised while serving the call, rethrown unchanged;
     *     every other failure is wrapped in a {@link RuntimeException}
     */
    public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat supervisorWorkerHeartbeat) throws AuthorizationException, TException {
        String topoId = supervisorWorkerHeartbeat.get_storm_id();
        try {
            Map<String, Object> topoConf = tryReadTopoConf(topoId, this.topoCache);
            String topoName = (String) topoConf.get("topology.name");
            checkAuthorization(topoName, topoConf, "sendSupervisorWorkerHeartbeat");
            if (isLeader()) {
                updateCachedHeartbeatsFromWorker(supervisorWorkerHeartbeat, getTopologyHeartbeatTimeoutSecs(topoConf));
            }
        } catch (Exception ex) {
            LOG.warn("Send HB exception. (topology id='{}')", topoId, ex);
            if (ex instanceof TException) {
                throw (TException) ex;
            }
            throw new RuntimeException(ex);
        }
    }

    /**
     * Shuts this daemon down by closing or cleaning up its components one after another.
     *
     * <p>The steps run in the fixed order written below. The first exception aborts the sequence
     * and is rethrown wrapped in a {@link RuntimeException}, so later components are not touched
     * in that case.
     */
    public void shutdown() {
        this.shutdownCalls.mark();
        try {
            LOG.info("Shutting down master");
            this.timer.close();
            this.stormClusterState.disconnect();
            // Clean up the upload/download/listing session holders.
            this.downloaders.cleanup();
            this.uploaders.cleanup();
            this.blobDownloaders.cleanup();
            this.blobUploaders.cleanup();
            this.blobListers.cleanup();
            this.scheduler.cleanup();
            this.blobStore.shutdown();
            this.leaderElector.close();
            this.assignmentsDistributer.close();
            // The notifier plugin is optional; read the field once before the null check.
            ITopologyActionNotifierPlugin iTopologyActionNotifierPlugin = this.nimbusTopologyActionNotifier;
            if (iTopologyActionNotifierPlugin != null) {
                iTopologyActionNotifierPlugin.cleanup();
            }
            this.zkClient.close();
            // The metrics store is optional as well.
            if (this.metricsStore != null) {
                this.metricsStore.close();
            }
            this.clusterMetricSet.setActive(false);
            LOG.info("Shut down master");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reports whether the daemon's timer is currently waiting.
     *
     * @return the result of {@code timer.isTimerWaiting()}
     */
    public boolean isWaiting() {
        boolean timerWaiting = this.timer.isTimerWaiting();
        return timerWaiting;
    }

    /**
     * Stores the metric points reported by a worker in the metrics store.
     *
     * <p>Nothing is stored when no metrics store is configured. Each point is inserted on its own
     * at {@code AggLevel.AGG_LEVEL_NONE}; a failure on one point is logged and does not stop the
     * remaining points from being processed.
     *
     * @param workerMetrics the metrics reported by one worker
     * @throws TException when the {@code processWorkerMetrics} authorization check fails
     */
    public void processWorkerMetrics(WorkerMetrics workerMetrics) throws TException {
        this.processWorkerMetricsCalls.mark();
        checkAuthorization(null, null, "processWorkerMetrics");
        if (this.metricsStore != null) {
            for (WorkerMetricPoint point : workerMetrics.get_metricList().get_metrics()) {
                try {
                    org.apache.storm.metricstore.Metric metric = new org.apache.storm.metricstore.Metric(
                            point.get_metricName(),
                            Long.valueOf(point.get_timestamp()),
                            workerMetrics.get_topologyId(),
                            point.get_metricValue(),
                            point.get_componentId(),
                            point.get_executorId(),
                            workerMetrics.get_hostname(),
                            point.get_streamId(),
                            workerMetrics.get_port(),
                            AggLevel.AGG_LEVEL_NONE);
                    this.metricsStore.insert(metric);
                } catch (Exception ex) {
                    LOG.error("Failed to save metric", ex);
                }
            }
        }
    }

    /**
     * Checks whether the blob store holds metadata for the given key.
     *
     * @param str the blob key
     * @return {@code true} when {@code getBlobMeta} succeeds, {@code false} when it raises
     *     {@code KeyNotFoundException}
     */
    public boolean isRemoteBlobExists(String str) throws AuthorizationException, TException {
        try {
            this.blobStore.getBlobMeta(str, getSubject());
        } catch (KeyNotFoundException missing) {
            return false;
        }
        return true;
    }

    /*
     * Initializes the class-level constants: ZooKeeper ACLs, logger, version info, the read-only
     * nimbus subject, the topology state-transition handlers and the table that maps a topology
     * status plus an action to its handler.
     *
     * Each handler receives (args, nimbus, topoId, base) and returns the StormBase update to
     * apply, or null when nothing needs to be written.
     */
    static {
        $assertionsDisabled = !Nimbus.class.desiredAssertionStatus();
        ZK_ACLS = Arrays.asList((ACL) ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
        MIN_VERSION_SUPPORT_RPC_HEARTBEAT = new SimpleVersion("2.0.0");
        LOG = LoggerFactory.getLogger(Nimbus.class);
        STORM_VERSION = VersionInfo.getVersion();
        NIMBUS_SUBJECT = new Subject();
        NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
        NIMBUS_SUBJECT.setReadOnly();
        NOOP_TRANSITION = (args, nimbus, topoId, base) -> {
            return null;
        };
        INACTIVE_TRANSITION = (args, nimbus, topoId, base) -> {
            return make(TopologyStatus.INACTIVE);
        };
        ACTIVE_TRANSITION = (args, nimbus, topoId, base) -> {
            return make(TopologyStatus.ACTIVE);
        };
        REMOVE_TRANSITION = (args, nimbus, topoId, base) -> {
            LOG.info("Killing topology: {}", topoId);
            IStormClusterState stormClusterState = nimbus.getStormClusterState();
            // Read the assignment before the topology is removed so supervisors can still be notified.
            Assignment assignmentInfo = stormClusterState.assignmentInfo(topoId, (Runnable) null);
            stormClusterState.removeStorm(topoId);
            notifySupervisorsAsKilled(stormClusterState, assignmentInfo, nimbus.getAssignmentsDistributer(), nimbus.getMetricsRegistry());
            nimbus.heartbeatsCache.removeTopo(topoId);
            nimbus.getIdToExecutors().getAndUpdate(new Dissoc(topoId));
            return null;
        };
        DO_REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> {
            nimbus.doRebalance(topoId, base);
            // Go back to the status the topology had before the rebalance started.
            return make(base.get_prev_status());
        };
        KILL_TRANSITION = (args, nimbus, topoId, base) -> {
            // An explicit wait time wins; otherwise fall back to the topology's message timeout.
            int delaySecs = args != null
                    ? ((Number) args).intValue()
                    : ObjectReader.getInt(readTopoConf(topoId, nimbus.getTopoCache()).get("topology.message.timeout.secs")).intValue();
            nimbus.delayEvent(topoId, delaySecs, TopologyActions.REMOVE, null);
            // Use a distinct local name: the update must not shadow the lambda's "base" parameter.
            StormBase killedBase = new StormBase();
            killedBase.set_status(TopologyStatus.KILLED);
            TopologyActionOptions topologyActionOptions = new TopologyActionOptions();
            KillOptions killOptions = new KillOptions();
            killOptions.set_wait_secs(delaySecs);
            topologyActionOptions.set_kill_options(killOptions);
            killedBase.set_topology_action_options(topologyActionOptions);
            killedBase.set_component_executors(Collections.emptyMap());
            killedBase.set_component_debug(Collections.emptyMap());
            return killedBase;
        };
        REBALANCE_TRANSITION = (args, nimbus, topoId, base) -> {
            RebalanceOptions rebalanceOptions = ((RebalanceOptions) args).deepCopy();
            int delaySecs = rebalanceOptions.is_set_wait_secs()
                    ? rebalanceOptions.get_wait_secs()
                    : ObjectReader.getInt(readTopoConf(topoId, nimbus.getTopoCache()).get("topology.message.timeout.secs")).intValue();
            nimbus.delayEvent(topoId, delaySecs, TopologyActions.DO_REBALANCE, null);
            rebalanceOptions.set_wait_secs(delaySecs);
            if (!rebalanceOptions.is_set_num_executors()) {
                rebalanceOptions.set_num_executors(Collections.emptyMap());
            }
            StormBase rebalancingBase = new StormBase();
            rebalancingBase.set_status(TopologyStatus.REBALANCING);
            // Remember the CURRENT topology status (from "base"), not the REBALANCING status just set
            // on the update; DO_REBALANCE_TRANSITION restores the topology from this value.
            rebalancingBase.set_prev_status(base.get_status());
            TopologyActionOptions topologyActionOptions = new TopologyActionOptions();
            topologyActionOptions.set_rebalance_options(rebalanceOptions);
            rebalancingBase.set_topology_action_options(topologyActionOptions);
            rebalancingBase.set_component_executors(Collections.emptyMap());
            rebalancingBase.set_component_debug(Collections.emptyMap());
            return rebalancingBase;
        };
        GAIN_LEADERSHIP_WHEN_KILLED_TRANSITION = (args, nimbus, topoId, base) -> {
            // Re-schedule the pending removal with the wait time stored in the kill options.
            nimbus.delayEvent(topoId, base.get_topology_action_options().get_kill_options().get_wait_secs(), TopologyActions.REMOVE, null);
            return null;
        };
        GAIN_LEADERSHIP_WHEN_REBALANCING_TRANSITION = (args, nimbus, topoId, base) -> {
            // Re-schedule the pending rebalance with the wait time stored in the rebalance options.
            nimbus.delayEvent(topoId, base.get_topology_action_options().get_rebalance_options().get_wait_secs(), TopologyActions.DO_REBALANCE, null);
            return null;
        };
        TOPO_STATE_TRANSITIONS = new ImmutableMap.Builder()
                .put(TopologyStatus.ACTIVE, new ImmutableMap.Builder()
                        .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION)
                        .put(TopologyActions.ACTIVATE, NOOP_TRANSITION)
                        .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
                        .put(TopologyActions.KILL, KILL_TRANSITION)
                        .build())
                .put(TopologyStatus.INACTIVE, new ImmutableMap.Builder()
                        .put(TopologyActions.ACTIVATE, ACTIVE_TRANSITION)
                        .put(TopologyActions.INACTIVATE, NOOP_TRANSITION)
                        .put(TopologyActions.REBALANCE, REBALANCE_TRANSITION)
                        .put(TopologyActions.KILL, KILL_TRANSITION)
                        .build())
                .put(TopologyStatus.KILLED, new ImmutableMap.Builder()
                        .put(TopologyActions.GAIN_LEADERSHIP, GAIN_LEADERSHIP_WHEN_KILLED_TRANSITION)
                        .put(TopologyActions.KILL, KILL_TRANSITION)
                        .put(TopologyActions.REMOVE, REMOVE_TRANSITION)
                        .build())
                .put(TopologyStatus.REBALANCING, new ImmutableMap.Builder()
                        .put(TopologyActions.GAIN_LEADERSHIP, GAIN_LEADERSHIP_WHEN_REBALANCING_TRANSITION)
                        .put(TopologyActions.KILL, KILL_TRANSITION)
                        .put(TopologyActions.DO_REBALANCE, DO_REBALANCE_TRANSITION)
                        .build())
                .build();
        EMPTY_STRING_LIST = Collections.unmodifiableList(Collections.emptyList());
        EMPTY_STRING_SET = Collections.unmodifiableSet(Collections.emptySet());
        topologyCleanupDetected = new RotatingMap<>(2);
        topologyCleanupRotationTime = 0L;
    }
}
