/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor;

import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricValues;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.BrokerCapacityResolutionException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.ModelUtils;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers used by the load monitor to compare cluster metadata snapshots, derive
 * replica loads from aggregated partition metrics, and populate a {@link ClusterModel}.
 */
public class MonitorUtils {
    /** Multiplier applied to CPU usage values to turn a unit-interval value into a percentage. */
    public static final double UNIT_INTERVAL_TO_PERCENTAGE = 100.0;
    private static final Logger LOG = LoggerFactory.getLogger(MonitorUtils.class);
    /** Unmodifiable capacity map that maps every cached {@link Resource} to {@code 0.0}. */
    public static final Map<Resource, Double> EMPTY_BROKER_CAPACITY;
    /** Timeout, in milliseconds, passed to the capacity resolver when fetching a broker's capacity. */
    public static final long BROKER_CAPACITY_FETCH_TIMEOUT_MS = 10000L;

    static {
        Map<Resource, Double> emptyBrokerCapacity = new HashMap<>(Resource.cachedValues().size());
        for (Resource resource : Resource.cachedValues()) {
            emptyBrokerCapacity.put(resource, 0.0);
        }
        EMPTY_BROKER_CAPACITY = Collections.unmodifiableMap(emptyBrokerCapacity);
    }

    private MonitorUtils() {
    }

    /**
     * Derives the metric values of a follower replica from the metric values of its leader.
     *
     * <p>Metrics outside the CPU and NW_OUT groups are carried over as-is. CPU usage is
     * recomputed per window through {@link ModelUtils#getFollowerCpuUtilFromLeaderLoad}, and
     * every NW_OUT metric is replaced by a freshly created {@link MetricValues} of the same
     * length (presumably zero-filled; confirm against {@code MetricValues}).
     *
     * @param aggregatedMetricValues the leader's aggregated metric values; not modified here.
     * @return a new {@link AggregatedMetricValues} describing the follower load.
     */
    private static AggregatedMetricValues toFollowerMetricValues(AggregatedMetricValues aggregatedMetricValues) {
        AggregatedMetricValues followerLoad = new AggregatedMetricValues();
        for (short metricId : aggregatedMetricValues.metricIds()) {
            String metricGroup = KafkaMetricDef.commonMetricDef().metricInfo(metricId).group();
            boolean isCpu = Resource.CPU.name().equals(metricGroup);
            boolean isNetworkOut = Resource.NW_OUT.name().equals(metricGroup);
            if (!isCpu && !isNetworkOut) {
                followerLoad.add(metricId, aggregatedMetricValues.valuesFor(metricId));
            }
        }

        int numWindows = aggregatedMetricValues.length();
        MetricValues followerCpu = new MetricValues(numWindows);
        MetricValues leaderBytesInRate = aggregatedMetricValues.valuesForGroup(Resource.NW_IN.name(), KafkaMetricDef.commonMetricDef(), false);
        MetricValues leaderBytesOutRate = aggregatedMetricValues.valuesForGroup(Resource.NW_OUT.name(), KafkaMetricDef.commonMetricDef(), false);
        MetricValues leaderCpuUtilization = aggregatedMetricValues.valuesFor(KafkaMetricDef.commonMetricDefId(KafkaMetricDef.CPU_USAGE));
        for (int i = 0; i < aggregatedMetricValues.length(); ++i) {
            double followerCpuUtil = ModelUtils.getFollowerCpuUtilFromLeaderLoad(leaderBytesInRate.get(i), leaderBytesOutRate.get(i), leaderCpuUtilization.get(i));
            followerCpu.set(i, followerCpuUtil);
        }
        for (short nwOutMetricId : KafkaMetricDef.resourceToMetricIds(Resource.NW_OUT)) {
            followerLoad.add(nwOutMetricId, new MetricValues(aggregatedMetricValues.length()));
        }
        followerLoad.add(KafkaMetricDef.commonMetricDefId(KafkaMetricDef.CPU_USAGE), followerCpu);
        return followerLoad;
    }

    /**
     * Checks whether two cluster metadata snapshots differ in their nodes, topics, partition
     * counts, partition leaders, or replica lists.
     *
     * @param prev the earlier snapshot.
     * @param curr the later snapshot.
     * @return {@code true} if any of the compared properties differ, {@code false} otherwise.
     */
    public static boolean metadataChanged(Cluster prev, Cluster curr) {
        // Node set comparison: same size and nothing left after removing the current nodes.
        Set<Node> prevNodeSet = new HashSet<>(prev.nodes());
        if (prevNodeSet.size() != curr.nodes().size()) {
            return true;
        }
        prevNodeSet.removeAll(curr.nodes());
        if (!prevNodeSet.isEmpty()) {
            return true;
        }
        if (!prev.topics().equals(curr.topics())) {
            return true;
        }
        for (String topic : prev.topics()) {
            if (!prev.partitionCountForTopic(topic).equals(curr.partitionCountForTopic(topic))) {
                return true;
            }
            for (PartitionInfo prevPartInfo : prev.partitionsForTopic(topic)) {
                PartitionInfo currPartInfo = curr.partition(new TopicPartition(prevPartInfo.topic(), prevPartInfo.partition()));
                if (leaderChanged(prevPartInfo, currPartInfo) || replicaListChanged(prevPartInfo, currPartInfo)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * @return {@code true} if exactly one of the two partitions has no leader, or if both have a
     *         leader and the leader ids differ.
     */
    private static boolean leaderChanged(PartitionInfo prevPartInfo, PartitionInfo currPartInfo) {
        Node prevLeader = prevPartInfo.leader();
        Node currLeader = currPartInfo.leader();
        if (prevLeader == null || currLeader == null) {
            // Unchanged only when both are leaderless.
            return prevLeader != currLeader;
        }
        return prevLeader.id() != currLeader.id();
    }

    /**
     * @return {@code true} if the replica arrays differ in length or in the broker id at any
     *         position (the comparison is order-sensitive).
     */
    private static boolean replicaListChanged(PartitionInfo prevPartInfo, PartitionInfo currPartInfo) {
        Node[] prevReplicas = prevPartInfo.replicas();
        Node[] currReplicas = currPartInfo.replicas();
        if (prevReplicas.length != currReplicas.length) {
            return true;
        }
        for (int i = 0; i < prevReplicas.length; ++i) {
            if (prevReplicas[i].id() != currReplicas[i].id()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Combines the completeness requirements of the given goals by repeatedly applying
     * {@link ModelCompletenessRequirements#stronger}.
     *
     * @param goals the goals whose requirements are combined.
     * @return the combined requirements, or {@code null} if {@code goals} is empty.
     */
    public static ModelCompletenessRequirements combineLoadRequirementOptions(Collection<Goal> goals) {
        ModelCompletenessRequirements requirements = null;
        for (Goal goal : goals) {
            // The first call receives null as the "other" requirement.
            requirements = goal.clusterModelCompletenessRequirements().stronger(requirements);
        }
        return requirements;
    }

    /**
     * @param cluster the cluster metadata.
     * @return the sum of the partition counts of all topics in the cluster.
     */
    public static int totalNumPartitions(Cluster cluster) {
        int totalNumPartitions = 0;
        for (String topic : cluster.topics()) {
            totalNumPartitions += cluster.partitionCountForTopic(topic);
        }
        return totalNumPartitions;
    }

    /**
     * Multiplies every CPU usage value by {@link #UNIT_INTERVAL_TO_PERCENTAGE}, in place.
     *
     * @param aggregatedMetricValues the metric values whose CPU usage is rescaled.
     */
    private static void adjustCpuUsage(AggregatedMetricValues aggregatedMetricValues) {
        short cpuUsageId = KafkaMetricDef.commonMetricDefId(KafkaMetricDef.CPU_USAGE);
        MetricValues cpuUsage = aggregatedMetricValues.valuesFor(cpuUsageId);
        for (int i = 0; i < cpuUsage.length(); ++i) {
            cpuUsage.set(i, cpuUsage.get(i) * UNIT_INTERVAL_TO_PERCENTAGE);
        }
    }

    /**
     * Produces the metric values to assign to one replica of a partition.
     *
     * @param valuesAndExtrapolations the aggregated partition metrics.
     * @param partitionInfo the partition metadata, used for the follower count of a leader.
     * @param isLeader whether the replica is the partition leader.
     * @param needToAdjustCpuUsage whether CPU usage must first be rescaled in place; callers
     *        should pass {@code true} only once per {@code valuesAndExtrapolations} because the
     *        rescaling mutates the shared values.
     * @return the leader values (with replication bytes-out filled in) or the derived follower values.
     */
    private static AggregatedMetricValues getAggregatedMetricValues(ValuesAndExtrapolations valuesAndExtrapolations, PartitionInfo partitionInfo, boolean isLeader, boolean needToAdjustCpuUsage) {
        AggregatedMetricValues aggregatedMetricValues = valuesAndExtrapolations.metricValues();
        if (needToAdjustCpuUsage) {
            adjustCpuUsage(aggregatedMetricValues);
        }
        if (isLeader) {
            return fillInReplicationBytesOut(aggregatedMetricValues, partitionInfo);
        }
        return toFollowerMetricValues(aggregatedMetricValues);
    }

    /**
     * Sets the replication bytes-out rate of a leader to its bytes-in rate multiplied by the
     * number of followers ({@code replicas - 1}), creating the metric if it is absent.
     *
     * @param aggregatedMetricValues the leader's metric values; modified in place.
     * @param info the partition metadata providing the replica count.
     * @return the same {@code aggregatedMetricValues} instance.
     */
    private static AggregatedMetricValues fillInReplicationBytesOut(AggregatedMetricValues aggregatedMetricValues, PartitionInfo info) {
        int numFollowers = info.replicas().length - 1;
        short leaderBytesInRateId = KafkaMetricDef.commonMetricDefId(KafkaMetricDef.LEADER_BYTES_IN);
        short replicationBytesOutRateId = KafkaMetricDef.commonMetricDefId(KafkaMetricDef.REPLICATION_BYTES_OUT_RATE);
        MetricValues leaderBytesInRate = aggregatedMetricValues.valuesFor(leaderBytesInRateId);
        MetricValues replicationBytesOutRate = aggregatedMetricValues.valuesFor(replicationBytesOutRateId);
        if (replicationBytesOutRate == null) {
            replicationBytesOutRate = new MetricValues(leaderBytesInRate.length());
            aggregatedMetricValues.add(replicationBytesOutRateId, replicationBytesOutRate);
        }
        for (int i = 0; i < leaderBytesInRate.length(); ++i) {
            replicationBytesOutRate.set(i, leaderBytesInRate.get(i) * (double) numFollowers);
        }
        return aggregatedMetricValues;
    }

    /**
     * @param node the broker node.
     * @return the node's rack, or its host when the rack is {@code null} or empty.
     */
    public static String getRackHandleNull(Node node) {
        String rack = node.rack();
        return rack == null || rack.isEmpty() ? node.host() : rack;
    }

    /**
     * @param cluster the cluster metadata.
     * @return a new mutable set with the ids of all brokers that appear in any partition's replica list.
     */
    static Set<Integer> brokersWithReplicas(Cluster cluster) {
        Set<Integer> allBrokers = new HashSet<>();
        for (String topic : cluster.topics()) {
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                for (Node replica : partition.replicas()) {
                    allBrokers.add(replica.id());
                }
            }
        }
        return allBrokers;
    }

    /**
     * @param cluster the cluster metadata.
     * @return the ids of brokers that host replicas but are not among {@code cluster.nodes()}.
     */
    static Set<Integer> deadBrokersWithReplicas(Cluster cluster) {
        Set<Integer> brokersWithReplicas = brokersWithReplicas(cluster);
        for (Node node : cluster.nodes()) {
            brokersWithReplicas.remove(node.id());
        }
        return brokersWithReplicas;
    }

    /**
     * @param cluster the cluster metadata.
     * @return the ids of brokers listed as offline replicas of any partition that has a leader;
     *         partitions without a leader are skipped.
     */
    static Set<Integer> brokersWithOfflineReplicas(Cluster cluster) {
        Set<Integer> brokersWithOfflineReplicas = new HashSet<>();
        for (String topic : cluster.topics()) {
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                if (partition.leader() == null) {
                    continue;
                }
                for (Node offlineReplica : partition.offlineReplicas()) {
                    brokersWithOfflineReplicas.add(offlineReplica.id());
                }
            }
        }
        return brokersWithOfflineReplicas;
    }

    /**
     * Marks brokers that host replicas but are missing from the cluster nodes as
     * {@link Broker.State#DEAD}, then marks brokers with offline replicas that the model still
     * reports as alive as {@link Broker.State#BAD_DISKS}.
     *
     * @param clusterModel the cluster model to update.
     * @param cluster the cluster metadata.
     */
    static void setBadBrokerState(ClusterModel clusterModel, Cluster cluster) {
        for (Integer deadBrokerId : deadBrokersWithReplicas(cluster)) {
            clusterModel.setBrokerState(deadBrokerId, Broker.State.DEAD);
        }
        for (Integer brokerId : brokersWithOfflineReplicas(cluster)) {
            if (clusterModel.broker(brokerId).isAlive()) {
                clusterModel.setBrokerState(brokerId, Broker.State.BAD_DISKS);
            }
        }
    }

    /**
     * Queries the log directories of every node in the cluster and builds, per partition, a map
     * from broker id to the logdir hosting that partition's (non-future) replica. Disks whose
     * logdir reports an error are marked {@link Disk.State#DEAD} in the cluster model.
     *
     * @param clusterModel the cluster model whose disk states are updated for failed logdirs.
     * @param cluster the cluster metadata providing the brokers to query.
     * @param adminClient the admin client used to describe log directories.
     * @param config the configuration providing {@code logdir.response.timeout.ms}.
     * @return the replica placement information keyed by topic partition.
     * @throws RuntimeException if a broker's response times out, fails, or the wait is interrupted;
     *         the original exception is attached as the cause.
     */
    static Map<TopicPartition, Map<Integer, String>> getReplicaPlacementInfo(ClusterModel clusterModel, Cluster cluster, AdminClient adminClient, KafkaCruiseControlConfig config) {
        Map<TopicPartition, Map<Integer, String>> replicaPlacementInfo = new HashMap<>();
        Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> logDirsByBrokerId =
            adminClient.describeLogDirs(cluster.nodes().stream().mapToInt(Node::id).boxed().collect(Collectors.toList())).values();
        for (Map.Entry<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> entry : logDirsByBrokerId.entrySet()) {
            Integer brokerId = entry.getKey();
            try {
                long timeoutMs = config.getLong("logdir.response.timeout.ms").longValue();
                Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfos = entry.getValue().get(timeoutMs, TimeUnit.MILLISECONDS);
                logDirInfos.forEach((logdir, info) -> recordLogDirInfo(clusterModel, brokerId, logdir, info, replicaPlacementInfo));
            } catch (TimeoutException te) {
                throw new RuntimeException(String.format("Getting logdir information for broker %d encountered TimeoutException %s.", brokerId, te), te);
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so callers up the stack can observe the interruption.
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException(String.format("Populating logdir information for broker %d encountered Exception %s.", brokerId, e), e);
            }
        }
        return replicaPlacementInfo;
    }

    /** Records the placement of one logdir's replicas, or marks the disk dead if the logdir reported an error. */
    private static void recordLogDirInfo(ClusterModel clusterModel, Integer brokerId, String logdir, DescribeLogDirsResponse.LogDirInfo info, Map<TopicPartition, Map<Integer, String>> replicaPlacementInfo) {
        if (info.error != Errors.NONE) {
            clusterModel.broker(brokerId).disk(logdir).setState(Disk.State.DEAD);
            return;
        }
        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaEntry : info.replicaInfos.entrySet()) {
            if (replicaEntry.getValue().isFuture) {
                // Future replicas are only logged, not recorded as the current placement.
                LOG.trace("Topic partition {}'s replica is moving to {} on broker {}.", replicaEntry.getKey(), logdir, brokerId);
            } else {
                replicaPlacementInfo.computeIfAbsent(replicaEntry.getKey(), tp -> new HashMap<>()).put(brokerId, logdir);
            }
        }
    }

    /**
     * Creates the replicas of one partition in the cluster model and assigns each its load.
     *
     * <p>For every replica the hosting broker's capacity is resolved first. If resolution fails
     * for a broker that is not among {@code cluster.nodes()}, the broker is given
     * {@link #EMPTY_BROKER_CAPACITY}; if it fails for a live broker, a {@link TimeoutException}
     * is thrown. Replicas of a partition without a leader are not created. Nothing happens if
     * the cluster metadata has no entry for {@code tp}.
     *
     * @param cluster the cluster metadata.
     * @param clusterModel the cluster model to populate.
     * @param tp the partition to populate.
     * @param valuesAndExtrapolations the aggregated metrics of the partition.
     * @param replicaPlacementInfo logdir by broker id per partition, or {@code null} if not populated.
     * @param brokerCapacityConfigResolver the resolver used to obtain broker capacities.
     * @param allowCapacityEstimation whether capacity estimation is allowed for live brokers.
     * @throws TimeoutException if the capacity of a live broker cannot be resolved.
     */
    static void populatePartitionLoad(Cluster cluster, ClusterModel clusterModel, TopicPartition tp, ValuesAndExtrapolations valuesAndExtrapolations, Map<TopicPartition, Map<Integer, String>> replicaPlacementInfo, BrokerCapacityConfigResolver brokerCapacityConfigResolver, boolean allowCapacityEstimation) throws TimeoutException {
        PartitionInfo partitionInfo = cluster.partition(tp);
        if (partitionInfo == null) {
            return;
        }
        Set<Integer> aliveBrokers = cluster.nodes().stream().mapToInt(Node::id).boxed().collect(Collectors.toSet());
        // CPU usage is rescaled in place on the shared metric values, so it must happen only once.
        boolean needToAdjustCpuUsage = true;
        Set<Integer> deadBrokersWithUnknownCapacity = new HashSet<>();
        Node[] replicas = partitionInfo.replicas();
        for (int index = 0; index < replicas.length; ++index) {
            Node replica = replicas[index];
            int replicaId = replica.id();
            String rack = getRackHandleNull(replica);
            boolean isAlive = aliveBrokers.contains(replicaId);
            BrokerCapacityInfo brokerCapacity;
            try {
                brokerCapacity = brokerCapacityConfigResolver.capacityForBroker(rack, replica.host(), replicaId, BROKER_CAPACITY_FETCH_TIMEOUT_MS, isAlive && allowCapacityEstimation);
            } catch (BrokerCapacityResolutionException | TimeoutException e) {
                if (isAlive) {
                    String errorMessage = String.format("Unable to retrieve capacity for broker %d. This may be caused by churn in the cluster, please retry.", replicaId);
                    LOG.warn(errorMessage, e);
                    throw new TimeoutException(errorMessage);
                }
                // The resolver may be unable to describe a dead broker; fall back to empty capacity
                // instead of failing the whole model population.
                brokerCapacity = new BrokerCapacityInfo(EMPTY_BROKER_CAPACITY);
                deadBrokersWithUnknownCapacity.add(replicaId);
            }
            clusterModel.handleDeadBroker(rack, replicaId, brokerCapacity);
            if (partitionInfo.leader() == null) {
                LOG.warn("Detected offline partition {}-{}, skipping", partitionInfo.topic(), partitionInfo.partition());
                continue;
            }
            boolean isLeader = replicaId == partitionInfo.leader().id();
            boolean isOffline = Arrays.stream(partitionInfo.offlineReplicas()).anyMatch(offlineReplica -> offlineReplica.id() == replicaId);
            // A null logdir means placement info was not requested, or no placement entry exists
            // for this partition/broker (e.g. the hosting logdir reported an error).
            String logdir = null;
            if (replicaPlacementInfo != null) {
                Map<Integer, String> logdirByBrokerId = replicaPlacementInfo.get(tp);
                if (logdirByBrokerId != null) {
                    logdir = logdirByBrokerId.get(replicaId);
                }
            }
            clusterModel.createReplica(rack, replicaId, tp, index, isLeader, isOffline, logdir, false);
            clusterModel.setReplicaLoad(rack, replicaId, tp, getAggregatedMetricValues(valuesAndExtrapolations, partitionInfo, isLeader, needToAdjustCpuUsage), valuesAndExtrapolations.windows());
            needToAdjustCpuUsage = false;
        }
        if (!deadBrokersWithUnknownCapacity.isEmpty()) {
            LOG.info("Assign empty capacity to brokers {} because they are dead and capacity resolver is unable to fetch their capacity.", deadBrokersWithUnknownCapacity);
        }
    }
}

