/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.exception.BrokerCapacityResolutionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CruiseControlMetricsProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsProcessor.class);
    private static final long INIT_METRIC_TIMESTAMP = -1L;
    private final Map<Integer, BrokerLoad> _brokerLoad = new HashMap<Integer, BrokerLoad>();
    private final Map<Integer, Short> _cachedNumCoresByBroker = new HashMap<Integer, Short>();
    private final BrokerCapacityConfigResolver _brokerCapacityConfigResolver;
    private final boolean _allowCpuCapacityEstimation;
    private long _maxMetricTimestamp;

    CruiseControlMetricsProcessor(BrokerCapacityConfigResolver brokerCapacityConfigResolver, boolean allowCpuCapacityEstimation) {
        this._brokerCapacityConfigResolver = brokerCapacityConfigResolver;
        this._allowCpuCapacityEstimation = allowCpuCapacityEstimation;
        this._maxMetricTimestamp = -1L;
    }

    void addMetric(CruiseControlMetric metric) {
        int brokerId = metric.brokerId();
        LOG.trace("Adding cruise control metric {}", (Object)metric);
        this._maxMetricTimestamp = Math.max(metric.time(), this._maxMetricTimestamp);
        this._brokerLoad.compute(brokerId, (bid, load) -> {
            BrokerLoad brokerLoad = load == null ? new BrokerLoad() : load;
            brokerLoad.recordMetric(metric);
            return brokerLoad;
        });
    }

    private void updateCachedNumCoresByBroker(Cluster cluster) {
        for (int brokerId : this._brokerLoad.keySet()) {
            this._cachedNumCoresByBroker.computeIfAbsent(brokerId, bid -> {
                Node node = cluster.nodeById(bid.intValue());
                if (node == null) {
                    LOG.warn("Received metrics from unrecognized broker {}.", bid);
                    return null;
                }
                try {
                    BrokerCapacityInfo capacity = this._brokerCapacityConfigResolver.capacityForBroker(MonitorUtils.getRackHandleNull(node), node.host(), (int)bid, 10000L, this._allowCpuCapacityEstimation);
                    return capacity == null ? null : Short.valueOf(capacity.numCpuCores());
                }
                catch (BrokerCapacityResolutionException | TimeoutException e) {
                    LOG.warn("Unable to get number of CPU cores for broker {}.", (Object)node.id(), (Object)e);
                    return null;
                }
            });
        }
    }

    Map<Integer, Short> cachedNumCoresByBroker() {
        return this._cachedNumCoresByBroker;
    }

    MetricSampler.Samples process(Cluster cluster, Set<TopicPartition> partitionsDotNotHandled, MetricSampler.SamplingMode samplingMode) {
        this.updateCachedNumCoresByBroker(cluster);
        this._brokerLoad.forEach((broker, load) -> load.prepareBrokerMetrics(cluster, (int)broker, this._maxMetricTimestamp));
        Map<Integer, Integer> skippedPartitionByBroker = null;
        HashSet<PartitionMetricSample> partitionMetricSamples = new HashSet<PartitionMetricSample>();
        if (samplingMode == MetricSampler.SamplingMode.ALL || samplingMode == MetricSampler.SamplingMode.PARTITION_METRICS_ONLY) {
            skippedPartitionByBroker = this.addPartitionMetricSamples(cluster, partitionsDotNotHandled, partitionMetricSamples);
        }
        int skippedBroker = 0;
        HashSet<BrokerMetricSample> brokerMetricSamples = new HashSet<BrokerMetricSample>();
        if (samplingMode == MetricSampler.SamplingMode.ALL || samplingMode == MetricSampler.SamplingMode.BROKER_METRICS_ONLY) {
            skippedBroker = this.addBrokerMetricSamples(cluster, brokerMetricSamples);
        }
        this.logProcess(samplingMode, skippedPartitionByBroker, skippedBroker, partitionMetricSamples, brokerMetricSamples);
        return new MetricSampler.Samples(partitionMetricSamples, brokerMetricSamples);
    }

    private void logProcess(MetricSampler.SamplingMode samplingMode, Map<Integer, Integer> skippedPartitionByBroker, int skippedBroker, Set<PartitionMetricSample> partitionMetricSamples, Set<BrokerMetricSample> brokerMetricSamples) {
        switch (samplingMode) {
            case ALL: {
                LOG.info("Generated {}{} partition metric samples and {}{} broker metric samples for timestamp {}.", new Object[]{partitionMetricSamples.size(), !skippedPartitionByBroker.isEmpty() ? String.format("(%s skipped by broker %s)", skippedPartitionByBroker.values().stream().mapToInt(v -> v).sum(), skippedPartitionByBroker) : "", brokerMetricSamples.size(), skippedBroker > 0 ? "(" + skippedBroker + " skipped)" : "", this._maxMetricTimestamp});
                break;
            }
            case PARTITION_METRICS_ONLY: {
                LOG.info("Generated {}{} partition metric samples for timestamp {}.", new Object[]{partitionMetricSamples.size(), !skippedPartitionByBroker.isEmpty() ? String.format("(%s skipped by broker %s)", skippedPartitionByBroker.values().stream().mapToInt(v -> v).sum(), skippedPartitionByBroker) : "", this._maxMetricTimestamp});
                break;
            }
            case BROKER_METRICS_ONLY: {
                LOG.info("Generated {}{} broker metric samples for timestamp {}.", new Object[]{brokerMetricSamples.size(), skippedBroker > 0 ? "(" + skippedBroker + " skipped)" : "", this._maxMetricTimestamp});
                break;
            }
            default: {
                throw new IllegalStateException("Unknown sampling mode " + samplingMode);
            }
        }
    }

    void clear() {
        this._brokerLoad.clear();
        this._maxMetricTimestamp = -1L;
    }

    private Map<Integer, Integer> addPartitionMetricSamples(Cluster cluster, Set<TopicPartition> partitionsDotNotHandled, Set<PartitionMetricSample> partitionMetricSamples) {
        HashMap<Integer, Integer> skippedPartitionByBroker = new HashMap<Integer, Integer>();
        Map<Integer, Map<String, Integer>> leaderDistribution = SamplingUtils.leaderDistribution(cluster);
        for (TopicPartition tpDotNotHandled : partitionsDotNotHandled) {
            try {
                PartitionMetricSample sample = SamplingUtils.buildPartitionMetricSample(cluster, leaderDistribution, tpDotNotHandled, this._brokerLoad, this._maxMetricTimestamp, this._cachedNumCoresByBroker, skippedPartitionByBroker);
                if (sample == null) continue;
                LOG.trace("Added partition metrics sample for {}.", (Object)tpDotNotHandled);
                partitionMetricSamples.add(sample);
            }
            catch (Exception e) {
                LOG.error("Error building partition metric sample for {}.", (Object)tpDotNotHandled, (Object)e);
                skippedPartitionByBroker.merge(-1, 1, Integer::sum);
            }
        }
        return skippedPartitionByBroker;
    }

    private int addBrokerMetricSamples(Cluster cluster, Set<BrokerMetricSample> brokerMetricSamples) {
        int skippedBroker = 0;
        for (Node node : cluster.nodes()) {
            try {
                BrokerMetricSample sample = SamplingUtils.buildBrokerMetricSample(node, this._brokerLoad, this._maxMetricTimestamp);
                if (sample != null) {
                    LOG.trace("Added broker metric sample for broker {}.", (Object)node.id());
                    brokerMetricSamples.add(sample);
                    continue;
                }
                ++skippedBroker;
            }
            catch (UnknownVersionException e) {
                LOG.error("Unrecognized serde version detected during broker metric sampling.", (Throwable)e);
                ++skippedBroker;
            }
            catch (Exception e) {
                LOG.error("Error building broker metric sample for {}.", (Object)node.id(), (Object)e);
                ++skippedBroker;
            }
        }
        return skippedBroker;
    }
}

