/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricSampleAggregationResult;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricSampleCompleteness;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.async.progress.GeneratingClusterModel;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.async.progress.WaitingForClusterModel;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.TopicConfigProvider;
import com.linkedin.kafka.cruisecontrol.exception.BrokerCapacityResolutionException;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitorState;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionEntity;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(LoadMonitor.class);
    private static final long METADATA_TTL = 10000L;
    private static final long METADATA_REFRESH_BACKOFF = 5000L;
    private final long _monitorStateUpdateTimeoutMs;
    private final LoadMonitorTaskRunner _loadMonitorTaskRunner;
    private final KafkaPartitionMetricSampleAggregator _partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator _brokerMetricSampleAggregator;
    private final Semaphore _clusterModelSemaphore;
    private final KafkaCruiseControlConfig _config;
    private final MetadataClient _metadataClient;
    private final AdminClient _adminClient;
    private final BrokerCapacityConfigResolver _brokerCapacityConfigResolver;
    private final TopicConfigProvider _topicConfigProvider;
    private final ScheduledExecutorService _loadMonitorExecutor;
    private final Timer _clusterModelCreationTimer;
    private final ThreadLocal<Boolean> _acquiredClusterModelSemaphore;
    private final ModelCompletenessRequirements _defaultModelCompletenessRequirements;
    private final Time _time;
    private volatile int _numValidSnapshotWindows;
    private volatile double _monitoredPartitionsPercentage;
    private volatile int _totalMonitoredSnapshotWindows;
    private volatile int _numPartitionsWithExtrapolations;
    private volatile long _latestStateUpdateMs;
    private volatile int _totalNumPartitions;
    private volatile ModelGeneration _cachedBrokerLoadGeneration;
    private volatile BrokerStats _cachedBrokerLoadStats;

    public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dropwizardMetricRegistry, MetricDef metricDef) {
        this(config, new MetadataClient(config, new Metadata(5000L, config.getLong("metadata.max.age.ms").longValue(), new LogContext(), new ClusterResourceListeners()), 10000L, time), KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config)), time, dropwizardMetricRegistry, metricDef);
    }

    LoadMonitor(KafkaCruiseControlConfig config, MetadataClient metadataClient, AdminClient adminClient, Time time, MetricRegistry dropwizardMetricRegistry, MetricDef metricDef) {
        this._config = config;
        this._metadataClient = metadataClient;
        this._adminClient = adminClient;
        this._time = time;
        this._brokerCapacityConfigResolver = config.getConfiguredInstance("broker.capacity.config.resolver.class", BrokerCapacityConfigResolver.class);
        long monitorStateUpdateIntervalMs = config.getLong("monitor.state.update.interval.ms");
        this._monitorStateUpdateTimeoutMs = 10L * monitorStateUpdateIntervalMs;
        this._topicConfigProvider = config.getConfiguredInstance("topic.config.provider.class", TopicConfigProvider.class);
        this._partitionMetricSampleAggregator = new KafkaPartitionMetricSampleAggregator(config, metadataClient.metadata());
        this._brokerMetricSampleAggregator = new KafkaBrokerMetricSampleAggregator(config);
        this._acquiredClusterModelSemaphore = ThreadLocal.withInitial(() -> false);
        int numPrecomputingThread = config.getInt("num.proposal.precompute.threads");
        this._clusterModelSemaphore = new Semaphore(Math.max(1, numPrecomputingThread), true);
        this._defaultModelCompletenessRequirements = MonitorUtils.combineLoadRequirementOptions(AnalyzerUtils.getGoalsByPriority(config));
        this._loadMonitorTaskRunner = new LoadMonitorTaskRunner(config, this._partitionMetricSampleAggregator, this._brokerMetricSampleAggregator, this._metadataClient, metricDef, time, dropwizardMetricRegistry, this._brokerCapacityConfigResolver);
        this._clusterModelCreationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name((String)"LoadMonitor", (String[])new String[]{"cluster-model-creation-timer"}));
        this._loadMonitorExecutor = Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("LoadMonitorExecutor", true, LOG));
        this._loadMonitorExecutor.scheduleAtFixedRate(new SensorUpdater(), 0L, monitorStateUpdateIntervalMs, TimeUnit.MILLISECONDS);
        this._loadMonitorExecutor.scheduleAtFixedRate(new PartitionMetricSampleAggregatorCleaner(), 0L, 37500L, TimeUnit.MILLISECONDS);
        dropwizardMetricRegistry.register(MetricRegistry.name((String)"LoadMonitor", (String[])new String[]{"valid-windows"}), (Metric)((Gauge)this::numValidSnapshotWindows));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)"LoadMonitor", (String[])new String[]{"monitored-partitions-percentage"}), (Metric)((Gauge)this::monitoredPartitionsPercentage));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)"LoadMonitor", (String[])new String[]{"total-monitored-windows"}), (Metric)((Gauge)this::totalMonitoredSnapshotWindows));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)"LoadMonitor", (String[])new String[]{"num-partitions-with-extrapolations"}), (Metric)((Gauge)this::numPartitionsWithExtrapolations));
    }

    public void startUp() {
        this._loadMonitorTaskRunner.start(this._config.getBoolean("skip.loading.samples"));
    }

    public void shutdown() {
        LOG.info("Shutting down load monitor.");
        try {
            this._brokerCapacityConfigResolver.close();
            this._topicConfigProvider.close();
            this._loadMonitorExecutor.shutdown();
        }
        catch (Exception e) {
            LOG.warn("Received exception when closing broker capacity resolver.", (Throwable)e);
        }
        this._loadMonitorTaskRunner.shutdown();
        this._metadataClient.close();
        KafkaCruiseControlUtils.closeAdminClientWithTimeout(this._adminClient);
        LOG.info("Load Monitor shutdown completed.");
    }

    public LoadMonitorState state(Cluster kafkaCluster) {
        LoadMonitorTaskRunner.LoadMonitorTaskRunnerState state = this._loadMonitorTaskRunner.state();
        SortedMap<Long, Float> validPartitionRatio = this._partitionMetricSampleAggregator.validPartitionRatioByWindows(kafkaCluster);
        switch (state) {
            case NOT_STARTED: {
                return LoadMonitorState.notStarted();
            }
            case RUNNING: {
                return LoadMonitorState.running(this.numValidSnapshotWindows(), validPartitionRatio, this.monitoredPartitionsPercentage(), this._totalNumPartitions, this.numPartitionsWithExtrapolations(), this._loadMonitorTaskRunner.reasonOfLatestPauseOrResume());
            }
            case SAMPLING: {
                return LoadMonitorState.sampling(this.numValidSnapshotWindows(), validPartitionRatio, this.monitoredPartitionsPercentage(), this._totalNumPartitions, this.numPartitionsWithExtrapolations());
            }
            case PAUSED: {
                return LoadMonitorState.paused(this.numValidSnapshotWindows(), validPartitionRatio, this.monitoredPartitionsPercentage(), this._totalNumPartitions, this.numPartitionsWithExtrapolations(), this._loadMonitorTaskRunner.reasonOfLatestPauseOrResume());
            }
            case BOOTSTRAPPING: {
                double bootstrapProgress = this._loadMonitorTaskRunner.bootStrapProgress();
                return LoadMonitorState.bootstrapping(this.numValidSnapshotWindows(), validPartitionRatio, this.monitoredPartitionsPercentage(), this._totalNumPartitions, bootstrapProgress >= 0.0 ? bootstrapProgress : 1.0, this.numPartitionsWithExtrapolations());
            }
            case TRAINING: {
                return LoadMonitorState.training(this.numValidSnapshotWindows(), validPartitionRatio, this.monitoredPartitionsPercentage(), this._totalNumPartitions, this.numPartitionsWithExtrapolations());
            }
            case LOADING: {
                return LoadMonitorState.loading(this.numValidSnapshotWindows(), validPartitionRatio, this.monitoredPartitionsPercentage(), this._totalNumPartitions, this._loadMonitorTaskRunner.sampleLoadingProgress());
            }
        }
        throw new IllegalStateException("Should never be here.");
    }

    public TopicConfigProvider topicConfigProvider() {
        return this._topicConfigProvider;
    }

    public LoadMonitorTaskRunner.LoadMonitorTaskRunnerState taskRunnerState() {
        return this._loadMonitorTaskRunner.state();
    }

    public void bootstrap(long startMs, long endMs, boolean clearMetrics) {
        this._loadMonitorTaskRunner.bootstrap(startMs, endMs, clearMetrics);
    }

    public void bootstrap(long startMs, boolean clearMetrics) {
        this._loadMonitorTaskRunner.bootstrap(startMs, clearMetrics);
    }

    public void bootstrap(boolean clearMetrics) {
        this._loadMonitorTaskRunner.bootstrap(clearMetrics);
    }

    public void train(long startMs, long endMs) {
        this._loadMonitorTaskRunner.train(startMs, endMs);
    }

    public Cluster kafkaCluster() {
        return this._metadataClient.cluster();
    }

    public void pauseMetricSampling(String reason, boolean forcePauseSampling) {
        this._loadMonitorTaskRunner.pauseSampling(reason, forcePauseSampling);
    }

    public void resumeMetricSampling(String reason) {
        this._loadMonitorTaskRunner.resumeSampling(reason);
    }

    public MetricSampler.SamplingMode samplingMode() {
        return this._loadMonitorTaskRunner.samplingMode();
    }

    public void setSamplingMode(MetricSampler.SamplingMode samplingMode) {
        this._loadMonitorTaskRunner.setSamplingMode(samplingMode);
    }

    public AutoCloseableSemaphore acquireForModelGeneration(OperationProgress operationProgress) throws InterruptedException {
        if (this._acquiredClusterModelSemaphore.get().booleanValue()) {
            throw new IllegalStateException("The thread has already acquired the semaphore for cluster model generation.");
        }
        WaitingForClusterModel step = new WaitingForClusterModel();
        operationProgress.addStep(step);
        this._clusterModelSemaphore.acquire();
        this._acquiredClusterModelSemaphore.set(true);
        step.done();
        return new AutoCloseableSemaphore();
    }

    public Map<BrokerEntity, ValuesAndExtrapolations> currentBrokerMetricValues() {
        return this._brokerMetricSampleAggregator.peekCurrentWindow();
    }

    public Map<PartitionEntity, ValuesAndExtrapolations> currentPartitionMetricValues() {
        return this._partitionMetricSampleAggregator.peekCurrentWindow();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ClusterModel clusterModel(long now, ModelCompletenessRequirements requirements, boolean allowCapacityEstimation, OperationProgress operationProgress) throws NotEnoughValidWindowsException, TimeoutException, BrokerCapacityResolutionException {
        ClusterModel clusterModel = this.clusterModel(-1L, now, requirements, allowCapacityEstimation, operationProgress);
        BrokerStats brokerStats = clusterModel.brokerStats(this._config);
        LoadMonitor loadMonitor = this;
        synchronized (loadMonitor) {
            this._cachedBrokerLoadStats = brokerStats;
            this._cachedBrokerLoadGeneration = clusterModel.generation();
        }
        return clusterModel;
    }

    public ClusterModel clusterModel(long from, long to, ModelCompletenessRequirements requirements, boolean allowCapacityEstimation, OperationProgress operationProgress) throws NotEnoughValidWindowsException, TimeoutException, BrokerCapacityResolutionException {
        return this.clusterModel(from, to, requirements, false, allowCapacityEstimation, operationProgress);
    }

    public ClusterModel clusterCapacity() throws TimeoutException, BrokerCapacityResolutionException {
        MetadataClient.ClusterAndGeneration clusterAndGeneration = this._metadataClient.refreshMetadata();
        Cluster cluster = clusterAndGeneration.cluster();
        ModelGeneration modelGeneration = new ModelGeneration(clusterAndGeneration.generation(), -1L);
        ClusterModel clusterModel = new ClusterModel(modelGeneration, 0.0);
        this.populateClusterCapacity(false, false, clusterModel, cluster);
        MonitorUtils.setBadBrokerState(clusterModel, cluster);
        return clusterModel;
    }

    private void populateClusterCapacity(boolean populateReplicaPlacementInfo, boolean allowCapacityEstimation, ClusterModel clusterModel, Cluster cluster) throws TimeoutException, BrokerCapacityResolutionException {
        List shuffledNodes;
        List list = shuffledNodes = allowCapacityEstimation ? new ArrayList(cluster.nodes()) : cluster.nodes();
        if (allowCapacityEstimation) {
            Collections.shuffle(shuffledNodes);
        }
        for (Node node : shuffledNodes) {
            BrokerCapacityInfo brokerCapacity;
            String rack = MonitorUtils.getRackHandleNull(node);
            clusterModel.createRack(rack);
            try {
                brokerCapacity = this._brokerCapacityConfigResolver.capacityForBroker(rack, node.host(), node.id(), 10000L, allowCapacityEstimation);
                LOG.debug("Get capacity info for broker {}: total capacity {}, capacity by logdir {}.", new Object[]{node.id(), brokerCapacity.capacity().get((Object)Resource.DISK), brokerCapacity.diskCapacityByLogDir()});
            }
            catch (BrokerCapacityResolutionException | TimeoutException e) {
                String errorMessage = String.format("Unable to retrieve capacity for broker %d. This may be caused by churn in the cluster, please retry.", node.id());
                LOG.warn(errorMessage, (Throwable)e);
                throw e;
            }
            clusterModel.createBroker(rack, node.host(), node.id(), brokerCapacity, populateReplicaPlacementInfo);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ClusterModel clusterModel(long from, long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, boolean allowCapacityEstimation, OperationProgress operationProgress) throws NotEnoughValidWindowsException, TimeoutException, BrokerCapacityResolutionException {
        long startMs = this._time.milliseconds();
        MetadataClient.ClusterAndGeneration clusterAndGeneration = this._metadataClient.refreshMetadata();
        Cluster cluster = clusterAndGeneration.cluster();
        MetricSampleAggregationResult<String, PartitionEntity> partitionMetricSampleAggregationResult = this._partitionMetricSampleAggregator.aggregate(cluster, from, to, requirements, operationProgress);
        Map partitionValuesAndExtrapolations = partitionMetricSampleAggregationResult.valuesAndExtrapolations();
        GeneratingClusterModel step = new GeneratingClusterModel(partitionValuesAndExtrapolations.size());
        operationProgress.addStep(step);
        long currentLoadGeneration = partitionMetricSampleAggregationResult.generation();
        ModelGeneration modelGeneration = new ModelGeneration(clusterAndGeneration.generation(), currentLoadGeneration);
        MetricSampleCompleteness completeness = partitionMetricSampleAggregationResult.completeness();
        ClusterModel clusterModel = new ClusterModel(modelGeneration, completeness.validEntityRatio());
        Timer.Context ctx = this._clusterModelCreationTimer.time();
        try {
            this.populateClusterCapacity(populateReplicaPlacementInfo, allowCapacityEstimation, clusterModel, cluster);
            Map<TopicPartition, Map<Integer, String>> replicaPlacementInfo = null;
            if (populateReplicaPlacementInfo) {
                replicaPlacementInfo = MonitorUtils.getReplicaPlacementInfo(clusterModel, cluster, this._adminClient, this._config);
            }
            for (Map.Entry entry : partitionValuesAndExtrapolations.entrySet()) {
                TopicPartition tp = ((PartitionEntity)((Object)entry.getKey())).tp();
                ValuesAndExtrapolations leaderLoad = (ValuesAndExtrapolations)entry.getValue();
                MonitorUtils.populatePartitionLoad(cluster, clusterModel, tp, leaderLoad, replicaPlacementInfo, this._brokerCapacityConfigResolver, allowCapacityEstimation);
                step.incrementPopulatedNumPartitions();
            }
            MonitorUtils.setBadBrokerState(clusterModel, cluster);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Generated cluster model in {} ms", (Object)(this._time.milliseconds() - startMs));
            }
        }
        finally {
            ctx.stop();
        }
        return clusterModel;
    }

    public ModelGeneration clusterModelGeneration() {
        int clusterGeneration = this._metadataClient.refreshMetadata().generation();
        return new ModelGeneration(clusterGeneration, this._partitionMetricSampleAggregator.generation());
    }

    public synchronized BrokerStats cachedBrokerLoadStats(boolean allowCapacityEstimation) {
        if (this._cachedBrokerLoadGeneration != null && (allowCapacityEstimation || !this._cachedBrokerLoadStats.isBrokerStatsEstimated()) && this._partitionMetricSampleAggregator.generation().longValue() == this._cachedBrokerLoadGeneration.loadGeneration() && this._metadataClient.refreshMetadata().generation() == this._cachedBrokerLoadGeneration.clusterGeneration()) {
            return this._cachedBrokerLoadStats;
        }
        return null;
    }

    public Set<Integer> brokersWithReplicas(long timeout) {
        Cluster kafkaCluster = this._metadataClient.refreshMetadata(timeout).cluster();
        return MonitorUtils.brokersWithReplicas(kafkaCluster);
    }

    public MetadataClient.ClusterAndGeneration refreshClusterAndGeneration() {
        return this._metadataClient.refreshMetadata();
    }

    public boolean meetCompletenessRequirements(Cluster cluster, ModelCompletenessRequirements requirements) {
        int requiredNumValidWindows;
        int numValidWindows = this._partitionMetricSampleAggregator.validWindows(cluster, requirements.minMonitoredPartitionsPercentage()).size();
        return numValidWindows >= (requiredNumValidWindows = requirements.minRequiredNumWindows());
    }

    public boolean meetCompletenessRequirements(ModelCompletenessRequirements requirements) {
        MetadataClient.ClusterAndGeneration clusterAndGeneration = this._metadataClient.refreshMetadata();
        return this.meetCompletenessRequirements(clusterAndGeneration.cluster(), requirements);
    }

    public MetricSampleAggregationResult<String, BrokerEntity> brokerMetrics() {
        List nodes = this._metadataClient.cluster().nodes();
        HashSet<BrokerEntity> brokerEntities = new HashSet<BrokerEntity>(nodes.size());
        for (Node node : nodes) {
            brokerEntities.add(new BrokerEntity(node.host(), node.id()));
        }
        return this._brokerMetricSampleAggregator.aggregate(brokerEntities);
    }

    KafkaPartitionMetricSampleAggregator partitionSampleAggregator() {
        return this._partitionMetricSampleAggregator;
    }

    public Set<Integer> deadBrokersWithReplicas(long timeout) {
        Cluster kafkaCluster = this._metadataClient.refreshMetadata(timeout).cluster();
        return MonitorUtils.deadBrokersWithReplicas(kafkaCluster);
    }

    public Set<Integer> brokersWithOfflineReplicas(long timeout) {
        Cluster kafkaCluster = this._metadataClient.refreshMetadata(timeout).cluster();
        return MonitorUtils.brokersWithOfflineReplicas(kafkaCluster);
    }

    private int numValidSnapshotWindows() {
        return this._latestStateUpdateMs + this._monitorStateUpdateTimeoutMs > this._time.milliseconds() ? this._numValidSnapshotWindows : -1;
    }

    private int totalMonitoredSnapshotWindows() {
        return this._latestStateUpdateMs + this._monitorStateUpdateTimeoutMs > this._time.milliseconds() ? this._totalMonitoredSnapshotWindows : -1;
    }

    private double monitoredPartitionsPercentage() {
        return this._latestStateUpdateMs + this._monitorStateUpdateTimeoutMs > this._time.milliseconds() ? this._monitoredPartitionsPercentage : 0.0;
    }

    public long lastUpdateMs() {
        return this._latestStateUpdateMs;
    }

    private int numPartitionsWithExtrapolations() {
        return this._latestStateUpdateMs + this._monitorStateUpdateTimeoutMs > this._time.milliseconds() ? this._numPartitionsWithExtrapolations : -1;
    }

    private double getMonitoredPartitionsPercentage() {
        MetricSampleAggregationResult<String, PartitionEntity> metricSampleAggregationResult;
        MetadataClient.ClusterAndGeneration clusterAndGeneration = this._metadataClient.refreshMetadata();
        Cluster kafkaCluster = clusterAndGeneration.cluster();
        try {
            metricSampleAggregationResult = this._partitionMetricSampleAggregator.aggregate(kafkaCluster, this._time.milliseconds(), new OperationProgress());
        }
        catch (NotEnoughValidWindowsException e) {
            LOG.debug("Not enough valid windows to get monitored partitions. {}", (Object)e.getMessage());
            return 0.0;
        }
        Map partitionLoads = metricSampleAggregationResult.valuesAndExtrapolations();
        AtomicInteger numPartitionsWithExtrapolations = new AtomicInteger(0);
        partitionLoads.values().forEach(valuesAndExtrapolations -> {
            if (!valuesAndExtrapolations.extrapolations().isEmpty()) {
                numPartitionsWithExtrapolations.incrementAndGet();
            }
        });
        this._numPartitionsWithExtrapolations = numPartitionsWithExtrapolations.get();
        this._totalNumPartitions = MonitorUtils.totalNumPartitions(kafkaCluster);
        return this._totalNumPartitions > 0 ? (double)metricSampleAggregationResult.completeness().validEntityRatio() : 0.0;
    }

    public class AutoCloseableSemaphore
    implements AutoCloseable {
        private final AtomicBoolean _closed = new AtomicBoolean(false);

        @Override
        public void close() {
            if (this._closed.compareAndSet(false, true)) {
                LoadMonitor.this._clusterModelSemaphore.release();
                LoadMonitor.this._acquiredClusterModelSemaphore.set(false);
            }
        }
    }

    private class PartitionMetricSampleAggregatorCleaner
    implements Runnable {
        static final long CHECK_INTERVAL_MS = 37500L;
        static final short REFRESH_LIMIT = 8;
        private final Set<String> _allTopics = new HashSet<String>();
        private int _refreshCount = 0;

        private PartitionMetricSampleAggregatorCleaner() {
        }

        @Override
        public void run() {
            this._allTopics.addAll(LoadMonitor.this._metadataClient.refreshMetadata().cluster().topics());
            ++this._refreshCount;
            if (this._refreshCount % 8 == 0) {
                LoadMonitor.this._partitionMetricSampleAggregator.retainEntityGroup(this._allTopics);
                this._allTopics.clear();
            }
        }
    }

    private class SensorUpdater
    implements Runnable {
        private SensorUpdater() {
        }

        @Override
        public void run() {
            try {
                MetadataClient.ClusterAndGeneration clusterAndGeneration = LoadMonitor.this._metadataClient.clusterAndGeneration();
                double minMonitoredPartitionsPercentage = LoadMonitor.this._defaultModelCompletenessRequirements.minMonitoredPartitionsPercentage();
                LoadMonitor.this._numValidSnapshotWindows = LoadMonitor.this._partitionMetricSampleAggregator.validWindows(clusterAndGeneration.cluster(), minMonitoredPartitionsPercentage).size();
                LoadMonitor.this._monitoredPartitionsPercentage = LoadMonitor.this.getMonitoredPartitionsPercentage();
                LoadMonitor.this._totalMonitoredSnapshotWindows = LoadMonitor.this._partitionMetricSampleAggregator.allWindows().size();
                LoadMonitor.this._latestStateUpdateMs = LoadMonitor.this._time.milliseconds();
            }
            catch (Throwable t) {
                LOG.warn("Load monitor sensor updater received exception ", t);
            }
        }
    }
}

