/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector.notifier;

import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.detector.DiskFailures;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEvent;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelfHealingNotifier
implements AnomalyNotifier {
    public static final String BROKER_FAILURE_ALERT_THRESHOLD_MS_CONFIG = "broker.failure.alert.threshold.ms";
    public static final String SELF_HEALING_ENABLED_CONFIG = "self.healing.enabled";
    public static final String SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG = "self.healing.broker.failure.enabled";
    public static final String SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG = "self.healing.goal.violation.enabled";
    public static final String SELF_HEALING_METRIC_ANOMALY_ENABLED_CONFIG = "self.healing.metric.anomaly.enabled";
    public static final String SELF_HEALING_DISK_FAILURE_ENABLED_CONFIG = "self.healing.disk.failure.enabled";
    public static final String SELF_HEALING_TOPIC_ANOMALY_ENABLED_CONFIG = "self.healing.topic.anomaly.enabled";
    public static final String SELF_HEALING_MAINTENANCE_EVENT_ENABLED_CONFIG = "self.healing.maintenance.event.enabled";
    public static final String BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG = "broker.failure.self.healing.threshold.ms";
    static final long DEFAULT_ALERT_THRESHOLD_MS = 900000L;
    static final long DEFAULT_AUTO_FIX_THRESHOLD_MS = 1800000L;
    private static final Logger LOG = LoggerFactory.getLogger(SelfHealingNotifier.class);
    protected final Time _time;
    protected final long _notifierStartTimeMs;
    protected final Map<AnomalyType, Boolean> _selfHealingEnabled;
    protected final Map<Boolean, Map<AnomalyType, Long>> _selfHealingStateChangeTimeMs;
    protected final Map<AnomalyType, Long> _selfHealingEnabledHistoricalDurationMs;
    protected long _brokerFailureAlertThresholdMs;
    protected long _selfHealingThresholdMs;
    protected final Map<Boolean, Map<Integer, Long>> _latestFailedBrokersByAutoFixTriggered;

    public SelfHealingNotifier() {
        this((Time)new SystemTime());
    }

    SelfHealingNotifier(Time time) {
        this._time = time;
        this._notifierStartTimeMs = this._time.milliseconds();
        int numAnomalyTypes = KafkaAnomalyType.cachedValues().size();
        this._selfHealingEnabled = new HashMap<AnomalyType, Boolean>(numAnomalyTypes);
        this._selfHealingStateChangeTimeMs = new HashMap<Boolean, Map<AnomalyType, Long>>(2);
        this._selfHealingStateChangeTimeMs.put(true, new HashMap(numAnomalyTypes));
        this._selfHealingStateChangeTimeMs.put(false, new HashMap(numAnomalyTypes));
        this._selfHealingEnabledHistoricalDurationMs = new HashMap<AnomalyType, Long>(numAnomalyTypes);
        KafkaAnomalyType.cachedValues().forEach(anomalyType -> this._selfHealingEnabledHistoricalDurationMs.put((AnomalyType)anomalyType, 0L));
        this._latestFailedBrokersByAutoFixTriggered = new HashMap<Boolean, Map<Integer, Long>>(2);
        this._latestFailedBrokersByAutoFixTriggered.put(true, new HashMap());
        this._latestFailedBrokersByAutoFixTriggered.put(false, new HashMap());
    }

    @Override
    public AnomalyNotificationResult onGoalViolation(GoalViolations goalViolations) {
        boolean autoFixTriggered = this._selfHealingEnabled.get((Object)KafkaAnomalyType.GOAL_VIOLATION);
        boolean selfHealingTriggered = autoFixTriggered && !AnomalyDetectorUtils.hasUnfixableGoals(goalViolations);
        this.alert(goalViolations, selfHealingTriggered, this._time.milliseconds(), KafkaAnomalyType.GOAL_VIOLATION);
        if (autoFixTriggered) {
            if (selfHealingTriggered) {
                return AnomalyNotificationResult.fix();
            }
            LOG.warn("Skip self healing due to unfixable goals: {}", goalViolations.violatedGoalsByFixability().get(false));
        }
        return AnomalyNotificationResult.ignore();
    }

    @Override
    public AnomalyNotificationResult onMetricAnomaly(KafkaMetricAnomaly metricAnomaly) {
        boolean autoFixTriggered = this._selfHealingEnabled.get((Object)KafkaAnomalyType.METRIC_ANOMALY) != false && metricAnomaly.fixable();
        this.alert(metricAnomaly, autoFixTriggered, this._time.milliseconds(), KafkaAnomalyType.METRIC_ANOMALY);
        return autoFixTriggered ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
    }

    @Override
    public AnomalyNotificationResult onTopicAnomaly(TopicAnomaly topicAnomaly) {
        boolean autoFixTriggered = this._selfHealingEnabled.get((Object)KafkaAnomalyType.TOPIC_ANOMALY);
        this.alert(topicAnomaly, autoFixTriggered, this._time.milliseconds(), KafkaAnomalyType.TOPIC_ANOMALY);
        return autoFixTriggered ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
    }

    @Override
    public AnomalyNotificationResult onMaintenanceEvent(MaintenanceEvent maintenanceEvent) {
        boolean autoFixTriggered = this._selfHealingEnabled.get((Object)KafkaAnomalyType.MAINTENANCE_EVENT);
        this.alert(maintenanceEvent, autoFixTriggered, this._time.milliseconds(), KafkaAnomalyType.MAINTENANCE_EVENT);
        return autoFixTriggered ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
    }

    @Override
    public AnomalyNotificationResult onDiskFailure(DiskFailures diskFailures) {
        this.alert(diskFailures, this._selfHealingEnabled.get((Object)KafkaAnomalyType.DISK_FAILURE), this._time.milliseconds(), KafkaAnomalyType.DISK_FAILURE);
        return this._selfHealingEnabled.get((Object)KafkaAnomalyType.DISK_FAILURE) != false ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
    }

    @Override
    public Map<AnomalyType, Boolean> selfHealingEnabled() {
        return this._selfHealingEnabled;
    }

    @Override
    public synchronized boolean setSelfHealingFor(AnomalyType anomalyType, boolean isSelfHealingEnabled) {
        Boolean oldValue = this._selfHealingEnabled.put(anomalyType, isSelfHealingEnabled);
        this.updateSelfHealingStateChange(anomalyType, oldValue, isSelfHealingEnabled);
        return oldValue;
    }

    private void updateSelfHealingStateChange(AnomalyType anomalyType, Boolean oldValue, boolean isSelfHealingEnabled) {
        if (oldValue == null) {
            throw new IllegalStateException(String.format("No previous value is associated with %s.", anomalyType));
        }
        if (oldValue != isSelfHealingEnabled) {
            long oldStateChangeMs = this._selfHealingStateChangeTimeMs.get(oldValue).get(anomalyType);
            long newStateChangeMs = this._time.milliseconds();
            if (!isSelfHealingEnabled) {
                this._selfHealingEnabledHistoricalDurationMs.merge(anomalyType, newStateChangeMs - oldStateChangeMs, Long::sum);
            }
            this._selfHealingStateChangeTimeMs.get(isSelfHealingEnabled).put(anomalyType, newStateChangeMs);
        }
    }

    private synchronized long enabledTimeMs(AnomalyType anomalyType, long nowMs) {
        long enabledTimeMs = this._selfHealingEnabledHistoricalDurationMs.get(anomalyType);
        if (this._selfHealingEnabled.get(anomalyType).booleanValue()) {
            Long currentEnabledSelfHealingStartTime = this._selfHealingStateChangeTimeMs.get(true).get(anomalyType);
            enabledTimeMs += nowMs - (currentEnabledSelfHealingStartTime == null ? this._notifierStartTimeMs : currentEnabledSelfHealingStartTime);
        }
        return enabledTimeMs;
    }

    @Override
    public synchronized Map<AnomalyType, Float> selfHealingEnabledRatio() {
        HashMap<AnomalyType, Float> selfHealingEnabledRatio = new HashMap<AnomalyType, Float>(this._selfHealingEnabled.size());
        long nowMs = this._time.milliseconds();
        long uptimeMs = this.uptimeMs(nowMs);
        for (AnomalyType anomalyType : KafkaAnomalyType.cachedValues()) {
            long enabledTimeMs = this.enabledTimeMs(anomalyType, nowMs);
            selfHealingEnabledRatio.put(anomalyType, Float.valueOf((float)enabledTimeMs / (float)uptimeMs));
        }
        return selfHealingEnabledRatio;
    }

    private boolean hasNewFailureToAlert(BrokerFailures brokerFailures, boolean autoFixTriggered) {
        Map<Integer, Long> failedBrokers = this._latestFailedBrokersByAutoFixTriggered.get(autoFixTriggered);
        boolean containsNewAlert = false;
        for (Map.Entry<Integer, Long> entry : brokerFailures.failedBrokers().entrySet()) {
            Long failureTime = failedBrokers.get(entry.getKey());
            if (failureTime != null && failureTime.longValue() == entry.getValue().longValue()) continue;
            failedBrokers.put(entry.getKey(), entry.getValue());
            containsNewAlert = true;
        }
        return containsNewAlert;
    }

    @Override
    public AnomalyNotificationResult onBrokerFailure(BrokerFailures brokerFailures) {
        long earliestFailureTimeMs = Long.MAX_VALUE;
        for (long t : brokerFailures.failedBrokers().values()) {
            earliestFailureTimeMs = Math.min(earliestFailureTimeMs, t);
        }
        long nowMs = this._time.milliseconds();
        long alertTimeMs = earliestFailureTimeMs + this._brokerFailureAlertThresholdMs;
        long selfHealingTimeMs = earliestFailureTimeMs + this._selfHealingThresholdMs;
        AnomalyNotificationResult result = null;
        if (nowMs < alertTimeMs) {
            long delayMs = alertTimeMs - nowMs;
            result = AnomalyNotificationResult.check(delayMs);
        } else if (nowMs < selfHealingTimeMs) {
            if (this.hasNewFailureToAlert(brokerFailures, false)) {
                this.alert(brokerFailures, false, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
            }
            long delay = selfHealingTimeMs - nowMs;
            result = AnomalyNotificationResult.check(delay);
        } else {
            boolean autoFixTriggered;
            boolean bl = autoFixTriggered = this._selfHealingEnabled.get((Object)KafkaAnomalyType.BROKER_FAILURE) != false && brokerFailures.fixable();
            if (this.hasNewFailureToAlert(brokerFailures, autoFixTriggered)) {
                this.alert(brokerFailures, autoFixTriggered, selfHealingTimeMs, KafkaAnomalyType.BROKER_FAILURE);
            }
            result = autoFixTriggered ? AnomalyNotificationResult.fix() : AnomalyNotificationResult.ignore();
        }
        return result;
    }

    public void alert(Object anomaly, boolean autoFixTriggered, long selfHealingStartTime, AnomalyType anomalyType) {
        LOG.warn("{} detected {}. Self healing {}.", new Object[]{anomalyType, anomaly, this._selfHealingEnabled.get(anomalyType) != false ? String.format("start time %s", KafkaCruiseControlUtils.toDateString(selfHealingStartTime)) : "is disabled"});
        if (autoFixTriggered) {
            LOG.warn("Self-healing has been triggered.");
        }
    }

    public void configure(Map<String, ?> config) {
        String alertThreshold = (String)config.get(BROKER_FAILURE_ALERT_THRESHOLD_MS_CONFIG);
        this._brokerFailureAlertThresholdMs = alertThreshold == null ? 900000L : Long.parseLong(alertThreshold);
        String fixThreshold = (String)config.get(BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG);
        long l = this._selfHealingThresholdMs = fixThreshold == null ? 1800000L : Long.parseLong(fixThreshold);
        if (this._brokerFailureAlertThresholdMs > this._selfHealingThresholdMs) {
            throw new IllegalArgumentException(String.format("The failure detection threshold %d cannot be larger than the auto fix threshold. %d", this._brokerFailureAlertThresholdMs, this._selfHealingThresholdMs));
        }
        String selfHealingEnabledString = (String)config.get(SELF_HEALING_ENABLED_CONFIG);
        boolean selfHealingAllEnabled = Boolean.parseBoolean(selfHealingEnabledString);
        String selfHealingBrokerFailureEnabledString = (String)config.get(SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.BROKER_FAILURE, selfHealingBrokerFailureEnabledString == null ? selfHealingAllEnabled : Boolean.parseBoolean(selfHealingBrokerFailureEnabledString));
        String selfHealingGoalViolationEnabledString = (String)config.get(SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.GOAL_VIOLATION, selfHealingGoalViolationEnabledString == null ? selfHealingAllEnabled : Boolean.parseBoolean(selfHealingGoalViolationEnabledString));
        String selfHealingMetricAnomalyEnabledString = (String)config.get(SELF_HEALING_METRIC_ANOMALY_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.METRIC_ANOMALY, selfHealingMetricAnomalyEnabledString == null ? selfHealingAllEnabled : Boolean.parseBoolean(selfHealingMetricAnomalyEnabledString));
        String selfHealingDiskFailuresEnabledString = (String)config.get(SELF_HEALING_DISK_FAILURE_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.DISK_FAILURE, selfHealingDiskFailuresEnabledString == null ? selfHealingAllEnabled : Boolean.parseBoolean(selfHealingDiskFailuresEnabledString));
        String selfHealingTopicAnomalyEnabledString = (String)config.get(SELF_HEALING_TOPIC_ANOMALY_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.TOPIC_ANOMALY, selfHealingTopicAnomalyEnabledString == null ? selfHealingAllEnabled : Boolean.parseBoolean(selfHealingTopicAnomalyEnabledString));
        String selfHealingMaintenanceEventEnabledString = (String)config.get(SELF_HEALING_MAINTENANCE_EVENT_ENABLED_CONFIG);
        this._selfHealingEnabled.put(KafkaAnomalyType.MAINTENANCE_EVENT, selfHealingMaintenanceEventEnabledString == null ? selfHealingAllEnabled : Boolean.parseBoolean(selfHealingMaintenanceEventEnabledString));
        this._selfHealingEnabled.forEach((key, value) -> this._selfHealingStateChangeTimeMs.get(value).put((AnomalyType)key, this._notifierStartTimeMs));
    }

    @Override
    public long uptimeMs(long nowMs) {
        return nowMs - this._notifierStartTimeMs;
    }
}

