/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.detector.AbstractAnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.cluster.Broker;
import kafka.zk.BrokerIdsZNode;
import kafka.zk.KafkaZkClient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkConnection;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterable;
import scala.collection.JavaConversions;

public class BrokerFailureDetector
extends AbstractAnomalyDetector {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerFailureDetector.class);
    public static final String FAILED_BROKERS_OBJECT_CONFIG = "failed.brokers.object";
    public static final String BROKER_FAILURES_FIXABLE_CONFIG = "broker.failures.fixable.object";
    private static final String ZK_BROKER_FAILURE_METRIC_GROUP = "CruiseControlAnomaly";
    private static final String ZK_BROKER_FAILURE_METRIC_TYPE = "BrokerFailure";
    private final String _failedBrokersZkPath;
    private final ZkClient _zkClient;
    private final KafkaZkClient _kafkaZkClient;
    private final Map<Integer, Long> _failedBrokers;
    private final short _fixableFailedBrokerCountThreshold;
    private final double _fixableFailedBrokerPercentageThreshold;

    public BrokerFailureDetector(Queue<Anomaly> anomalies, KafkaCruiseControl kafkaCruiseControl) {
        super(anomalies, kafkaCruiseControl);
        KafkaCruiseControlConfig config = this._kafkaCruiseControl.config();
        String zkUrl = config.getString(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
        boolean zkSecurityEnabled = config.getBoolean("zookeeper.security.enabled");
        ZkConnection zkConnection = new ZkConnection(zkUrl, 120000);
        this._zkClient = new ZkClient((IZkConnection)zkConnection, 120000, (ZkSerializer)new ZkStringSerializer());
        this._kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zkUrl, ZK_BROKER_FAILURE_METRIC_GROUP, ZK_BROKER_FAILURE_METRIC_TYPE, zkSecurityEnabled);
        this._failedBrokers = new HashMap<Integer, Long>();
        this._failedBrokersZkPath = config.getString("failed.brokers.zk.path");
        this._fixableFailedBrokerCountThreshold = config.getShort("fixable.failed.broker.count.threshold");
        this._fixableFailedBrokerPercentageThreshold = config.getDouble("fixable.failed.broker.percentage.threshold");
    }

    void startDetection() {
        try {
            this._zkClient.createPersistent(this._failedBrokersZkPath);
        }
        catch (ZkNodeExistsException zkNodeExistsException) {
            // empty catch block
        }
        this.loadPersistedFailedBrokerList();
        this.detectBrokerFailures(false);
        this._zkClient.subscribeChildChanges(BrokerIdsZNode.path(), (IZkChildListener)new BrokerFailureListener());
    }

    private synchronized void detectBrokerFailures(Set<Integer> aliveBrokers, boolean skipReportingIfNotUpdated) {
        boolean updated = this.updateFailedBrokers(aliveBrokers);
        if (updated) {
            this.persistFailedBrokerList();
        }
        if (!skipReportingIfNotUpdated || updated) {
            this.reportBrokerFailures();
        }
    }

    synchronized void detectBrokerFailures(boolean skipReportingIfNotUpdated) {
        this.detectBrokerFailures(this.aliveBrokers(), skipReportingIfNotUpdated);
    }

    synchronized Map<Integer, Long> failedBrokers() {
        return new HashMap<Integer, Long>(this._failedBrokers);
    }

    void shutdown() {
        this._zkClient.close();
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(this._kafkaZkClient);
    }

    private void persistFailedBrokerList() {
        this._zkClient.writeData(this._failedBrokersZkPath, (Object)this.failedBrokerString());
    }

    private void loadPersistedFailedBrokerList() {
        String failedBrokerListString = (String)this._zkClient.readData(this._failedBrokersZkPath);
        this.parsePersistedFailedBrokers(failedBrokerListString);
    }

    private boolean updateFailedBrokers(Set<Integer> aliveBrokers) {
        Set<Integer> currentFailedBrokers = this._kafkaCruiseControl.loadMonitor().brokersWithReplicas(60000L);
        currentFailedBrokers.removeAll(aliveBrokers);
        LOG.debug("Brokers (alive: {}, failed: {}).", aliveBrokers, currentFailedBrokers);
        boolean updated = this._failedBrokers.entrySet().removeIf(entry -> !currentFailedBrokers.contains(entry.getKey()));
        for (Integer brokerId : currentFailedBrokers) {
            if (this._failedBrokers.putIfAbsent(brokerId, this._kafkaCruiseControl.timeMs()) != null) continue;
            updated = true;
        }
        return updated;
    }

    private Set<Integer> aliveBrokers() {
        return JavaConversions.asJavaCollection((Iterable)this._kafkaZkClient.getAllBrokersInCluster()).stream().map(Broker::id).collect(Collectors.toSet());
    }

    private String failedBrokerString() {
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<Integer, Long>> iter = this._failedBrokers.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Integer, Long> entry = iter.next();
            sb.append(entry.getKey()).append("=").append(entry.getValue());
            if (!iter.hasNext()) continue;
            sb.append(",");
        }
        return sb.toString();
    }

    private void parsePersistedFailedBrokers(String failedBrokerListString) {
        if (failedBrokerListString != null && !failedBrokerListString.isEmpty()) {
            String[] entries;
            for (String entry : entries = failedBrokerListString.split(",")) {
                String[] idAndTimestamp = entry.split("=");
                if (idAndTimestamp.length != 2) {
                    throw new IllegalStateException("The persisted failed broker string cannot be parsed. The string is " + failedBrokerListString);
                }
                this._failedBrokers.putIfAbsent(Integer.parseInt(idAndTimestamp[0]), Long.parseLong(idAndTimestamp[1]));
            }
        }
    }

    private boolean tooManyFailedBrokers(int failedBrokerCount, int aliveBrokerCount) {
        return failedBrokerCount > this._fixableFailedBrokerCountThreshold || (double)failedBrokerCount / (double)(failedBrokerCount + aliveBrokerCount) > this._fixableFailedBrokerPercentageThreshold;
    }

    private void reportBrokerFailures() {
        if (!this._failedBrokers.isEmpty()) {
            HashMap<String, Object> parameterConfigOverrides = new HashMap<String, Object>(4);
            parameterConfigOverrides.put("kafka.cruise.control.object", this._kafkaCruiseControl);
            Map<Integer, Long> failedBrokers = this.failedBrokers();
            parameterConfigOverrides.put(FAILED_BROKERS_OBJECT_CONFIG, failedBrokers);
            parameterConfigOverrides.put("anomaly.detection.time.ms.object", this._kafkaCruiseControl.timeMs());
            parameterConfigOverrides.put(BROKER_FAILURES_FIXABLE_CONFIG, !this.tooManyFailedBrokers(failedBrokers.size(), this.aliveBrokers().size()));
            BrokerFailures brokerFailures = this._kafkaCruiseControl.config().getConfiguredInstance("broker.failures.class", BrokerFailures.class, parameterConfigOverrides);
            this._anomalies.add(brokerFailures);
        }
    }

    private static class ZkStringSerializer
    implements ZkSerializer {
        private ZkStringSerializer() {
        }

        public byte[] serialize(Object data) throws ZkMarshallingError {
            return ((String)data).getBytes(StandardCharsets.UTF_8);
        }

        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    private class BrokerFailureListener
    implements IZkChildListener {
        private BrokerFailureListener() {
        }

        public void handleChildChange(String parentPath, List<String> currentChildren) {
            BrokerFailureDetector.this.detectBrokerFailures(currentChildren.stream().map(Integer::parseInt).collect(Collectors.toSet()), true);
        }
    }
}

