/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AbstractAnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectionStatus;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEvent;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventReader;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MaintenanceEventDetector
extends AbstractAnomalyDetector
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MaintenanceEventDetector.class);
    public static final long DETECTION_NOT_READY_BACKOFF_MS = 10000L;
    public static final Duration READ_EVENTS_TIMEOUT = Duration.ofSeconds(5L);
    private volatile boolean _shutdown;
    private final MaintenanceEventReader _maintenanceEventReader;

    public MaintenanceEventDetector(Queue<Anomaly> anomalies, KafkaCruiseControl kafkaCruiseControl) {
        super(anomalies, kafkaCruiseControl);
        KafkaCruiseControlConfig config = this._kafkaCruiseControl.config();
        this._shutdown = false;
        Map<String, Object> configWithCruiseControlObject = Collections.singletonMap("kafka.cruise.control.object", kafkaCruiseControl);
        this._maintenanceEventReader = config.getConfiguredInstance("maintenance.event.reader.class", MaintenanceEventReader.class, configWithCruiseControlObject);
    }

    void shutdown() {
        this._shutdown = true;
    }

    @Override
    public void run() {
        while (!this._shutdown) {
            try {
                if (AnomalyDetectorUtils.getAnomalyDetectionStatus(this._kafkaCruiseControl, false) != AnomalyDetectionStatus.READY) {
                    this._kafkaCruiseControl.sleep(10000L);
                }
                Set<MaintenanceEvent> maintenanceEvents = this._maintenanceEventReader.readEvents(READ_EVENTS_TIMEOUT);
                this._anomalies.addAll(maintenanceEvents);
            }
            catch (Exception e) {
                LOG.warn("Maintenance event detector encountered an exception.", (Throwable)e);
            }
        }
        try {
            this._maintenanceEventReader.close();
        }
        catch (Exception e) {
            LOG.error("Received exception while closing maintenance event reader", (Throwable)e);
        }
        LOG.debug("Maintenance event detector is shutdown.");
    }
}

