/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.cruisecontrol.metricdef.MetricInfo;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.model.ModelUtils;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamplingUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SamplingUtils.class);
    private static final String SKIP_BUILDING_SAMPLE_PREFIX = "Skip generating metric sample for ";
    public static final int UNRECOGNIZED_BROKER_ID = -1;
    public static final Random RANDOM = new Random();

    private SamplingUtils() {
    }

    static Map<Integer, Map<String, Integer>> leaderDistribution(Cluster cluster) {
        List clusterNodes = cluster.nodes();
        HashMap<Integer, Map<String, Integer>> stats = new HashMap<Integer, Map<String, Integer>>(clusterNodes.size());
        for (Node node : clusterNodes) {
            HashMap numLeadersByTopic = new HashMap();
            stats.put(node.id(), numLeadersByTopic);
            cluster.partitionsForNode(node.id()).forEach(partitionInfo -> numLeadersByTopic.merge(partitionInfo.topic(), 1, Integer::sum));
        }
        return stats;
    }

    private static Double estimateLeaderCpuUtil(PartitionMetricSample pms, BrokerLoad brokerLoad, MetricDef commonMetricDef, short numCpuCores) {
        double partitionBytesInRate = pms.metricValue(commonMetricDef.metricInfo(KafkaMetricDef.LEADER_BYTES_IN.name()).id());
        double partitionBytesOutRate = pms.metricValue(commonMetricDef.metricInfo(KafkaMetricDef.LEADER_BYTES_OUT.name()).id());
        double partitionReplicationBytesOutRate = pms.metricValue(commonMetricDef.metricInfo(KafkaMetricDef.REPLICATION_BYTES_OUT_RATE.name()).id());
        double brokerTotalBytesOut = brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_BYTES_OUT) + brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_REPLICATION_BYTES_OUT);
        Double estimatedLeaderCpuUtilPerCore = ModelUtils.estimateLeaderCpuUtilPerCore(brokerLoad.brokerMetric(RawMetricType.BROKER_CPU_UTIL), brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_BYTES_IN), brokerTotalBytesOut, brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_REPLICATION_BYTES_IN), partitionBytesInRate, partitionBytesOutRate + partitionReplicationBytesOutRate);
        return estimatedLeaderCpuUtilPerCore != null ? Double.valueOf((double)numCpuCores * estimatedLeaderCpuUtilPerCore) : null;
    }

    static double estimateLeaderCpuUtilUsingLinearRegressionModel(PartitionMetricSample metricSample) {
        List<Short> cpuId = KafkaMetricDef.resourceToMetricIds(Resource.CPU);
        List<Short> networkOutId = KafkaMetricDef.resourceToMetricIds(Resource.NW_OUT);
        Double cpuUtilization = SamplingUtils.sumOfMetrics(metricSample, cpuId);
        Double partitionBytesOutRate = SamplingUtils.sumOfMetrics(metricSample, networkOutId);
        return ModelUtils.estimateLeaderCpuUtilUsingLinearRegressionModel(cpuUtilization, partitionBytesOutRate);
    }

    private static Double sumOfMetrics(PartitionMetricSample metricSample, Collection<Short> metricIds) {
        double result = 0.0;
        for (short id : metricIds) {
            result += metricSample.metricValue(id).doubleValue();
        }
        return result;
    }

    private static TopicPartition partitionHandleDotInTopicName(TopicPartition tp) {
        return !tp.topic().contains(".") ? tp : new TopicPartition(SamplingUtils.replaceDotsWithUnderscores(tp.topic()), tp.partition());
    }

    public static String replaceDotsWithUnderscores(String stringWithDots) {
        return !stringWithDots.contains(".") ? stringWithDots : stringWithDots.replace('.', '_');
    }

    static PartitionMetricSample buildPartitionMetricSample(Cluster cluster, Map<Integer, Map<String, Integer>> leaderDistribution, TopicPartition tpDotNotHandled, Map<Integer, BrokerLoad> brokerLoadById, long maxMetricTimestamp, Map<Integer, Short> cachedNumCoresByBroker, Map<Integer, Integer> skippedPartitionByBroker) {
        Node leaderNode = cluster.leaderFor(tpDotNotHandled);
        if (leaderNode == null) {
            LOG.trace("Partition {} has no current leader.", (Object)tpDotNotHandled);
            skippedPartitionByBroker.merge(-1, 1, Integer::sum);
            return null;
        }
        int leaderId = leaderNode.id();
        BrokerLoad brokerLoad = brokerLoadById.get(leaderId);
        TopicPartition tpWithDotHandled = SamplingUtils.partitionHandleDotInTopicName(tpDotNotHandled);
        if (SamplingUtils.skipBuildingPartitionMetricSample(tpDotNotHandled, tpWithDotHandled, leaderId, brokerLoad, cachedNumCoresByBroker)) {
            skippedPartitionByBroker.merge(leaderId, 1, Integer::sum);
            return null;
        }
        MetricDef commonMetricDef = KafkaMetricDef.commonMetricDef();
        PartitionMetricSample pms = new PartitionMetricSample(leaderId, tpDotNotHandled);
        int numLeaders = leaderDistribution.get(leaderId).get(tpDotNotHandled.topic());
        for (RawMetricType rawMetricType : RawMetricType.topicMetricTypes()) {
            double sampleValue = numLeaders == 0 ? 0.0 : brokerLoad.topicMetrics(tpWithDotHandled.topic(), rawMetricType) / (double)numLeaders;
            MetricInfo metricInfo = commonMetricDef.metricInfo(KafkaMetricDef.forRawMetricType(rawMetricType).name());
            pms.record(metricInfo, sampleValue);
        }
        Double partitionSize = brokerLoad.partitionMetric(tpWithDotHandled.topic(), tpWithDotHandled.partition(), RawMetricType.PARTITION_SIZE);
        if (partitionSize == null) {
            skippedPartitionByBroker.merge(leaderId, 1, Integer::sum);
            return null;
        }
        pms.record(commonMetricDef.metricInfo(KafkaMetricDef.DISK_USAGE.name()), partitionSize);
        Double estimatedLeaderCpuUtil = SamplingUtils.estimateLeaderCpuUtil(pms, brokerLoad, commonMetricDef, cachedNumCoresByBroker.get(leaderId));
        if (estimatedLeaderCpuUtil == null) {
            skippedPartitionByBroker.merge(leaderId, 1, Integer::sum);
            return null;
        }
        pms.record(commonMetricDef.metricInfo(KafkaMetricDef.CPU_USAGE.name()), estimatedLeaderCpuUtil);
        pms.close(maxMetricTimestamp);
        return pms;
    }

    static BrokerMetricSample buildBrokerMetricSample(Node node, Map<Integer, BrokerLoad> brokerLoadById, long maxMetricTimestamp) throws UnknownVersionException {
        BrokerLoad brokerLoad = brokerLoadById.get(node.id());
        if (SamplingUtils.skipBuildingBrokerMetricSample(brokerLoad, node.id())) {
            return null;
        }
        MetricDef brokerMetricDef = KafkaMetricDef.brokerMetricDef();
        BrokerMetricSample bms = new BrokerMetricSample(node.host(), node.id(), brokerLoad.brokerSampleDeserializationVersion());
        for (Map.Entry entry : RawMetricType.brokerMetricTypesDiffByVersion().entrySet()) {
            for (RawMetricType rawBrokerMetricType : (Set)entry.getValue()) {
                if (!brokerLoad.brokerMetricAvailable(rawBrokerMetricType)) {
                    LOG.warn("{}broker {} because it does not have {} metrics (serde version {}) or the metrics are inconsistent.", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, node.id(), rawBrokerMetricType, entry.getKey()});
                    return null;
                }
                MetricInfo metricInfo = brokerMetricDef.metricInfo(KafkaMetricDef.forRawMetricType(rawBrokerMetricType).name());
                double metricValue = brokerLoad.brokerMetric(rawBrokerMetricType);
                bms.record(metricInfo, metricValue);
            }
        }
        bms.record(brokerMetricDef.metricInfo(KafkaMetricDef.DISK_USAGE.name()), brokerLoad.diskUsage());
        bms.close(maxMetricTimestamp);
        return bms;
    }

    private static boolean skipBuildingPartitionMetricSample(TopicPartition tpDotNotHandled, TopicPartition tpWithDotHandled, int leaderId, BrokerLoad brokerLoad, Map<Integer, Short> cachedNumCoresByBroker) {
        if (brokerLoad == null || !brokerLoad.brokerMetricAvailable(RawMetricType.BROKER_CPU_UTIL)) {
            LOG.debug("{}partition {} because {} metric for broker {} is unavailable.", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, tpDotNotHandled, RawMetricType.BROKER_CPU_UTIL, leaderId});
            return true;
        }
        if (cachedNumCoresByBroker.get(leaderId) == null) {
            LOG.debug("{}partition {} because the number of CPU cores of its leader broker {} is unavailable. Please ensure that either the broker capacity config resolver provides the number of CPU cores without estimation or allow CPU capacity estimation during sampling (i.e. set {} to true).", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, tpDotNotHandled, leaderId, "sampling.allow.cpu.capacity.estimation"});
            return true;
        }
        if (!brokerLoad.allDotHandledTopicMetricsAvailable(tpWithDotHandled.topic())) {
            LOG.debug("{}partition {} because broker {} has no metric or topic metrics are not available", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, tpDotNotHandled, leaderId});
            return true;
        }
        return false;
    }

    private static boolean skipBuildingBrokerMetricSample(BrokerLoad brokerLoad, int brokerId) {
        if (brokerLoad == null) {
            LOG.warn("{}broker {} because all broker metrics are missing.", (Object)SKIP_BUILDING_SAMPLE_PREFIX, (Object)brokerId);
            return true;
        }
        if (!brokerLoad.minRequiredBrokerMetricsAvailable()) {
            if (brokerLoad.missingBrokerMetricsInMinSupportedVersion().size() == 0) {
                LOG.warn("{}broker {} because there are not enough topic metrics to generate broker metrics.", (Object)SKIP_BUILDING_SAMPLE_PREFIX, (Object)brokerId);
            } else {
                LOG.warn("{}broker {} because the following required metrics are missing {}.", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, brokerId, brokerLoad.missingBrokerMetricsInMinSupportedVersion()});
            }
            return true;
        }
        return false;
    }

    public static Consumer<String, CruiseControlMetric> createMetricConsumer(Map<String, ?> configs, String clientIdPrefix) {
        String bootstrapServers = (String)configs.get("metric.reporter.sampler.bootstrap.servers");
        if (bootstrapServers == null) {
            bootstrapServers = SamplingUtils.bootstrapServers(configs);
        }
        long randomToken = RANDOM.nextLong();
        Properties consumerProps = new Properties();
        consumerProps.putAll(configs);
        consumerProps.setProperty("bootstrap.servers", bootstrapServers);
        consumerProps.setProperty("client.id", clientIdPrefix + "-consumer-" + randomToken);
        consumerProps.setProperty("auto.offset.reset", "latest");
        consumerProps.setProperty("enable.auto.commit", "false");
        consumerProps.setProperty("max.poll.records", Integer.toString(Integer.MAX_VALUE));
        consumerProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", MetricSerde.class.getName());
        consumerProps.setProperty("reconnect.backoff.ms", configs.get("reconnect.backoff.ms").toString());
        return new KafkaConsumer(consumerProps);
    }

    public static String bootstrapServers(Map<String, ?> config) {
        return String.join((CharSequence)",", (List)config.get("bootstrap.servers"));
    }

    public static KafkaConsumer<byte[], byte[]> createSampleStoreConsumer(Map<String, ?> configs, String clientIdPrefix) {
        long randomToken = RANDOM.nextLong();
        Properties consumerProps = new Properties();
        consumerProps.putAll(configs);
        consumerProps.setProperty("bootstrap.servers", SamplingUtils.bootstrapServers(configs));
        consumerProps.setProperty("client.id", clientIdPrefix + "-consumer-" + randomToken);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("enable.auto.commit", "false");
        consumerProps.setProperty("max.poll.records", Integer.toString(Integer.MAX_VALUE));
        consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.setProperty("reconnect.backoff.ms", configs.get("reconnect.backoff.ms").toString());
        return new KafkaConsumer(consumerProps);
    }
}

