package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.common.config.ConfigDef;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.model.Partition;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/PartitionSizeAnomalyFinder.class */
public class PartitionSizeAnomalyFinder implements TopicAnomalyFinder {
    public static final String SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE_CONFIG = "self.healing.partition.size.threshold.byte";
    public static final String TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK = "topic.excluded.from.partition.size.check";
    public static final String DEFAULT_TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK = "";
    public static final String TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG = "topic.partition.size.anomaly.class";
    public static final String PARTITIONS_WITH_LARGE_SIZE_CONFIG = "partitions.with.large.size";
    private KafkaCruiseControl _kafkaCruiseControl;
    private int _partitionSizeThreshold;
    private Pattern _topicExcludedFromCheck;
    private Class<?> _topicPartitionSizeAnomalyClass;
    private boolean _allowCapacityEstimation;
    private static final Logger LOG = LoggerFactory.getLogger(PartitionSizeAnomalyFinder.class);
    public static final Integer DEFAULT_SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE = 524288000;
    public static final Class<?> DEFAULT_TOPIC_PARTITION_SIZE_ANOMALY_CLASS = TopicPartitionSizeAnomaly.class;

    @Override // com.linkedin.kafka.cruisecontrol.detector.TopicAnomalyFinder
    public Set<TopicAnomaly> topicAnomalies() {
        HashMap hashMap = new HashMap();
        try {
            LoadMonitor.AutoCloseableSemaphore acquireForModelGeneration = this._kafkaCruiseControl.acquireForModelGeneration(new OperationProgress());
            try {
                for (Map.Entry<String, List<Partition>> entry : this._kafkaCruiseControl.clusterModel(new ModelCompletenessRequirements(1, 0.0d, true), this._allowCapacityEstimation, new OperationProgress()).getPartitionsByTopic().entrySet()) {
                    if (!this._topicExcludedFromCheck.matcher(entry.getKey()).matches()) {
                        for (Partition partition : entry.getValue()) {
                            double expectedUtilizationFor = partition.leader().load().expectedUtilizationFor(Resource.DISK);
                            if (expectedUtilizationFor > this._partitionSizeThreshold) {
                                hashMap.put(partition.topicPartition(), Double.valueOf(expectedUtilizationFor));
                            }
                        }
                    }
                }
                if (acquireForModelGeneration != null) {
                    acquireForModelGeneration.close();
                }
            } catch (Throwable th) {
                if (acquireForModelGeneration != null) {
                    try {
                        acquireForModelGeneration.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Exception e) {
            LOG.error("Unexpected exception", e);
        } catch (NotEnoughValidWindowsException e2) {
            LOG.debug("Skipping topic partition size anomaly detection because there are not enough valid windows.", e2);
        } catch (KafkaCruiseControlException e3) {
            LOG.warn("Partition size anomaly finder received exception", e3);
        }
        return !hashMap.isEmpty() ? Collections.singleton(createTopicPartitionSizeAnomaly(hashMap)) : Collections.emptySet();
    }

    private TopicAnomaly createTopicPartitionSizeAnomaly(Map<TopicPartition, Double> map) {
        HashMap hashMap = new HashMap(3);
        hashMap.put(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this._kafkaCruiseControl);
        hashMap.put(PARTITIONS_WITH_LARGE_SIZE_CONFIG, map);
        hashMap.put(AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, Long.valueOf(this._kafkaCruiseControl.timeMs()));
        return (TopicAnomaly) KafkaCruiseControlConfigUtils.getConfiguredInstance(this._topicPartitionSizeAnomalyClass, TopicAnomaly.class, hashMap);
    }

    public void configure(Map<String, ?> map) {
        this._kafkaCruiseControl = (KafkaCruiseControl) map.get(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG);
        if (this._kafkaCruiseControl == null) {
            throw new IllegalArgumentException("Partition size anomaly finder is missing kafka.cruise.control.object");
        }
        this._allowCapacityEstimation = this._kafkaCruiseControl.config().getBoolean(AnomalyDetectorConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue();
        String str = (String) map.get(TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK);
        this._topicExcludedFromCheck = Pattern.compile(str == null ? "" : str);
        Integer num = (Integer) map.get(SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE_CONFIG);
        this._partitionSizeThreshold = (num == null ? DEFAULT_SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE : num).intValue();
        String str2 = (String) map.get(TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG);
        if (str2 == null) {
            this._topicPartitionSizeAnomalyClass = DEFAULT_TOPIC_PARTITION_SIZE_ANOMALY_CLASS;
            return;
        }
        this._topicPartitionSizeAnomalyClass = (Class) ConfigDef.parseType(TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG, str2, ConfigDef.Type.CLASS);
        if (this._topicPartitionSizeAnomalyClass == null || !TopicAnomaly.class.isAssignableFrom(this._topicPartitionSizeAnomalyClass)) {
            throw new IllegalArgumentException(String.format("Invalid %s is provided to partition size anomaly finder, provided %s", TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG, this._topicPartitionSizeAnomalyClass));
        }
    }
}
