/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AbstractAnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectionStatus;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.DiskFailures;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic task that looks for failed disks in the cluster.
 *
 * <p>Each run asks the admin client to describe the log dirs of every broker currently known to
 * the cluster; any log dir whose reported error is not {@link Errors#NONE} is treated as failed.
 * When at least one failed log dir is found, a {@link DiskFailures} anomaly is created through the
 * configured {@code disk.failures.class} and added to the shared anomaly queue.
 */
public class DiskFailureDetector extends AbstractAnomalyDetector implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DiskFailureDetector.class);
    public static final String FAILED_DISKS_OBJECT_CONFIG = "failed.disks.object";
    private final AdminClient _adminClient;
    // Cluster generation observed by the most recent detection attempt; -1 until the first attempt.
    private int _lastCheckedClusterGeneration;
    private final KafkaCruiseControlConfig _config;

    /**
     * Creates a detector that reports into the given anomaly queue.
     *
     * @param anomalies queue that detected {@link DiskFailures} anomalies are added to
     * @param kafkaCruiseControl instance supplying the admin client, configuration, load monitor
     *                           and clock used by this detector
     */
    public DiskFailureDetector(Queue<Anomaly> anomalies, KafkaCruiseControl kafkaCruiseControl) {
        super(anomalies, kafkaCruiseControl);
        this._adminClient = kafkaCruiseControl.adminClient();
        this._lastCheckedClusterGeneration = -1;
        this._config = this._kafkaCruiseControl.config();
    }

    /**
     * Decides whether a disk failure check should be carried out right now.
     *
     * <p>The check is skipped when the cluster generation is the same as on the previous attempt,
     * or when the load monitor reports dead brokers that still host replicas. Otherwise the
     * decision is delegated to {@link AnomalyDetectorUtils#getAnomalyDetectionStatus}.
     *
     * @return {@link AnomalyDetectionStatus#READY} when detection should proceed, otherwise the
     *         status describing why it is skipped
     */
    private AnomalyDetectionStatus getDiskFailureDetectionStatus() {
        int currentClusterGeneration =
            this._kafkaCruiseControl.loadMonitor().clusterModelGeneration().clusterGeneration();
        if (currentClusterGeneration == this._lastCheckedClusterGeneration) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping disk failure detection because the model generation hasn't changed. Current model generation {}",
                          this._kafkaCruiseControl.loadMonitor().clusterModelGeneration());
            }
            return AnomalyDetectionStatus.SKIP_MODEL_GENERATION_NOT_CHANGED;
        }
        // The generation is recorded before the remaining checks, so a run that is skipped below
        // is not retried until the generation changes again.
        this._lastCheckedClusterGeneration = currentClusterGeneration;
        // NOTE(review): 60000L is presumably a maximum wait in milliseconds -- confirm against
        // LoadMonitor#deadBrokersWithReplicas.
        Set<Integer> deadBrokers = this._kafkaCruiseControl.loadMonitor().deadBrokersWithReplicas(60000L);
        if (!deadBrokers.isEmpty()) {
            LOG.debug("Skipping disk failure detection because there are dead broker in the cluster, dead broker: {}", deadBrokers);
            return AnomalyDetectionStatus.SKIP_HAS_DEAD_BROKERS;
        }
        return AnomalyDetectorUtils.getAnomalyDetectionStatus(this._kafkaCruiseControl, false);
    }

    /**
     * Queries the log dirs of every broker in the current cluster view and collects the failed ones.
     *
     * <p>A broker whose response cannot be retrieved (timeout, execution failure or interruption)
     * is logged and left out of the result; the remaining brokers are still processed.
     *
     * @return failed log dirs grouped by broker id, each mapped to the time at which the failure
     *         was observed by this detector; empty when no failed log dir was found
     */
    private Map<Integer, Map<String, Long>> findFailedDisksByBroker() {
        Map<Integer, Map<String, Long>> failedDisksByBroker = new HashMap<>();
        Set<Integer> aliveBrokers = this._kafkaCruiseControl.kafkaCluster().nodes().stream()
                                                            .map(Node::id)
                                                            .collect(Collectors.toSet());
        long responseTimeoutMs = this._config.getLong("logdir.response.timeout.ms");
        this._adminClient.describeLogDirs(aliveBrokers).values().forEach((broker, future) -> {
            try {
                future.get(responseTimeoutMs, TimeUnit.MILLISECONDS).forEach((logdir, info) -> {
                    if (info.error != Errors.NONE) {
                        failedDisksByBroker.computeIfAbsent(broker, id -> new HashMap<>())
                                           .put(logdir, this._kafkaCruiseControl.timeMs());
                    }
                });
            } catch (InterruptedException e) {
                // Restore the interrupt status so whoever owns this thread can still observe it.
                Thread.currentThread().interrupt();
                LOG.warn("Retrieving logdir information for broker {} encountered exception {}.", broker, e);
            } catch (ExecutionException | TimeoutException e) {
                LOG.warn("Retrieving logdir information for broker {} encountered exception {}.", broker, e);
            }
        });
        return failedDisksByBroker;
    }

    /**
     * Runs one round of disk failure detection.
     *
     * <p>Does nothing unless {@link #getDiskFailureDetectionStatus()} returns
     * {@link AnomalyDetectionStatus#READY}. Any exception raised during the round is logged and
     * not propagated to the caller.
     */
    @Override
    public void run() {
        try {
            if (this.getDiskFailureDetectionStatus() != AnomalyDetectionStatus.READY) {
                return;
            }
            Map<Integer, Map<String, Long>> failedDisksByBroker = this.findFailedDisksByBroker();
            if (!failedDisksByBroker.isEmpty()) {
                Map<String, Object> parameterConfigOverrides = new HashMap<>(3);
                parameterConfigOverrides.put("kafka.cruise.control.object", this._kafkaCruiseControl);
                parameterConfigOverrides.put(FAILED_DISKS_OBJECT_CONFIG, failedDisksByBroker);
                parameterConfigOverrides.put("anomaly.detection.time.ms.object", this._kafkaCruiseControl.timeMs());
                DiskFailures diskFailures =
                    this._config.getConfiguredInstance("disk.failures.class", DiskFailures.class, parameterConfigOverrides);
                this._anomalies.add(diskFailures);
            }
        } catch (Exception e) {
            LOG.error("Unexpected exception", e);
        } finally {
            LOG.debug("Disk failure detection finished.");
        }
    }
}

