/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.servlet.response;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.servlet.response.ClusterStats;
import com.linkedin.kafka.cruisecontrol.servlet.response.JsonResponseClass;
import com.linkedin.kafka.cruisecontrol.servlet.response.JsonResponseField;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonResponseClass
public class ClusterBrokerState {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterBrokerState.class);
    @JsonResponseField
    public static final String LEADER_COUNT = "LeaderCountByBrokerId";
    @JsonResponseField
    public static final String OUT_OF_SYNC_COUNT = "OutOfSyncCountByBrokerId";
    @JsonResponseField
    public static final String REPLICA_COUNT = "ReplicaCountByBrokerId";
    @JsonResponseField
    public static final String OFFLINE_REPLICA_COUNT = "OfflineReplicaCountByBrokerId";
    @JsonResponseField
    public static final String IS_CONTROLLER = "IsController";
    @JsonResponseField
    public static final String ONLINE_LOGDIRS = "OnlineLogDirsByBrokerId";
    @JsonResponseField
    public static final String OFFLINE_LOGDIRS = "OfflineLogDirsByBrokerId";
    @JsonResponseField
    public static final String SUMMARY = "Summary";
    public static final String TIMED_OUT_LOGDIR_FLAG = "timed_out";
    protected final Map<Integer, Integer> _leaderCountByBrokerId;
    protected final Map<Integer, Integer> _outOfSyncCountByBrokerId;
    protected final Map<Integer, Integer> _replicaCountByBrokerId;
    protected final Map<Integer, Integer> _offlineReplicaCountByBrokerId;
    protected final Map<Integer, Boolean> _isControllerByBrokerId;
    protected final Map<Integer, Set<String>> _onlineLogDirsByBrokerId;
    protected final Map<Integer, Set<String>> _offlineLogDirsByBrokerId;
    protected final Cluster _kafkaCluster;
    protected final AdminClient _adminClient;
    protected final KafkaCruiseControlConfig _config;

    public ClusterBrokerState(Cluster kafkaCluster, AdminClient adminClient, KafkaCruiseControlConfig config) throws ExecutionException, InterruptedException {
        this._kafkaCluster = kafkaCluster;
        this._adminClient = adminClient;
        this._config = config;
        this._leaderCountByBrokerId = new TreeMap<Integer, Integer>();
        this._outOfSyncCountByBrokerId = new TreeMap<Integer, Integer>();
        this._replicaCountByBrokerId = new TreeMap<Integer, Integer>();
        this._offlineReplicaCountByBrokerId = new TreeMap<Integer, Integer>();
        this._isControllerByBrokerId = new TreeMap<Integer, Boolean>();
        this.populateKafkaBrokerState(this._leaderCountByBrokerId, this._outOfSyncCountByBrokerId, this._replicaCountByBrokerId, this._offlineReplicaCountByBrokerId, this._isControllerByBrokerId);
        this._onlineLogDirsByBrokerId = new TreeMap<Integer, Set<String>>();
        this._offlineLogDirsByBrokerId = new TreeMap<Integer, Set<String>>();
        this.populateKafkaBrokerLogDirState(this._onlineLogDirsByBrokerId, this._offlineLogDirsByBrokerId, this._replicaCountByBrokerId.keySet());
    }

    public Map<String, Object> getJsonStructure() {
        HashMap<String, Object> jsonMap = new HashMap<String, Object>(8);
        jsonMap.put(LEADER_COUNT, this._leaderCountByBrokerId);
        jsonMap.put(OUT_OF_SYNC_COUNT, this._outOfSyncCountByBrokerId);
        jsonMap.put(REPLICA_COUNT, this._replicaCountByBrokerId);
        jsonMap.put(OFFLINE_REPLICA_COUNT, this._offlineReplicaCountByBrokerId);
        jsonMap.put(IS_CONTROLLER, this._isControllerByBrokerId);
        jsonMap.put(ONLINE_LOGDIRS, this._onlineLogDirsByBrokerId);
        jsonMap.put(OFFLINE_LOGDIRS, this._offlineLogDirsByBrokerId);
        jsonMap.put(SUMMARY, new ClusterStats(this._kafkaCluster.topics().size(), this._replicaCountByBrokerId, this._leaderCountByBrokerId).getJsonStructure());
        return jsonMap;
    }

    protected void populateKafkaBrokerState(Map<Integer, Integer> leaderCountByBrokerId, Map<Integer, Integer> outOfSyncCountByBrokerId, Map<Integer, Integer> replicaCountByBrokerId, Map<Integer, Integer> offlineReplicaCountByBrokerId, Map<Integer, Boolean> isControllerByBrokerId) {
        for (String topic : this._kafkaCluster.topics()) {
            for (PartitionInfo partitionInfo : this._kafkaCluster.partitionsForTopic(topic)) {
                if (partitionInfo.leader() == null) continue;
                leaderCountByBrokerId.merge(partitionInfo.leader().id(), 1, Integer::sum);
                Set<Integer> replicas = Arrays.stream(partitionInfo.replicas()).map(Node::id).collect(Collectors.toSet());
                Set inSyncReplicas = Arrays.stream(partitionInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toSet());
                HashSet<Integer> outOfSyncReplicas = new HashSet<Integer>(replicas);
                outOfSyncReplicas.removeAll(inSyncReplicas);
                Set<Integer> offlineReplicas = Arrays.stream(partitionInfo.offlineReplicas()).map(Node::id).collect(Collectors.toSet());
                outOfSyncReplicas.forEach(brokerId -> outOfSyncCountByBrokerId.merge((Integer)brokerId, 1, Integer::sum));
                offlineReplicas.forEach(brokerId -> offlineReplicaCountByBrokerId.merge((Integer)brokerId, 1, Integer::sum));
                replicas.forEach(brokerId -> replicaCountByBrokerId.merge((Integer)brokerId, 1, Integer::sum));
            }
        }
        for (Node node : this._kafkaCluster.nodes()) {
            int nodeId = node.id();
            if (replicaCountByBrokerId.get(nodeId) != null) continue;
            offlineReplicaCountByBrokerId.put(nodeId, 0);
            replicaCountByBrokerId.put(nodeId, 0);
            outOfSyncCountByBrokerId.put(nodeId, 0);
            leaderCountByBrokerId.put(nodeId, 0);
        }
        replicaCountByBrokerId.keySet().forEach(brokerId -> isControllerByBrokerId.put((Integer)brokerId, false));
        Node controller = this._kafkaCluster.controller();
        if (controller != null) {
            isControllerByBrokerId.put(controller.id(), true);
        }
    }

    protected void populateKafkaBrokerLogDirState(Map<Integer, Set<String>> onlineLogDirsByBrokerId, Map<Integer, Set<String>> offlineLogDirsByBrokerId, Set<Integer> brokers) throws ExecutionException, InterruptedException {
        HashSet aliveBrokers = new HashSet(brokers.size());
        this._kafkaCluster.nodes().forEach(node -> aliveBrokers.add(node.id()));
        for (Integer broker : brokers) {
            if (aliveBrokers.contains(broker)) continue;
            onlineLogDirsByBrokerId.put(broker, Collections.singleton("broker_dead"));
            offlineLogDirsByBrokerId.put(broker, Collections.singleton("broker_dead"));
        }
        Map logDirsByBrokerId = this._adminClient.describeLogDirs(aliveBrokers).values();
        for (Map.Entry entry : logDirsByBrokerId.entrySet()) {
            onlineLogDirsByBrokerId.put((Integer)entry.getKey(), new HashSet());
            offlineLogDirsByBrokerId.put((Integer)entry.getKey(), new HashSet());
            try {
                ((Map)((KafkaFuture)entry.getValue()).get(this._config.getLong("logdir.response.timeout.ms").longValue(), TimeUnit.MILLISECONDS)).forEach((key, value) -> {
                    if (value.error == Errors.NONE) {
                        ((Set)onlineLogDirsByBrokerId.get(entry.getKey())).add(key);
                    } else {
                        ((Set)offlineLogDirsByBrokerId.get(entry.getKey())).add(key);
                    }
                });
            }
            catch (TimeoutException te) {
                LOG.error("Getting log dir information for broker {} timed out.", entry.getKey());
                onlineLogDirsByBrokerId.get(entry.getKey()).add(TIMED_OUT_LOGDIR_FLAG);
                offlineLogDirsByBrokerId.get(entry.getKey()).add(TIMED_OUT_LOGDIR_FLAG);
            }
        }
    }

    public void writeBrokerSummary(StringBuilder sb) {
        new ClusterStats(this._kafkaCluster.topics().size(), this._replicaCountByBrokerId, this._leaderCountByBrokerId).writeClusterStats(sb);
        String initMessage = "Brokers:";
        sb.append(String.format("%s%n%20s%20s%20s%20s%20s%20s%n", initMessage, "BROKER", "LEADER(S)", "REPLICAS", "OUT-OF-SYNC", "OFFLINE", "IS_CONTROLLER"));
        for (Integer brokerId : this._replicaCountByBrokerId.keySet()) {
            sb.append(String.format("%20d%20d%20d%20d%20d%20s%n", brokerId, this._leaderCountByBrokerId.getOrDefault(brokerId, 0), this._replicaCountByBrokerId.getOrDefault(brokerId, 0), this._outOfSyncCountByBrokerId.getOrDefault(brokerId, 0), this._offlineReplicaCountByBrokerId.getOrDefault(brokerId, 0), this._isControllerByBrokerId.get(brokerId)));
        }
        this.writeKafkaBrokerLogDirState(sb, this._replicaCountByBrokerId.keySet());
    }

    protected void writeKafkaBrokerLogDirState(StringBuilder sb, Set<Integer> brokers) {
        int onlineLogDirsNameLength = this._onlineLogDirsByBrokerId.values().stream().mapToInt(dir -> dir.toString().length()).max().orElse(20) + 14;
        int offlineLogDirsNameLength = this._offlineLogDirsByBrokerId.values().stream().mapToInt(dir -> dir.toString().length()).max().orElse(20) + 15;
        String initMessage = "LogDirs of brokers with replicas:";
        sb.append(String.format("%n%s%n%20s%" + onlineLogDirsNameLength + "s%" + offlineLogDirsNameLength + "s%n", initMessage, "BROKER", "ONLINE-LOGDIRS", "OFFLINE-LOGDIRS"));
        for (int brokerId : brokers) {
            sb.append(String.format("%20d%" + onlineLogDirsNameLength + "s%" + offlineLogDirsNameLength + "s%n", brokerId, this._onlineLogDirsByBrokerId.get(brokerId).toString(), this._offlineLogDirsByBrokerId.get(brokerId).toString()));
        }
    }
}

