package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.zk.BrokerIdsZNode;
import kafka.zk.KafkaZkClient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector.class */
public class BrokerFailureDetector extends AbstractAnomalyDetector {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerFailureDetector.class);
    public static final String FAILED_BROKERS_OBJECT_CONFIG = "failed.brokers.object";
    public static final String BROKER_FAILURES_FIXABLE_CONFIG = "broker.failures.fixable.object";
    private static final String ZK_BROKER_FAILURE_METRIC_GROUP = "CruiseControlAnomaly";
    private static final String ZK_BROKER_FAILURE_METRIC_TYPE = "BrokerFailure";
    private final String _failedBrokersZkPath;
    private final ZkClient _zkClient;
    private final KafkaZkClient _kafkaZkClient;
    private final Map<Integer, Long> _failedBrokers;
    private final short _fixableFailedBrokerCountThreshold;
    private final double _fixableFailedBrokerPercentageThreshold;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector$BrokerFailureListener.class */
    public class BrokerFailureListener implements IZkChildListener {
        private BrokerFailureListener() {
        }

        public void handleChildChange(String str, List<String> list) {
            BrokerFailureDetector.this.detectBrokerFailures((Set) list.stream().map(Integer::parseInt).collect(Collectors.toSet()), true);
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector$ZkStringSerializer.class */
    private static class ZkStringSerializer implements ZkSerializer {
        private ZkStringSerializer() {
        }

        public byte[] serialize(Object obj) throws ZkMarshallingError {
            return ((String) obj).getBytes(StandardCharsets.UTF_8);
        }

        public Object deserialize(byte[] bArr) throws ZkMarshallingError {
            return new String(bArr, StandardCharsets.UTF_8);
        }
    }

    public BrokerFailureDetector(Queue<Anomaly> queue, KafkaCruiseControl kafkaCruiseControl) {
        super(queue, kafkaCruiseControl);
        KafkaCruiseControlConfig config = this._kafkaCruiseControl.config();
        String string = config.getString(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
        boolean booleanValue = config.getBoolean(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG).booleanValue();
        this._zkClient = new ZkClient(new ZkConnection(string, 120000), 120000, new ZkStringSerializer());
        this._kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(string, ZK_BROKER_FAILURE_METRIC_GROUP, ZK_BROKER_FAILURE_METRIC_TYPE, booleanValue);
        this._failedBrokers = new HashMap();
        this._failedBrokersZkPath = config.getString(AnomalyDetectorConfig.FAILED_BROKERS_ZK_PATH_CONFIG);
        this._fixableFailedBrokerCountThreshold = config.getShort(AnomalyDetectorConfig.FIXABLE_FAILED_BROKER_COUNT_THRESHOLD_CONFIG).shortValue();
        this._fixableFailedBrokerPercentageThreshold = config.getDouble(AnomalyDetectorConfig.FIXABLE_FAILED_BROKER_PERCENTAGE_THRESHOLD_CONFIG).doubleValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startDetection() {
        try {
            this._zkClient.createPersistent(this._failedBrokersZkPath);
        } catch (ZkNodeExistsException e) {
        }
        loadPersistedFailedBrokerList();
        detectBrokerFailures(false);
        this._zkClient.subscribeChildChanges(BrokerIdsZNode.path(), new BrokerFailureListener());
    }

    private synchronized void detectBrokerFailures(Set<Integer> set, boolean z) {
        boolean updateFailedBrokers = updateFailedBrokers(set);
        if (updateFailedBrokers) {
            persistFailedBrokerList();
        }
        if (!z || updateFailedBrokers) {
            reportBrokerFailures();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void detectBrokerFailures(boolean z) {
        detectBrokerFailures(aliveBrokers(), z);
    }

    synchronized Map<Integer, Long> failedBrokers() {
        return new HashMap(this._failedBrokers);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        this._zkClient.close();
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(this._kafkaZkClient);
    }

    private void persistFailedBrokerList() {
        this._zkClient.writeData(this._failedBrokersZkPath, failedBrokerString());
    }

    private void loadPersistedFailedBrokerList() {
        parsePersistedFailedBrokers((String) this._zkClient.readData(this._failedBrokersZkPath));
    }

    private boolean updateFailedBrokers(Set<Integer> set) {
        Set<Integer> brokersWithReplicas = this._kafkaCruiseControl.loadMonitor().brokersWithReplicas(60000L);
        brokersWithReplicas.removeAll(set);
        LOG.debug("Brokers (alive: {}, failed: {}).", set, brokersWithReplicas);
        boolean removeIf = this._failedBrokers.entrySet().removeIf(entry -> {
            return !brokersWithReplicas.contains(entry.getKey());
        });
        Iterator<Integer> it = brokersWithReplicas.iterator();
        while (it.hasNext()) {
            if (this._failedBrokers.putIfAbsent(it.next(), Long.valueOf(this._kafkaCruiseControl.timeMs())) == null) {
                removeIf = true;
            }
        }
        return removeIf;
    }

    private Set<Integer> aliveBrokers() {
        return (Set) JavaConversions.asJavaCollection(this._kafkaZkClient.getAllBrokersInCluster()).stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
    }

    private String failedBrokerString() {
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<Integer, Long>> it = this._failedBrokers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> next = it.next();
            sb.append(next.getKey()).append("=").append(next.getValue());
            if (it.hasNext()) {
                sb.append(",");
            }
        }
        return sb.toString();
    }

    private void parsePersistedFailedBrokers(String str) {
        if (str == null || str.isEmpty()) {
            return;
        }
        for (String str2 : str.split(",")) {
            String[] split = str2.split("=");
            if (split.length != 2) {
                throw new IllegalStateException("The persisted failed broker string cannot be parsed. The string is " + str);
            }
            this._failedBrokers.putIfAbsent(Integer.valueOf(Integer.parseInt(split[0])), Long.valueOf(Long.parseLong(split[1])));
        }
    }

    private boolean tooManyFailedBrokers(int i, int i2) {
        return i > this._fixableFailedBrokerCountThreshold || ((double) i) / ((double) (i + i2)) > this._fixableFailedBrokerPercentageThreshold;
    }

    private void reportBrokerFailures() {
        if (this._failedBrokers.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap(4);
        hashMap.put(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this._kafkaCruiseControl);
        Map<Integer, Long> failedBrokers = failedBrokers();
        hashMap.put(FAILED_BROKERS_OBJECT_CONFIG, failedBrokers);
        hashMap.put(AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, Long.valueOf(this._kafkaCruiseControl.timeMs()));
        hashMap.put(BROKER_FAILURES_FIXABLE_CONFIG, Boolean.valueOf(!tooManyFailedBrokers(failedBrokers.size(), aliveBrokers().size())));
        this._anomalies.add((BrokerFailures) this._kafkaCruiseControl.config().getConfiguredInstance(AnomalyDetectorConfig.BROKER_FAILURES_CLASS_CONFIG, BrokerFailures.class, hashMap));
    }
}
