package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/DiskFailureDetector.class */
/**
 * Periodic task that looks for failed disks (log dirs) on the brokers of the Kafka cluster.
 *
 * <p>On each run it asks the admin client to describe the log dirs of every node in the current
 * cluster view. Every log dir whose reported error is not {@link Errors#NONE} is recorded together
 * with the time it was observed; if at least one such log dir is found, a {@link DiskFailures}
 * anomaly is created through the configured anomaly class and added to the anomaly queue.
 */
public class DiskFailureDetector extends AbstractAnomalyDetector implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DiskFailureDetector.class);
    /** Config-override key under which the map of failed disks is handed to the anomaly instance. */
    public static final String FAILED_DISKS_OBJECT_CONFIG = "failed.disks.object";
    /** Argument (in ms) passed to the load monitor when asking for dead brokers with replicas. */
    private static final long MAX_METADATA_WAIT_MS = 60000L;
    private final AdminClient _adminClient;
    /** Cluster generation seen by the previous detection attempt; -1 until the first attempt. */
    private int _lastCheckedClusterGeneration;
    private final KafkaCruiseControlConfig _config;

    /**
     * @param queue the queue that detected anomalies are added to.
     * @param kafkaCruiseControl the Cruise Control instance providing the admin client, the
     *                           configuration, the load monitor and the cluster view.
     */
    public DiskFailureDetector(Queue<Anomaly> queue, KafkaCruiseControl kafkaCruiseControl) {
        super(queue, kafkaCruiseControl);
        this._adminClient = kafkaCruiseControl.adminClient();
        this._lastCheckedClusterGeneration = -1;
        this._config = this._kafkaCruiseControl.config();
    }

    /**
     * Decides whether a disk failure detection pass should be performed now.
     *
     * <p>Detection is skipped when the cluster generation has not changed since the last attempt,
     * or when the load monitor reports dead brokers that still host replicas. Otherwise the
     * decision is delegated to {@link AnomalyDetectorUtils#getAnomalyDetectionStatus}.
     *
     * @return the detection status; {@link #run()} only proceeds on {@code READY}.
     */
    private AnomalyDetectionStatus getDiskFailureDetectionStatus() {
        int clusterGeneration = this._kafkaCruiseControl.loadMonitor().clusterModelGeneration().clusterGeneration();
        if (clusterGeneration == this._lastCheckedClusterGeneration) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping disk failure detection because the model generation hasn't changed. Current model generation {}", this._kafkaCruiseControl.loadMonitor().clusterModelGeneration());
            }
            return AnomalyDetectionStatus.SKIP_MODEL_GENERATION_NOT_CHANGED;
        }
        // Remember the generation even if the checks below end up skipping this pass.
        this._lastCheckedClusterGeneration = clusterGeneration;
        Set<Integer> deadBrokersWithReplicas = this._kafkaCruiseControl.loadMonitor().deadBrokersWithReplicas(MAX_METADATA_WAIT_MS);
        if (deadBrokersWithReplicas.isEmpty()) {
            return AnomalyDetectorUtils.getAnomalyDetectionStatus(this._kafkaCruiseControl, false);
        }
        LOG.debug("Skipping disk failure detection because there are dead broker in the cluster, dead broker: {}", deadBrokersWithReplicas);
        return AnomalyDetectionStatus.SKIP_HAS_DEAD_BROKERS;
    }

    /**
     * Runs one detection pass. Any {@link Exception} is logged rather than propagated, and the
     * completion message is logged on every exit path.
     */
    @Override
    public void run() {
        try {
            if (getDiskFailureDetectionStatus() != AnomalyDetectionStatus.READY) {
                return;
            }
            // The timeout does not depend on the broker, so look it up once instead of per broker.
            long logDirResponseTimeoutMs = this._config.getLong(ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG);
            // Broker id -> (log dir -> time in ms at which the failure was observed).
            Map<Integer, Map<String, Long>> failedDisksByBroker = new HashMap<>();
            Set<Integer> brokerIds = this._kafkaCruiseControl.kafkaCluster().nodes().stream()
                    .map(node -> node.id())
                    .collect(Collectors.toSet());
            this._adminClient.describeLogDirs(brokerIds).values().forEach((brokerId, logDirsFuture) -> {
                try {
                    logDirsFuture.get(logDirResponseTimeoutMs, TimeUnit.MILLISECONDS).forEach((logDir, logDirInfo) -> {
                        if (logDirInfo.error != Errors.NONE) {
                            failedDisksByBroker.computeIfAbsent(brokerId, id -> new HashMap<>())
                                               .put(logDir, this._kafkaCruiseControl.timeMs());
                        }
                    });
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    if (e instanceof InterruptedException) {
                        // Restore the interrupt flag so the owner of this thread can observe it.
                        Thread.currentThread().interrupt();
                    }
                    // A failure to query one broker must not prevent reporting failures found on others.
                    LOG.warn("Retrieving logdir information for broker {} encountered exception {}.", brokerId, e);
                }
            });
            if (!failedDisksByBroker.isEmpty()) {
                Map<String, Object> parameterConfigOverrides = new HashMap<>(3);
                parameterConfigOverrides.put(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this._kafkaCruiseControl);
                parameterConfigOverrides.put(FAILED_DISKS_OBJECT_CONFIG, failedDisksByBroker);
                parameterConfigOverrides.put(AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, this._kafkaCruiseControl.timeMs());
                DiskFailures diskFailures = this._config.getConfiguredInstance(
                        AnomalyDetectorConfig.DISK_FAILURES_CLASS_CONFIG, DiskFailures.class, parameterConfigOverrides);
                this._anomalies.add(diskFailures);
            }
        } catch (Exception e) {
            LOG.error("Unexpected exception", e);
        } finally {
            LOG.debug("Disk failure detection finished.");
        }
    }
}
