/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorState;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyUtils;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailureDetector;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.detector.DiskFailureDetector;
import com.linkedin.kafka.cruisecontrol.detector.DiskFailures;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolationDetector;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEvent;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventDetector;
import com.linkedin.kafka.cruisecontrol.detector.MetricAnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AnomalyDetectorManager {
    static final String METRIC_REGISTRY_NAME = "AnomalyDetector";
    private static final int INIT_JITTER_BOUND = 10000;
    private static final long SCHEDULER_SHUTDOWN_TIMEOUT_MS = 5000L;
    private static final int NUM_ANOMALY_DETECTION_THREADS = KafkaAnomalyType.cachedValues().size() + 1;
    private static final int ANOMALY_QUEUE_INITIAL_CAPACITY = 10;
    private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorManager.class);
    private static final Logger OPERATION_LOG = LoggerFactory.getLogger((String)"operationLogger");
    private final KafkaCruiseControl _kafkaCruiseControl;
    private final AnomalyNotifier _anomalyNotifier;
    private final GoalViolationDetector _goalViolationDetector;
    private final BrokerFailureDetector _brokerFailureDetector;
    private final MetricAnomalyDetector _metricAnomalyDetector;
    private final DiskFailureDetector _diskFailureDetector;
    private final TopicAnomalyDetector _topicAnomalyDetector;
    private final MaintenanceEventDetector _maintenanceEventDetector;
    private final ScheduledExecutorService _detectorScheduler;
    private final Map<KafkaAnomalyType, Long> _anomalyDetectionIntervalMsByType;
    private final long _brokerFailureDetectionBackoffMs;
    private final PriorityBlockingQueue<Anomaly> _anomalies;
    private volatile boolean _shutdown;
    private final AnomalyDetectorState _anomalyDetectorState;
    private final List<String> _selfHealingGoals;
    private final ExecutorService _anomalyLoggerExecutor;
    private volatile Anomaly _anomalyInProgress;
    private final AtomicLong _numCheckedWithDelay;
    private final Object _shutdownLock;

    public AnomalyDetectorManager(KafkaCruiseControl kafkaCruiseControl, Time time, MetricRegistry dropwizardMetricRegistry) {
        this._anomalies = new PriorityBlockingQueue<Anomaly>(10, AnomalyDetectorUtils.anomalyComparator());
        KafkaCruiseControlConfig config = kafkaCruiseControl.config();
        Long anomalyDetectionIntervalMs = config.getLong("anomaly.detection.interval.ms");
        this._anomalyDetectionIntervalMsByType = new HashMap<KafkaAnomalyType, Long>(KafkaAnomalyType.cachedValues().size() - 2);
        Long goalViolationDetectionIntervalMs = config.getLong("goal.violation.detection.interval.ms");
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.GOAL_VIOLATION, goalViolationDetectionIntervalMs == null ? anomalyDetectionIntervalMs : goalViolationDetectionIntervalMs);
        Long metricAnomalyDetectionIntervalMs = config.getLong("metric.anomaly.detection.interval.ms");
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.METRIC_ANOMALY, metricAnomalyDetectionIntervalMs == null ? anomalyDetectionIntervalMs : metricAnomalyDetectionIntervalMs);
        Long topicAnomalyDetectionIntervalMs = config.getLong("topic.anomaly.detection.interval.ms");
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.TOPIC_ANOMALY, topicAnomalyDetectionIntervalMs == null ? anomalyDetectionIntervalMs : topicAnomalyDetectionIntervalMs);
        Long diskFailureDetectionIntervalMs = config.getLong("disk.failure.detection.interval.ms");
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.DISK_FAILURE, diskFailureDetectionIntervalMs == null ? anomalyDetectionIntervalMs : diskFailureDetectionIntervalMs);
        this._brokerFailureDetectionBackoffMs = config.getLong("broker.failure.detection.backoff.ms");
        this._anomalyNotifier = config.getConfiguredInstance("anomaly.notifier.class", AnomalyNotifier.class);
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._selfHealingGoals = AnomalyDetectorUtils.getSelfHealingGoalNames(config);
        KafkaCruiseControlUtils.sanityCheckGoals(this._selfHealingGoals, false, config);
        this._goalViolationDetector = new GoalViolationDetector(this._anomalies, this._kafkaCruiseControl);
        this._brokerFailureDetector = new BrokerFailureDetector(this._anomalies, this._kafkaCruiseControl);
        this._metricAnomalyDetector = new MetricAnomalyDetector(this._anomalies, this._kafkaCruiseControl);
        this._diskFailureDetector = new DiskFailureDetector(this._anomalies, this._kafkaCruiseControl);
        this._topicAnomalyDetector = new TopicAnomalyDetector(this._anomalies, this._kafkaCruiseControl);
        this._maintenanceEventDetector = new MaintenanceEventDetector(this._anomalies, this._kafkaCruiseControl);
        this._detectorScheduler = Executors.newScheduledThreadPool(NUM_ANOMALY_DETECTION_THREADS, new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG));
        this._shutdown = false;
        int numCachedRecentAnomalyStates = config.getInt("num.cached.recent.anomaly.states");
        this._anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this._anomalyInProgress = null;
        this._numCheckedWithDelay = new AtomicLong();
        this._shutdownLock = new Object();
        this.registerGaugeSensors(dropwizardMetricRegistry);
        this._anomalyDetectorState = new AnomalyDetectorState(time, this._anomalyNotifier.selfHealingEnabled(), numCachedRecentAnomalyStates, dropwizardMetricRegistry);
    }

    AnomalyDetectorManager(PriorityBlockingQueue<Anomaly> anomalies, long anomalyDetectionIntervalMs, KafkaCruiseControl kafkaCruiseControl, AnomalyNotifier anomalyNotifier, GoalViolationDetector goalViolationDetector, BrokerFailureDetector brokerFailureDetector, MetricAnomalyDetector metricAnomalyDetector, DiskFailureDetector diskFailureDetector, TopicAnomalyDetector topicAnomalyDetector, MaintenanceEventDetector maintenanceEventDetector, ScheduledExecutorService detectorScheduler) {
        this._anomalies = anomalies;
        this._anomalyDetectionIntervalMsByType = new HashMap<KafkaAnomalyType, Long>(KafkaAnomalyType.cachedValues().size() - 1);
        KafkaAnomalyType.cachedValues().stream().filter(type -> type != KafkaAnomalyType.BROKER_FAILURE).forEach(type -> this._anomalyDetectionIntervalMsByType.put((KafkaAnomalyType)((Object)type), anomalyDetectionIntervalMs));
        this._brokerFailureDetectionBackoffMs = anomalyDetectionIntervalMs;
        this._anomalyNotifier = anomalyNotifier;
        this._goalViolationDetector = goalViolationDetector;
        this._brokerFailureDetector = brokerFailureDetector;
        this._metricAnomalyDetector = metricAnomalyDetector;
        this._diskFailureDetector = diskFailureDetector;
        this._topicAnomalyDetector = topicAnomalyDetector;
        this._maintenanceEventDetector = maintenanceEventDetector;
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._detectorScheduler = detectorScheduler;
        this._shutdown = false;
        this._selfHealingGoals = Collections.emptyList();
        this._anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this._anomalyInProgress = null;
        this._numCheckedWithDelay = new AtomicLong();
        this._shutdownLock = new Object();
        this._anomalyDetectorState = new AnomalyDetectorState((Time)new SystemTime(), new HashMap<AnomalyType, Boolean>(KafkaAnomalyType.cachedValues().size()), 10, null);
    }

    private void registerGaugeSensors(MetricRegistry dropwizardMetricRegistry) {
        dropwizardMetricRegistry.register(MetricRegistry.name((String)METRIC_REGISTRY_NAME, (String[])new String[]{"balancedness-score"}), (Metric)((Gauge)this._goalViolationDetector::balancednessScore));
        for (KafkaAnomalyType anomalyType : KafkaAnomalyType.cachedValues()) {
            dropwizardMetricRegistry.register(MetricRegistry.name((String)METRIC_REGISTRY_NAME, (String[])new String[]{String.format("%s-self-healing-enabled", anomalyType.toString().toLowerCase())}), (Metric)((Gauge)() -> this._anomalyNotifier.selfHealingEnabled().get((Object)anomalyType) != false ? 1 : 0));
        }
    }

    private void scheduleDetectorAtFixedRate(KafkaAnomalyType anomalyType, Runnable anomalyDetector) {
        int jitter = new Random().nextInt(10000);
        long anomalyDetectionIntervalMs = this._anomalyDetectionIntervalMsByType.get((Object)anomalyType);
        LOG.debug("Starting {} detector with delay of {} ms", (Object)anomalyType, (Object)jitter);
        this._detectorScheduler.scheduleAtFixedRate(anomalyDetector, anomalyDetectionIntervalMs / 2L + (long)jitter, anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void startDetection() {
        LOG.info("Starting {} detector.", (Object)KafkaAnomalyType.BROKER_FAILURE);
        this._brokerFailureDetector.startDetection();
        this.scheduleDetectorAtFixedRate(KafkaAnomalyType.GOAL_VIOLATION, this._goalViolationDetector);
        this.scheduleDetectorAtFixedRate(KafkaAnomalyType.METRIC_ANOMALY, this._metricAnomalyDetector);
        this.scheduleDetectorAtFixedRate(KafkaAnomalyType.TOPIC_ANOMALY, this._topicAnomalyDetector);
        this.scheduleDetectorAtFixedRate(KafkaAnomalyType.DISK_FAILURE, this._diskFailureDetector);
        LOG.debug("Starting {} detector.", (Object)KafkaAnomalyType.MAINTENANCE_EVENT);
        this._detectorScheduler.submit(this._maintenanceEventDetector);
        LOG.debug("Starting anomaly handler.");
        this._detectorScheduler.submit(new AnomalyHandlerTask());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        LOG.info("Shutting down anomaly detector.");
        Object object = this._shutdownLock;
        synchronized (object) {
            this._shutdown = true;
        }
        this._anomalies.add(AnomalyDetectorUtils.SHUTDOWN_ANOMALY);
        this._maintenanceEventDetector.shutdown();
        this._detectorScheduler.shutdown();
        try {
            this._detectorScheduler.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            if (!this._detectorScheduler.isTerminated()) {
                LOG.warn("The sampling scheduler failed to shutdown in 5000 ms.");
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
        }
        this._brokerFailureDetector.shutdown();
        this._anomalyLoggerExecutor.shutdownNow();
        LOG.info("Anomaly detector shutdown completed.");
    }

    public synchronized AnomalyDetectorState anomalyDetectorState() {
        this._anomalyDetectorState.refreshMetrics(this._anomalyNotifier.selfHealingEnabledRatio(), this._goalViolationDetector.balancednessScore());
        return this._anomalyDetectorState;
    }

    long numSelfHealingStarted() {
        return this._anomalyDetectorState.numSelfHealingStarted();
    }

    long numSelfHealingFailedToStart() {
        return this._anomalyDetectorState.numSelfHealingFailedToStart();
    }

    public void maybeClearOngoingAnomalyDetectionTimeMs() {
        this._anomalyDetectorState.maybeClearOngoingAnomalyDetectionTimeMs();
    }

    public void resetHasUnfixableGoals() {
        this._anomalyDetectorState.resetHasUnfixableGoals();
    }

    public boolean setSelfHealingFor(AnomalyType anomalyType, boolean isSelfHealingEnabled) {
        boolean oldSelfHealingEnabled = this._anomalyNotifier.setSelfHealingFor(anomalyType, isSelfHealingEnabled);
        this._anomalyDetectorState.setSelfHealingFor(anomalyType, isSelfHealingEnabled);
        return oldSelfHealingEnabled;
    }

    public long numCheckedWithDelay() {
        return this._numCheckedWithDelay.get();
    }

    public void markSelfHealingFinished(String anomalyId) {
        LOG.debug("Self healing with id {} has finished.", (Object)anomalyId);
        this._anomalyDetectorState.markSelfHealingFinished(anomalyId);
    }

    class AnomalyHandlerTask
    implements Runnable {
        AnomalyHandlerTask() {
        }

        @Override
        public void run() {
            LOG.info("Starting anomaly handler");
            while (true) {
                boolean postProcessAnomalyInProgress = false;
                AnomalyDetectorManager.this._anomalyInProgress = null;
                try {
                    AnomalyDetectorManager.this._anomalyInProgress = AnomalyDetectorManager.this._anomalies.take();
                    LOG.trace("Processing anomaly {}.", (Object)AnomalyDetectorManager.this._anomalyInProgress);
                    if (AnomalyDetectorManager.this._anomalyInProgress == AnomalyDetectorUtils.SHUTDOWN_ANOMALY) {
                        AnomalyDetectorManager.this._anomalyInProgress = null;
                        break;
                    }
                    this.handleAnomalyInProgress();
                }
                catch (InterruptedException e) {
                    LOG.debug("Received interrupted exception.", (Throwable)e);
                    postProcessAnomalyInProgress = true;
                }
                catch (OptimizationFailureException ofe) {
                    LOG.warn("Encountered optimization failure when trying to fix the anomaly {}.", (Object)AnomalyDetectorManager.this._anomalyInProgress, (Object)ofe);
                    postProcessAnomalyInProgress = false;
                }
                catch (IllegalStateException ise) {
                    LOG.warn("Unexpected state prevents anomaly detector from handling the anomaly {}.", (Object)AnomalyDetectorManager.this._anomalyInProgress, (Object)ise);
                    postProcessAnomalyInProgress = false;
                }
                catch (Throwable t) {
                    LOG.error("Uncaught exception in anomaly handler.", t);
                    postProcessAnomalyInProgress = true;
                }
                if (!postProcessAnomalyInProgress) continue;
                LOG.info("Post processing anomaly {}.", (Object)AnomalyDetectorManager.this._anomalyInProgress);
                this.postProcessAnomalyInProgress(AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs);
            }
            LOG.info("Anomaly handler exited.");
        }

        private void handleAnomalyInProgress() throws Exception {
            AnomalyType anomalyType = AnomalyDetectorManager.this._anomalyInProgress.anomalyType();
            AnomalyDetectorManager.this._anomalyDetectorState.addAnomalyDetection(anomalyType, AnomalyDetectorManager.this._anomalyInProgress);
            ExecutorState.State executionState = AnomalyDetectorManager.this._kafkaCruiseControl.executionState();
            if (executionState != ExecutorState.State.NO_TASK_IN_PROGRESS) {
                LOG.info("Post processing anomaly {} because executor is in {} state.", (Object)AnomalyDetectorManager.this._anomalyInProgress, (Object)executionState);
                this.postProcessAnomalyInProgress(AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs);
            } else {
                this.processAnomalyInProgress(anomalyType);
            }
        }

        private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            AnomalyDetectorManager.this._anomalyDetectorState.markAnomalyRate(anomalyType);
            AnomalyNotificationResult notificationResult = this.notifyAnomalyInProgress(anomalyType);
            if (notificationResult != null) {
                AnomalyDetectorManager.this._anomalyDetectorState.maybeSetOngoingAnomalyDetectionTimeMs();
                switch (notificationResult.action()) {
                    case FIX: {
                        this.fixAnomalyInProgress(anomalyType);
                        break;
                    }
                    case CHECK: {
                        LOG.info("Post processing anomaly {} for {}.", (Object)AnomalyDetectorManager.this._anomalyInProgress, (Object)AnomalyState.Status.CHECK_WITH_DELAY);
                        this.postProcessAnomalyInProgress(notificationResult.delay());
                        break;
                    }
                    case IGNORE: {
                        AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.IGNORED);
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unrecognized anomaly notification result.");
                    }
                }
            }
        }

        private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyType) {
            AnomalyNotificationResult notificationResult;
            switch ((KafkaAnomalyType)anomalyType) {
                case GOAL_VIOLATION: {
                    GoalViolations goalViolations = (GoalViolations)AnomalyDetectorManager.this._anomalyInProgress;
                    notificationResult = AnomalyDetectorManager.this._anomalyNotifier.onGoalViolation(goalViolations);
                    AnomalyDetectorManager.this._anomalyDetectorState.refreshHasUnfixableGoal(goalViolations);
                    break;
                }
                case BROKER_FAILURE: {
                    BrokerFailures brokerFailures = (BrokerFailures)AnomalyDetectorManager.this._anomalyInProgress;
                    notificationResult = AnomalyDetectorManager.this._anomalyNotifier.onBrokerFailure(brokerFailures);
                    break;
                }
                case METRIC_ANOMALY: {
                    KafkaMetricAnomaly metricAnomaly = (KafkaMetricAnomaly)AnomalyDetectorManager.this._anomalyInProgress;
                    notificationResult = AnomalyDetectorManager.this._anomalyNotifier.onMetricAnomaly(metricAnomaly);
                    break;
                }
                case DISK_FAILURE: {
                    DiskFailures diskFailures = (DiskFailures)AnomalyDetectorManager.this._anomalyInProgress;
                    notificationResult = AnomalyDetectorManager.this._anomalyNotifier.onDiskFailure(diskFailures);
                    break;
                }
                case TOPIC_ANOMALY: {
                    TopicAnomaly topicAnomaly = (TopicAnomaly)AnomalyDetectorManager.this._anomalyInProgress;
                    notificationResult = AnomalyDetectorManager.this._anomalyNotifier.onTopicAnomaly(topicAnomaly);
                    break;
                }
                case MAINTENANCE_EVENT: {
                    MaintenanceEvent maintenanceEvent = (MaintenanceEvent)AnomalyDetectorManager.this._anomalyInProgress;
                    notificationResult = AnomalyDetectorManager.this._anomalyNotifier.onMaintenanceEvent(maintenanceEvent);
                    break;
                }
                default: {
                    throw new IllegalStateException("Unrecognized anomaly type.");
                }
            }
            LOG.debug("Received notification result {}", (Object)notificationResult);
            return notificationResult;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void postProcessAnomalyInProgress(long delayMs) {
            if (AnomalyDetectorManager.this._anomalyInProgress.anomalyType() == KafkaAnomalyType.BROKER_FAILURE) {
                Object object = AnomalyDetectorManager.this._shutdownLock;
                synchronized (object) {
                    if (AnomalyDetectorManager.this._shutdown) {
                        LOG.debug("Skip delayed checking anomaly {}, because anomaly detector is shutting down.", (Object)AnomalyDetectorManager.this._anomalyInProgress);
                    } else {
                        LOG.debug("Scheduling broker failure detection with delay of {} ms", (Object)delayMs);
                        AnomalyDetectorManager.this._numCheckedWithDelay.incrementAndGet();
                        AnomalyDetectorManager.this._detectorScheduler.schedule(() -> AnomalyDetectorManager.this._brokerFailureDetector.detectBrokerFailures(false), delayMs, TimeUnit.MILLISECONDS);
                        AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
                    }
                }
            } else {
                AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.IGNORED);
            }
        }

        private boolean isAnomalyInProgressReadyToFix(AnomalyType anomalyType) {
            LoadMonitorTaskRunner.LoadMonitorTaskRunnerState loadMonitorTaskRunnerState = AnomalyDetectorManager.this._kafkaCruiseControl.getLoadMonitorTaskRunnerState();
            if (!AnomalyUtils.isLoadMonitorReady(loadMonitorTaskRunnerState)) {
                LOG.info("Skipping {} fix because load monitor is in {} state.", (Object)anomalyType, (Object)loadMonitorTaskRunnerState);
                AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.LOAD_MONITOR_NOT_READY);
            } else {
                if (AnomalyDetectorManager.this._kafkaCruiseControl.meetCompletenessRequirements(AnomalyDetectorManager.this._selfHealingGoals)) {
                    return true;
                }
                LOG.warn("Skipping {} fix because load completeness requirement is not met for goals.", (Object)anomalyType);
                AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.COMPLETENESS_NOT_READY);
            }
            return false;
        }

        private void logSelfHealingOperation(String anomalyId, OptimizationFailureException ofe, String optimizationResult) {
            if (optimizationResult != null) {
                OPERATION_LOG.info("[{}] Self-healing started successfully:\n{}", (Object)anomalyId, (Object)optimizationResult);
            } else if (ofe != null) {
                OPERATION_LOG.warn("[{}] Self-healing failed to start:\n{}", (Object)anomalyId, (Object)ofe);
            } else {
                OPERATION_LOG.warn("[{}] Self-healing failed to start due to inability to optimize combined self-healing goals ({}).", (Object)anomalyId, AnomalyDetectorManager.this._selfHealingGoals);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            Object object = AnomalyDetectorManager.this._shutdownLock;
            synchronized (object) {
                if (AnomalyDetectorManager.this._shutdown) {
                    LOG.info("Skip fixing anomaly {}, because anomaly detector is shutting down.", (Object)AnomalyDetectorManager.this._anomalyInProgress);
                } else {
                    boolean isReadyToFix = this.isAnomalyInProgressReadyToFix(anomalyType);
                    boolean fixStarted = false;
                    String anomalyId = AnomalyDetectorManager.this._anomalyInProgress.anomalyId();
                    boolean skipReportingIfNotUpdated = false;
                    try {
                        if (isReadyToFix) {
                            LOG.info("Generating a fix for the anomaly {}.", (Object)AnomalyDetectorManager.this._anomalyInProgress);
                            fixStarted = AnomalyDetectorManager.this._anomalyInProgress.fix();
                            LOG.info("{} the anomaly {}.", (Object)(fixStarted ? "Fixing" : "Cannot fix"), (Object)AnomalyDetectorManager.this._anomalyInProgress);
                            String optimizationResult = fixStarted ? AnomalyDetectorManager.this._anomalyInProgress.optimizationResult(false) : null;
                            AnomalyDetectorManager.this._anomalyLoggerExecutor.submit(() -> this.logSelfHealingOperation(anomalyId, null, optimizationResult));
                        }
                    }
                    catch (OptimizationFailureException ofe) {
                        AnomalyDetectorManager.this._anomalyLoggerExecutor.submit(() -> this.logSelfHealingOperation(anomalyId, ofe, null));
                        skipReportingIfNotUpdated = anomalyType == KafkaAnomalyType.BROKER_FAILURE;
                        throw ofe;
                    }
                    finally {
                        this.handlePostFixAnomaly(isReadyToFix, fixStarted, anomalyId, skipReportingIfNotUpdated);
                    }
                }
            }
        }

        private void handlePostFixAnomaly(boolean isReadyToFix, boolean fixStarted, String anomalyId, boolean skipReportingIfNotUpdated) {
            if (isReadyToFix) {
                AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, fixStarted ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START);
                if (fixStarted) {
                    AnomalyDetectorManager.this._anomalyDetectorState.incrementNumSelfHealingStarted();
                    LOG.info("[{}] Self-healing started successfully.", (Object)anomalyId);
                } else {
                    AnomalyDetectorManager.this._anomalyDetectorState.incrementNumSelfHealingFailedToStart();
                    LOG.warn("[{}] Self-healing failed to start.", (Object)anomalyId);
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Clearing {} anomalies and scheduling a broker failure detection in {}ms.", (Object)AnomalyDetectorManager.this._anomalies.size(), (Object)(isReadyToFix ? 0L : AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs));
            }
            AnomalyDetectorManager.this._anomalies.clear();
            AnomalyDetectorManager.this._detectorScheduler.schedule(() -> AnomalyDetectorManager.this._brokerFailureDetector.detectBrokerFailures(skipReportingIfNotUpdated), isReadyToFix ? 0L : AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs, TimeUnit.MILLISECONDS);
        }
    }
}

