package com.linkedin.kafka.cruisecontrol.detector.notifier;

import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.detector.DiskFailures;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomaly;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/notifier/SelfHealingNotifier.class */
public class SelfHealingNotifier implements AnomalyNotifier {
    public static final String BROKER_FAILURE_ALERT_THRESHOLD_MS_CONFIG = "broker.failure.alert.threshold.ms";
    public static final String SELF_HEALING_ENABLED_CONFIG = "self.healing.enabled";
    public static final String SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG = "self.healing.broker.failure.enabled";
    public static final String SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG = "self.healing.goal.violation.enabled";
    public static final String SELF_HEALING_METRIC_ANOMALY_ENABLED_CONFIG = "self.healing.metric.anomaly.enabled";
    public static final String SELF_HEALING_DISK_FAILURE_ENABLED_CONFIG = "self.healing.disk.failure.enabled";
    public static final String SELF_HEALING_TOPIC_ANOMALY_ENABLED_CONFIG = "self.healing.topic.anomaly.enabled";
    public static final String BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG = "broker.failure.self.healing.threshold.ms";
    static final long DEFAULT_ALERT_THRESHOLD_MS = 900000;
    static final long DEFAULT_AUTO_FIX_THRESHOLD_MS = 1800000;
    private static final Logger LOG = LoggerFactory.getLogger(SelfHealingNotifier.class);
    protected final Time _time;
    protected final long _notifierStartTimeMs;
    protected final Map<AnomalyType, Boolean> _selfHealingEnabled;
    protected final Map<Boolean, Map<AnomalyType, Long>> _selfHealingStateChangeTimeMs;
    protected final Map<AnomalyType, Long> _selfHealingEnabledHistoricalDurationMs;
    protected long _brokerFailureAlertThresholdMs;
    protected long _selfHealingThresholdMs;
    protected final Map<Boolean, Map<Integer, Long>> _latestFailedBrokersByAutoFixTriggered;

    public SelfHealingNotifier() {
        this(new SystemTime());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SelfHealingNotifier(Time time) {
        this._time = time;
        this._notifierStartTimeMs = this._time.milliseconds();
        int size = KafkaAnomalyType.cachedValues().size();
        this._selfHealingEnabled = new HashMap(size);
        this._selfHealingStateChangeTimeMs = new HashMap(2);
        this._selfHealingStateChangeTimeMs.put(true, new HashMap(size));
        this._selfHealingStateChangeTimeMs.put(false, new HashMap(size));
        this._selfHealingEnabledHistoricalDurationMs = new HashMap(size);
        KafkaAnomalyType.cachedValues().forEach(kafkaAnomalyType -> {
            this._selfHealingEnabledHistoricalDurationMs.put(kafkaAnomalyType, 0L);
        });
        this._latestFailedBrokersByAutoFixTriggered = new HashMap(2);
        this._latestFailedBrokersByAutoFixTriggered.put(true, new HashMap());
        this._latestFailedBrokersByAutoFixTriggered.put(false, new HashMap());
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public AnomalyNotificationResult onGoalViolation(GoalViolations goalViolations) {
        boolean booleanValue = this._selfHealingEnabled.get(KafkaAnomalyType.GOAL_VIOLATION).booleanValue();
        boolean z = booleanValue && !AnomalyDetectorUtils.hasUnfixableGoals(goalViolations);
        alert(goalViolations, z, this._time.milliseconds(), KafkaAnomalyType.GOAL_VIOLATION);
        if (booleanValue) {
            if (z) {
                return AnomalyNotificationResult.fix();
            }
            LOG.warn("Skip self healing due to unfixable goals: {}", goalViolations.violatedGoalsByFixability().get(false));
        }
        return AnomalyNotificationResult.ignore();
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public AnomalyNotificationResult onMetricAnomaly(KafkaMetricAnomaly kafkaMetricAnomaly) {
        boolean z = this._selfHealingEnabled.get(KafkaAnomalyType.METRIC_ANOMALY).booleanValue() && kafkaMetricAnomaly.fixable();
        alert(kafkaMetricAnomaly, z, this._time.milliseconds(), KafkaAnomalyType.METRIC_ANOMALY);
        return z ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public AnomalyNotificationResult onTopicAnomaly(TopicAnomaly topicAnomaly) {
        boolean booleanValue = this._selfHealingEnabled.get(KafkaAnomalyType.TOPIC_ANOMALY).booleanValue();
        alert(topicAnomaly, booleanValue, this._time.milliseconds(), KafkaAnomalyType.TOPIC_ANOMALY);
        return booleanValue ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public AnomalyNotificationResult onDiskFailure(DiskFailures diskFailures) {
        alert(diskFailures, this._selfHealingEnabled.get(KafkaAnomalyType.DISK_FAILURE).booleanValue(), this._time.milliseconds(), KafkaAnomalyType.DISK_FAILURE);
        return this._selfHealingEnabled.get(KafkaAnomalyType.DISK_FAILURE).booleanValue() ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public Map<AnomalyType, Boolean> selfHealingEnabled() {
        return this._selfHealingEnabled;
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public synchronized boolean setSelfHealingFor(AnomalyType anomalyType, boolean z) {
        Boolean put = this._selfHealingEnabled.put(anomalyType, Boolean.valueOf(z));
        updateSelfHealingStateChange(anomalyType, put, z);
        return put.booleanValue();
    }

    private void updateSelfHealingStateChange(AnomalyType anomalyType, Boolean bool, boolean z) {
        if (bool == null) {
            throw new IllegalStateException(String.format("No previous value is associated with %s.", anomalyType));
        }
        if (bool.booleanValue() != z) {
            long longValue = this._selfHealingStateChangeTimeMs.get(bool).get(anomalyType).longValue();
            long milliseconds = this._time.milliseconds();
            if (!z) {
                this._selfHealingEnabledHistoricalDurationMs.merge(anomalyType, Long.valueOf(milliseconds - longValue), (v0, v1) -> {
                    return Long.sum(v0, v1);
                });
            }
            this._selfHealingStateChangeTimeMs.get(Boolean.valueOf(z)).put(anomalyType, Long.valueOf(milliseconds));
        }
    }

    private synchronized long enabledTimeMs(AnomalyType anomalyType, long j) {
        long longValue = this._selfHealingEnabledHistoricalDurationMs.get(anomalyType).longValue();
        if (this._selfHealingEnabled.get(anomalyType).booleanValue()) {
            Long l = this._selfHealingStateChangeTimeMs.get(true).get(anomalyType);
            longValue += j - (l == null ? this._notifierStartTimeMs : l.longValue());
        }
        return longValue;
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public synchronized Map<AnomalyType, Float> selfHealingEnabledRatio() {
        HashMap hashMap = new HashMap(this._selfHealingEnabled.size());
        long milliseconds = this._time.milliseconds();
        long uptimeMs = uptimeMs(milliseconds);
        for (KafkaAnomalyType kafkaAnomalyType : KafkaAnomalyType.cachedValues()) {
            hashMap.put(kafkaAnomalyType, Float.valueOf(((float) enabledTimeMs(kafkaAnomalyType, milliseconds)) / ((float) uptimeMs)));
        }
        return hashMap;
    }

    private boolean hasNewFailureToAlert(BrokerFailures brokerFailures, boolean z) {
        Map<Integer, Long> map = this._latestFailedBrokersByAutoFixTriggered.get(Boolean.valueOf(z));
        boolean z2 = false;
        for (Map.Entry<Integer, Long> entry : brokerFailures.failedBrokers().entrySet()) {
            Long l = map.get(entry.getKey());
            if (l == null || l.longValue() != entry.getValue().longValue()) {
                map.put(entry.getKey(), entry.getValue());
                z2 = true;
            }
        }
        return z2;
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public AnomalyNotificationResult onBrokerFailure(BrokerFailures brokerFailures) {
        AnomalyNotificationResult fix;
        long j = Long.MAX_VALUE;
        Iterator<Long> it = brokerFailures.failedBrokers().values().iterator();
        while (it.hasNext()) {
            j = Math.min(j, it.next().longValue());
        }
        long milliseconds = this._time.milliseconds();
        long j2 = j + this._brokerFailureAlertThresholdMs;
        long j3 = j + this._selfHealingThresholdMs;
        if (milliseconds < j2) {
            fix = AnomalyNotificationResult.check(j2 - milliseconds);
        } else if (milliseconds < j3) {
            if (hasNewFailureToAlert(brokerFailures, false)) {
                alert(brokerFailures, false, j3, KafkaAnomalyType.BROKER_FAILURE);
            }
            fix = AnomalyNotificationResult.check(j3 - milliseconds);
        } else {
            boolean z = this._selfHealingEnabled.get(KafkaAnomalyType.BROKER_FAILURE).booleanValue() && brokerFailures.fixable();
            if (hasNewFailureToAlert(brokerFailures, z)) {
                alert(brokerFailures, z, j3, KafkaAnomalyType.BROKER_FAILURE);
            }
            fix = z ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
        }
        return fix;
    }

    public void alert(Object obj, boolean z, long j, AnomalyType anomalyType) {
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = anomalyType;
        objArr[1] = obj;
        objArr[2] = this._selfHealingEnabled.get(anomalyType).booleanValue() ? String.format("start time %s", KafkaCruiseControlUtils.toDateString(j)) : "is disabled";
        logger.warn("{} detected {}. Self healing {}.", objArr);
        if (z) {
            LOG.warn("Self-healing has been triggered.");
        }
    }

    public void configure(Map<String, ?> map) {
        String str = (String) map.get(BROKER_FAILURE_ALERT_THRESHOLD_MS_CONFIG);
        this._brokerFailureAlertThresholdMs = str == null ? 900000L : Long.parseLong(str);
        String str2 = (String) map.get(BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG);
        this._selfHealingThresholdMs = str2 == null ? DEFAULT_AUTO_FIX_THRESHOLD_MS : Long.parseLong(str2);
        if (this._brokerFailureAlertThresholdMs > this._selfHealingThresholdMs) {
            throw new IllegalArgumentException(String.format("The failure detection threshold %d cannot be larger than the auto fix threshold. %d", Long.valueOf(this._brokerFailureAlertThresholdMs), Long.valueOf(this._selfHealingThresholdMs)));
        }
        boolean parseBoolean = Boolean.parseBoolean((String) map.get(SELF_HEALING_ENABLED_CONFIG));
        String str3 = (String) map.get(SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.BROKER_FAILURE, Boolean.valueOf(str3 == null ? parseBoolean : Boolean.parseBoolean(str3)));
        String str4 = (String) map.get(SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.GOAL_VIOLATION, Boolean.valueOf(str4 == null ? parseBoolean : Boolean.parseBoolean(str4)));
        String str5 = (String) map.get(SELF_HEALING_METRIC_ANOMALY_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.METRIC_ANOMALY, Boolean.valueOf(str5 == null ? parseBoolean : Boolean.parseBoolean(str5)));
        String str6 = (String) map.get(SELF_HEALING_DISK_FAILURE_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.DISK_FAILURE, Boolean.valueOf(str6 == null ? parseBoolean : Boolean.parseBoolean(str6)));
        String str7 = (String) map.get(SELF_HEALING_TOPIC_ANOMALY_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.TOPIC_ANOMALY, Boolean.valueOf(str7 == null ? parseBoolean : Boolean.parseBoolean(str7)));
        this._selfHealingEnabled.forEach((anomalyType, bool) -> {
            this._selfHealingStateChangeTimeMs.get(bool).put(anomalyType, Long.valueOf(this._notifierStartTimeMs));
        });
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier
    public long uptimeMs(long j) {
        return j - this._notifierStartTimeMs;
    }
}
