package com.linkedin.kafka.cruisecontrol.detector;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.class */
public class AnomalyDetectorManager {
    static final String METRIC_REGISTRY_NAME = "AnomalyDetector";
    private static final int INIT_JITTER_BOUND = 10000;
    private static final long SCHEDULER_SHUTDOWN_TIMEOUT_MS = 5000;
    private static final int ANOMALY_QUEUE_INITIAL_CAPACITY = 10;
    private final KafkaCruiseControl _kafkaCruiseControl;
    private final AnomalyNotifier _anomalyNotifier;
    private final GoalViolationDetector _goalViolationDetector;
    private final BrokerFailureDetector _brokerFailureDetector;
    private final MetricAnomalyDetector _metricAnomalyDetector;
    private final DiskFailureDetector _diskFailureDetector;
    private final TopicAnomalyDetector _topicAnomalyDetector;
    private final MaintenanceEventDetector _maintenanceEventDetector;
    private final ScheduledExecutorService _detectorScheduler;
    private final Map<KafkaAnomalyType, Long> _anomalyDetectionIntervalMsByType;
    private final long _brokerFailureDetectionBackoffMs;
    private final PriorityBlockingQueue<Anomaly> _anomalies;
    private volatile boolean _shutdown;
    private final AnomalyDetectorState _anomalyDetectorState;
    private final List<String> _selfHealingGoals;
    private final ExecutorService _anomalyLoggerExecutor;
    private volatile Anomaly _anomalyInProgress;
    private final AtomicLong _numCheckedWithDelay;
    private final Object _shutdownLock;
    private static final int NUM_ANOMALY_DETECTION_THREADS = KafkaAnomalyType.cachedValues().size() + 1;
    private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorManager.class);
    private static final Logger OPERATION_LOG = LoggerFactory.getLogger(KafkaCruiseControlUtils.OPERATION_LOGGER);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager$AnomalyHandlerTask.class */
    public class AnomalyHandlerTask implements Runnable {
        AnomalyHandlerTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            AnomalyDetectorManager.LOG.info("Starting anomaly handler");
            while (true) {
                boolean z = false;
                AnomalyDetectorManager.this._anomalyInProgress = null;
                try {
                    AnomalyDetectorManager.this._anomalyInProgress = AnomalyDetectorManager.this._anomalies.take();
                    AnomalyDetectorManager.LOG.trace("Processing anomaly {}.", AnomalyDetectorManager.this._anomalyInProgress);
                } catch (OptimizationFailureException e) {
                    AnomalyDetectorManager.LOG.warn("Encountered optimization failure when trying to fix the anomaly {}.", AnomalyDetectorManager.this._anomalyInProgress, e);
                    z = false;
                } catch (IllegalStateException e2) {
                    AnomalyDetectorManager.LOG.warn("Unexpected state prevents anomaly detector from handling the anomaly {}.", AnomalyDetectorManager.this._anomalyInProgress, e2);
                    z = false;
                } catch (InterruptedException e3) {
                    AnomalyDetectorManager.LOG.debug("Received interrupted exception.", e3);
                    z = true;
                } catch (Throwable th) {
                    AnomalyDetectorManager.LOG.error("Uncaught exception in anomaly handler.", th);
                    z = true;
                }
                if (AnomalyDetectorManager.this._anomalyInProgress == AnomalyDetectorUtils.SHUTDOWN_ANOMALY) {
                    AnomalyDetectorManager.this._anomalyInProgress = null;
                    AnomalyDetectorManager.LOG.info("Anomaly handler exited.");
                    return;
                } else {
                    handleAnomalyInProgress();
                    if (z) {
                        AnomalyDetectorManager.LOG.info("Post processing anomaly {}.", AnomalyDetectorManager.this._anomalyInProgress);
                        postProcessAnomalyInProgress(AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs);
                    }
                }
            }
        }

        private void handleAnomalyInProgress() throws Exception {
            AnomalyType anomalyType = AnomalyDetectorManager.this._anomalyInProgress.anomalyType();
            AnomalyDetectorManager.this._anomalyDetectorState.addAnomalyDetection(anomalyType, AnomalyDetectorManager.this._anomalyInProgress);
            ExecutorState.State executionState = AnomalyDetectorManager.this._kafkaCruiseControl.executionState();
            if (executionState == ExecutorState.State.NO_TASK_IN_PROGRESS) {
                processAnomalyInProgress(anomalyType);
            } else {
                AnomalyDetectorManager.LOG.info("Post processing anomaly {} because executor is in {} state.", AnomalyDetectorManager.this._anomalyInProgress, executionState);
                postProcessAnomalyInProgress(AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs);
            }
        }

        private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            AnomalyDetectorManager.this._anomalyDetectorState.markAnomalyRate(anomalyType);
            AnomalyNotificationResult notifyAnomalyInProgress = notifyAnomalyInProgress(anomalyType);
            if (notifyAnomalyInProgress != null) {
                AnomalyDetectorManager.this._anomalyDetectorState.maybeSetOngoingAnomalyDetectionTimeMs();
                switch (notifyAnomalyInProgress.action()) {
                    case FIX:
                        fixAnomalyInProgress(anomalyType);
                        return;
                    case CHECK:
                        AnomalyDetectorManager.LOG.info("Post processing anomaly {} for {}.", AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
                        postProcessAnomalyInProgress(notifyAnomalyInProgress.delay());
                        return;
                    case IGNORE:
                        AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.IGNORED);
                        return;
                    default:
                        throw new IllegalStateException("Unrecognized anomaly notification result.");
                }
            }
        }

        private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyType) {
            AnomalyNotificationResult onMaintenanceEvent;
            switch ((KafkaAnomalyType) anomalyType) {
                case GOAL_VIOLATION:
                    GoalViolations goalViolations = (GoalViolations) AnomalyDetectorManager.this._anomalyInProgress;
                    onMaintenanceEvent = AnomalyDetectorManager.this._anomalyNotifier.onGoalViolation(goalViolations);
                    AnomalyDetectorManager.this._anomalyDetectorState.refreshHasUnfixableGoal(goalViolations);
                    break;
                case BROKER_FAILURE:
                    onMaintenanceEvent = AnomalyDetectorManager.this._anomalyNotifier.onBrokerFailure((BrokerFailures) AnomalyDetectorManager.this._anomalyInProgress);
                    break;
                case METRIC_ANOMALY:
                    onMaintenanceEvent = AnomalyDetectorManager.this._anomalyNotifier.onMetricAnomaly((KafkaMetricAnomaly) AnomalyDetectorManager.this._anomalyInProgress);
                    break;
                case DISK_FAILURE:
                    onMaintenanceEvent = AnomalyDetectorManager.this._anomalyNotifier.onDiskFailure((DiskFailures) AnomalyDetectorManager.this._anomalyInProgress);
                    break;
                case TOPIC_ANOMALY:
                    onMaintenanceEvent = AnomalyDetectorManager.this._anomalyNotifier.onTopicAnomaly((TopicAnomaly) AnomalyDetectorManager.this._anomalyInProgress);
                    break;
                case MAINTENANCE_EVENT:
                    onMaintenanceEvent = AnomalyDetectorManager.this._anomalyNotifier.onMaintenanceEvent((MaintenanceEvent) AnomalyDetectorManager.this._anomalyInProgress);
                    break;
                default:
                    throw new IllegalStateException("Unrecognized anomaly type.");
            }
            AnomalyDetectorManager.LOG.debug("Received notification result {}", onMaintenanceEvent);
            return onMaintenanceEvent;
        }

        private void postProcessAnomalyInProgress(long j) {
            if (AnomalyDetectorManager.this._anomalyInProgress.anomalyType() != KafkaAnomalyType.BROKER_FAILURE) {
                AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.IGNORED);
                return;
            }
            synchronized (AnomalyDetectorManager.this._shutdownLock) {
                if (AnomalyDetectorManager.this._shutdown) {
                    AnomalyDetectorManager.LOG.debug("Skip delayed checking anomaly {}, because anomaly detector is shutting down.", AnomalyDetectorManager.this._anomalyInProgress);
                } else {
                    AnomalyDetectorManager.LOG.debug("Scheduling broker failure detection with delay of {} ms", Long.valueOf(j));
                    AnomalyDetectorManager.this._numCheckedWithDelay.incrementAndGet();
                    AnomalyDetectorManager.this._detectorScheduler.schedule(() -> {
                        AnomalyDetectorManager.this._brokerFailureDetector.detectBrokerFailures(false);
                    }, j, TimeUnit.MILLISECONDS);
                    AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
                }
            }
        }

        private boolean isAnomalyInProgressReadyToFix(AnomalyType anomalyType) {
            LoadMonitorTaskRunner.LoadMonitorTaskRunnerState loadMonitorTaskRunnerState = AnomalyDetectorManager.this._kafkaCruiseControl.getLoadMonitorTaskRunnerState();
            if (!AnomalyUtils.isLoadMonitorReady(loadMonitorTaskRunnerState)) {
                AnomalyDetectorManager.LOG.info("Skipping {} fix because load monitor is in {} state.", anomalyType, loadMonitorTaskRunnerState);
                AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.LOAD_MONITOR_NOT_READY);
                return false;
            }
            if (AnomalyDetectorManager.this._kafkaCruiseControl.meetCompletenessRequirements(AnomalyDetectorManager.this._selfHealingGoals)) {
                return true;
            }
            AnomalyDetectorManager.LOG.warn("Skipping {} fix because load completeness requirement is not met for goals.", anomalyType);
            AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, AnomalyState.Status.COMPLETENESS_NOT_READY);
            return false;
        }

        private void logSelfHealingOperation(String str, OptimizationFailureException optimizationFailureException, String str2) {
            if (str2 != null) {
                AnomalyDetectorManager.OPERATION_LOG.info("[{}] Self-healing started successfully:\n{}", str, str2);
            } else if (optimizationFailureException != null) {
                AnomalyDetectorManager.OPERATION_LOG.warn("[{}] Self-healing failed to start:\n{}", str, optimizationFailureException);
            } else {
                AnomalyDetectorManager.OPERATION_LOG.warn("[{}] Self-healing failed to start due to inability to optimize combined self-healing goals ({}).", str, AnomalyDetectorManager.this._selfHealingGoals);
            }
        }

        private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            synchronized (AnomalyDetectorManager.this._shutdownLock) {
                if (AnomalyDetectorManager.this._shutdown) {
                    AnomalyDetectorManager.LOG.info("Skip fixing anomaly {}, because anomaly detector is shutting down.", AnomalyDetectorManager.this._anomalyInProgress);
                } else {
                    boolean isAnomalyInProgressReadyToFix = isAnomalyInProgressReadyToFix(anomalyType);
                    boolean z = false;
                    String anomalyId = AnomalyDetectorManager.this._anomalyInProgress.anomalyId();
                    boolean z2 = false;
                    if (isAnomalyInProgressReadyToFix) {
                        try {
                            try {
                                AnomalyDetectorManager.LOG.info("Generating a fix for the anomaly {}.", AnomalyDetectorManager.this._anomalyInProgress);
                                z = AnomalyDetectorManager.this._anomalyInProgress.fix();
                                AnomalyDetectorManager.LOG.info("{} the anomaly {}.", z ? "Fixing" : "Cannot fix", AnomalyDetectorManager.this._anomalyInProgress);
                                String optimizationResult = z ? AnomalyDetectorManager.this._anomalyInProgress.optimizationResult(false) : null;
                                AnomalyDetectorManager.this._anomalyLoggerExecutor.submit(() -> {
                                    logSelfHealingOperation(anomalyId, null, optimizationResult);
                                });
                            } catch (OptimizationFailureException e) {
                                AnomalyDetectorManager.this._anomalyLoggerExecutor.submit(() -> {
                                    logSelfHealingOperation(anomalyId, e, null);
                                });
                                z2 = anomalyType == KafkaAnomalyType.BROKER_FAILURE;
                                throw e;
                            }
                        } catch (Throwable th) {
                            handlePostFixAnomaly(isAnomalyInProgressReadyToFix, z, anomalyId, z2);
                            throw th;
                        }
                    }
                    handlePostFixAnomaly(isAnomalyInProgressReadyToFix, z, anomalyId, false);
                }
            }
        }

        private void handlePostFixAnomaly(boolean z, boolean z2, String str, boolean z3) {
            if (z) {
                AnomalyDetectorManager.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetectorManager.this._anomalyInProgress, z2 ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START);
                if (z2) {
                    AnomalyDetectorManager.this._anomalyDetectorState.incrementNumSelfHealingStarted();
                    AnomalyDetectorManager.LOG.info("[{}] Self-healing started successfully.", str);
                } else {
                    AnomalyDetectorManager.this._anomalyDetectorState.incrementNumSelfHealingFailedToStart();
                    AnomalyDetectorManager.LOG.warn("[{}] Self-healing failed to start.", str);
                }
            }
            if (AnomalyDetectorManager.LOG.isDebugEnabled()) {
                AnomalyDetectorManager.LOG.debug("Clearing {} anomalies and scheduling a broker failure detection in {}ms.", Integer.valueOf(AnomalyDetectorManager.this._anomalies.size()), Long.valueOf(z ? 0L : AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs));
            }
            AnomalyDetectorManager.this._anomalies.clear();
            AnomalyDetectorManager.this._detectorScheduler.schedule(() -> {
                AnomalyDetectorManager.this._brokerFailureDetector.detectBrokerFailures(z3);
            }, z ? 0L : AnomalyDetectorManager.this._brokerFailureDetectionBackoffMs, TimeUnit.MILLISECONDS);
        }
    }

    public AnomalyDetectorManager(KafkaCruiseControl kafkaCruiseControl, Time time, MetricRegistry metricRegistry) {
        this._anomalies = new PriorityBlockingQueue<>(10, AnomalyDetectorUtils.anomalyComparator());
        KafkaCruiseControlConfig config = kafkaCruiseControl.config();
        Long l = config.getLong(AnomalyDetectorConfig.ANOMALY_DETECTION_INTERVAL_MS_CONFIG);
        this._anomalyDetectionIntervalMsByType = new HashMap(KafkaAnomalyType.cachedValues().size() - 2);
        Long l2 = config.getLong(AnomalyDetectorConfig.GOAL_VIOLATION_DETECTION_INTERVAL_MS_CONFIG);
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.GOAL_VIOLATION, l2 == null ? l : l2);
        Long l3 = config.getLong(AnomalyDetectorConfig.METRIC_ANOMALY_DETECTION_INTERVAL_MS_CONFIG);
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.METRIC_ANOMALY, l3 == null ? l : l3);
        Long l4 = config.getLong(AnomalyDetectorConfig.TOPIC_ANOMALY_DETECTION_INTERVAL_MS_CONFIG);
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.TOPIC_ANOMALY, l4 == null ? l : l4);
        Long l5 = config.getLong(AnomalyDetectorConfig.DISK_FAILURE_DETECTION_INTERVAL_MS_CONFIG);
        this._anomalyDetectionIntervalMsByType.put(KafkaAnomalyType.DISK_FAILURE, l5 == null ? l : l5);
        this._brokerFailureDetectionBackoffMs = config.getLong(AnomalyDetectorConfig.BROKER_FAILURE_DETECTION_BACKOFF_MS_CONFIG).longValue();
        this._anomalyNotifier = (AnomalyNotifier) config.getConfiguredInstance(AnomalyDetectorConfig.ANOMALY_NOTIFIER_CLASS_CONFIG, AnomalyNotifier.class);
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._selfHealingGoals = AnomalyDetectorUtils.getSelfHealingGoalNames(config);
        KafkaCruiseControlUtils.sanityCheckGoals(this._selfHealingGoals, false, config);
        this._goalViolationDetector = new GoalViolationDetector(this._anomalies, this._kafkaCruiseControl);
        this._brokerFailureDetector = new BrokerFailureDetector(this._anomalies, this._kafkaCruiseControl);
        this._metricAnomalyDetector = new MetricAnomalyDetector(this._anomalies, this._kafkaCruiseControl);
        this._diskFailureDetector = new DiskFailureDetector(this._anomalies, this._kafkaCruiseControl);
        this._topicAnomalyDetector = new TopicAnomalyDetector(this._anomalies, this._kafkaCruiseControl);
        this._maintenanceEventDetector = new MaintenanceEventDetector(this._anomalies, this._kafkaCruiseControl);
        this._detectorScheduler = Executors.newScheduledThreadPool(NUM_ANOMALY_DETECTION_THREADS, new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG));
        this._shutdown = false;
        int intValue = config.getInt(AnomalyDetectorConfig.NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG).intValue();
        this._anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this._anomalyInProgress = null;
        this._numCheckedWithDelay = new AtomicLong();
        this._shutdownLock = new Object();
        registerGaugeSensors(metricRegistry);
        this._anomalyDetectorState = new AnomalyDetectorState(time, this._anomalyNotifier.selfHealingEnabled(), intValue, metricRegistry);
    }

    AnomalyDetectorManager(PriorityBlockingQueue<Anomaly> priorityBlockingQueue, long j, KafkaCruiseControl kafkaCruiseControl, AnomalyNotifier anomalyNotifier, GoalViolationDetector goalViolationDetector, BrokerFailureDetector brokerFailureDetector, MetricAnomalyDetector metricAnomalyDetector, DiskFailureDetector diskFailureDetector, TopicAnomalyDetector topicAnomalyDetector, MaintenanceEventDetector maintenanceEventDetector, ScheduledExecutorService scheduledExecutorService) {
        this._anomalies = priorityBlockingQueue;
        this._anomalyDetectionIntervalMsByType = new HashMap(KafkaAnomalyType.cachedValues().size() - 1);
        KafkaAnomalyType.cachedValues().stream().filter(kafkaAnomalyType -> {
            return kafkaAnomalyType != KafkaAnomalyType.BROKER_FAILURE;
        }).forEach(kafkaAnomalyType2 -> {
            this._anomalyDetectionIntervalMsByType.put(kafkaAnomalyType2, Long.valueOf(j));
        });
        this._brokerFailureDetectionBackoffMs = j;
        this._anomalyNotifier = anomalyNotifier;
        this._goalViolationDetector = goalViolationDetector;
        this._brokerFailureDetector = brokerFailureDetector;
        this._metricAnomalyDetector = metricAnomalyDetector;
        this._diskFailureDetector = diskFailureDetector;
        this._topicAnomalyDetector = topicAnomalyDetector;
        this._maintenanceEventDetector = maintenanceEventDetector;
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._detectorScheduler = scheduledExecutorService;
        this._shutdown = false;
        this._selfHealingGoals = Collections.emptyList();
        this._anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this._anomalyInProgress = null;
        this._numCheckedWithDelay = new AtomicLong();
        this._shutdownLock = new Object();
        this._anomalyDetectorState = new AnomalyDetectorState(new SystemTime(), new HashMap(KafkaAnomalyType.cachedValues().size()), 10, null);
    }

    private void registerGaugeSensors(MetricRegistry metricRegistry) {
        String name = MetricRegistry.name(METRIC_REGISTRY_NAME, new String[]{"balancedness-score"});
        GoalViolationDetector goalViolationDetector = this._goalViolationDetector;
        Objects.requireNonNull(goalViolationDetector);
        metricRegistry.register(name, goalViolationDetector::balancednessScore);
        for (KafkaAnomalyType kafkaAnomalyType : KafkaAnomalyType.cachedValues()) {
            metricRegistry.register(MetricRegistry.name(METRIC_REGISTRY_NAME, new String[]{String.format("%s-self-healing-enabled", kafkaAnomalyType.toString().toLowerCase())}), () -> {
                return Integer.valueOf(this._anomalyNotifier.selfHealingEnabled().get(kafkaAnomalyType).booleanValue() ? 1 : 0);
            });
        }
    }

    private void scheduleDetectorAtFixedRate(KafkaAnomalyType kafkaAnomalyType, Runnable runnable) {
        int nextInt = new Random().nextInt(INIT_JITTER_BOUND);
        long longValue = this._anomalyDetectionIntervalMsByType.get(kafkaAnomalyType).longValue();
        LOG.debug("Starting {} detector with delay of {} ms", kafkaAnomalyType, Integer.valueOf(nextInt));
        this._detectorScheduler.scheduleAtFixedRate(runnable, (longValue / 2) + nextInt, longValue, TimeUnit.MILLISECONDS);
    }

    public void startDetection() {
        LOG.info("Starting {} detector.", KafkaAnomalyType.BROKER_FAILURE);
        this._brokerFailureDetector.startDetection();
        scheduleDetectorAtFixedRate(KafkaAnomalyType.GOAL_VIOLATION, this._goalViolationDetector);
        scheduleDetectorAtFixedRate(KafkaAnomalyType.METRIC_ANOMALY, this._metricAnomalyDetector);
        scheduleDetectorAtFixedRate(KafkaAnomalyType.TOPIC_ANOMALY, this._topicAnomalyDetector);
        scheduleDetectorAtFixedRate(KafkaAnomalyType.DISK_FAILURE, this._diskFailureDetector);
        LOG.debug("Starting {} detector.", KafkaAnomalyType.MAINTENANCE_EVENT);
        this._detectorScheduler.submit(this._maintenanceEventDetector);
        LOG.debug("Starting anomaly handler.");
        this._detectorScheduler.submit(new AnomalyHandlerTask());
    }

    public void shutdown() {
        LOG.info("Shutting down anomaly detector.");
        synchronized (this._shutdownLock) {
            this._shutdown = true;
        }
        this._anomalies.add(AnomalyDetectorUtils.SHUTDOWN_ANOMALY);
        this._maintenanceEventDetector.shutdown();
        this._detectorScheduler.shutdown();
        try {
            this._detectorScheduler.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            if (!this._detectorScheduler.isTerminated()) {
                LOG.warn("The sampling scheduler failed to shutdown in 5000 ms.");
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
        }
        this._brokerFailureDetector.shutdown();
        this._anomalyLoggerExecutor.shutdownNow();
        LOG.info("Anomaly detector shutdown completed.");
    }

    public synchronized AnomalyDetectorState anomalyDetectorState() {
        this._anomalyDetectorState.refreshMetrics(this._anomalyNotifier.selfHealingEnabledRatio(), this._goalViolationDetector.balancednessScore());
        return this._anomalyDetectorState;
    }

    long numSelfHealingStarted() {
        return this._anomalyDetectorState.numSelfHealingStarted();
    }

    long numSelfHealingFailedToStart() {
        return this._anomalyDetectorState.numSelfHealingFailedToStart();
    }

    public void maybeClearOngoingAnomalyDetectionTimeMs() {
        this._anomalyDetectorState.maybeClearOngoingAnomalyDetectionTimeMs();
    }

    public void resetHasUnfixableGoals() {
        this._anomalyDetectorState.resetHasUnfixableGoals();
    }

    public boolean setSelfHealingFor(AnomalyType anomalyType, boolean z) {
        boolean selfHealingFor = this._anomalyNotifier.setSelfHealingFor(anomalyType, z);
        this._anomalyDetectorState.setSelfHealingFor(anomalyType, z);
        return selfHealingFor;
    }

    public long numCheckedWithDelay() {
        return this._numCheckedWithDelay.get();
    }

    public void markSelfHealingFinished(String str) {
        LOG.debug("Self healing with id {} has finished.", str);
        this._anomalyDetectorState.markSelfHealingFinished(str);
    }
}
