/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.common.config.ConfigDef;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomalyFinder;
import com.linkedin.kafka.cruisecontrol.detector.TopicPartitionSizeAnomaly;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Partition;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Topic anomaly finder that reports partitions whose leader replica's expected
 * {@link Resource#DISK} utilization is strictly greater than a configured threshold.
 *
 * <p>Topics whose full name matches the exclusion regex are skipped. All offending partitions
 * found in one pass are bundled into a single {@link TopicAnomaly} instance, created
 * reflectively from the configured anomaly class.
 *
 * <p>Recognized configs (see {@link #configure(Map)}):
 * <ul>
 *   <li>{@link #SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE_CONFIG} - size threshold</li>
 *   <li>{@link #TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK} - regex of topic names to skip</li>
 *   <li>{@link #TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG} - anomaly class to instantiate</li>
 * </ul>
 */
public class PartitionSizeAnomalyFinder implements TopicAnomalyFinder {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionSizeAnomalyFinder.class);

    // NOTE(review): the config name says "byte"; the unit of the disk utilization it is compared
    // against is not visible in this file - confirm both sides use the same unit.
    public static final String SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE_CONFIG = "self.healing.partition.size.threshold.byte";
    /** Default threshold: 524288000 (= 500 * 1024 * 1024). */
    public static final Integer DEFAULT_SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE = 524288000;
    public static final String TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK = "topic.excluded.from.partition.size.check";
    /** Default exclusion regex; an empty pattern only matches an empty topic name. */
    public static final String DEFAULT_TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK = "";
    public static final String TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG = "topic.partition.size.anomaly.class";
    public static final Class<?> DEFAULT_TOPIC_PARTITION_SIZE_ANOMALY_CLASS = TopicPartitionSizeAnomaly.class;
    /** Key under which the map of offending partitions is handed to the created anomaly. */
    public static final String PARTITIONS_WITH_LARGE_SIZE_CONFIG = "partitions.with.large.size";

    // Keys shared with the rest of the application; values must stay exactly as they are.
    private static final String KAFKA_CRUISE_CONTROL_OBJECT_CONFIG = "kafka.cruise.control.object";
    private static final String ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG = "anomaly.detection.time.ms.object";
    private static final String ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG = "anomaly.detection.allow.capacity.estimation";

    private KafkaCruiseControl _kafkaCruiseControl;
    private int _partitionSizeThreshold;
    private Pattern _topicExcludedFromCheck;
    private Class<?> _topicPartitionSizeAnomalyClass;
    private boolean _allowCapacityEstimation;

    /**
     * Builds a cluster model and collects every partition whose leader's expected disk
     * utilization exceeds the configured threshold.
     *
     * <p>Exceptions raised while building or scanning the model are logged and not propagated.
     * Partitions collected before a failure are still reported.
     *
     * @return a singleton set holding one anomaly that covers all oversized partitions, or an
     *         empty set if none were found.
     */
    @Override
    public Set<TopicAnomaly> topicAnomalies() {
        Map<TopicPartition, Double> partitionsWithLargeSize = new HashMap<>();
        OperationProgress operationProgress = new OperationProgress();
        try (LoadMonitor.AutoCloseableSemaphore ignored = _kafkaCruiseControl.acquireForModelGeneration(operationProgress)) {
            ClusterModel clusterModel = _kafkaCruiseControl.clusterModel(new ModelCompletenessRequirements(1, 0.0, true),
                                                                         _allowCapacityEstimation,
                                                                         new OperationProgress());
            for (Map.Entry<String, List<Partition>> entry : clusterModel.getPartitionsByTopic().entrySet()) {
                // matches() requires the whole topic name to match the exclusion regex.
                if (_topicExcludedFromCheck.matcher(entry.getKey()).matches()) {
                    continue;
                }
                for (Partition partition : entry.getValue()) {
                    double partitionSize = partition.leader().load().expectedUtilizationFor(Resource.DISK);
                    // Strictly greater-than: a partition exactly at the threshold (or NaN) is not reported.
                    if (partitionSize > _partitionSizeThreshold) {
                        partitionsWithLargeSize.put(partition.topicPartition(), partitionSize);
                    }
                }
            }
        } catch (NotEnoughValidWindowsException nevwe) {
            LOG.debug("Skipping topic partition size anomaly detection because there are not enough valid windows.", nevwe);
        } catch (KafkaCruiseControlException kcce) {
            LOG.warn("Partition size anomaly finder received exception", kcce);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt: restore the flag so the owning thread can react to it.
                Thread.currentThread().interrupt();
            }
            LOG.error("Unexpected exception", e);
        }
        if (partitionsWithLargeSize.isEmpty()) {
            return Collections.emptySet();
        }
        return Collections.singleton(createTopicPartitionSizeAnomaly(partitionsWithLargeSize));
    }

    /**
     * Instantiates the configured anomaly class, handing it the Cruise Control instance, the
     * oversized partitions and the current time reported by Cruise Control.
     *
     * @param partitionsWithLargeSize oversized partitions mapped to their observed size.
     * @return the configured {@link TopicAnomaly} instance.
     */
    private TopicAnomaly createTopicPartitionSizeAnomaly(Map<TopicPartition, Double> partitionsWithLargeSize) {
        Map<String, Object> configs = new HashMap<>(3);
        configs.put(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl);
        configs.put(PARTITIONS_WITH_LARGE_SIZE_CONFIG, partitionsWithLargeSize);
        configs.put(ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, _kafkaCruiseControl.timeMs());
        return KafkaCruiseControlConfigUtils.getConfiguredInstance(_topicPartitionSizeAnomalyClass, TopicAnomaly.class, configs);
    }

    /**
     * Reads this finder's settings from the given config map.
     *
     * @param configs must contain the {@link KafkaCruiseControl} instance under
     *                {@code kafka.cruise.control.object}; the threshold, exclusion regex and
     *                anomaly class are optional and fall back to their defaults when absent.
     * @throws IllegalArgumentException if the Cruise Control object is missing, the threshold is
     *                                  not an integer, or the anomaly class is not a
     *                                  {@link TopicAnomaly}.
     */
    public void configure(Map<String, ?> configs) {
        _kafkaCruiseControl = (KafkaCruiseControl) configs.get(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG);
        if (_kafkaCruiseControl == null) {
            throw new IllegalArgumentException("Partition size anomaly finder is missing kafka.cruise.control.object");
        }
        _allowCapacityEstimation = _kafkaCruiseControl.config().getBoolean(ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);

        String topicExcludedFromCheck = (String) configs.get(TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK);
        _topicExcludedFromCheck = Pattern.compile(topicExcludedFromCheck == null ? DEFAULT_TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK
                                                                                 : topicExcludedFromCheck);

        _partitionSizeThreshold = parsePartitionSizeThreshold(configs.get(SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE_CONFIG));

        Object anomalyClassConfig = configs.get(TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG);
        if (anomalyClassConfig == null) {
            _topicPartitionSizeAnomalyClass = DEFAULT_TOPIC_PARTITION_SIZE_ANOMALY_CLASS;
        } else {
            // parseType takes an Object, so pass the raw value through instead of forcing a
            // String cast of our own; it decides which representations are acceptable.
            Class<?> anomalyClass = (Class<?>) ConfigDef.parseType(TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG,
                                                                   anomalyClassConfig,
                                                                   ConfigDef.Type.CLASS);
            if (anomalyClass == null || !TopicAnomaly.class.isAssignableFrom(anomalyClass)) {
                throw new IllegalArgumentException(String.format("Invalid %s is provided to partition size anomaly finder, provided %s",
                                                                 TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG, anomalyClass));
            }
            // Only store the class once it has been validated.
            _topicPartitionSizeAnomalyClass = anomalyClass;
        }
    }

    /**
     * Converts the raw threshold config value to an int.
     *
     * <p>Values may arrive either already typed as {@link Integer} or as the textual form read
     * from a properties source, so both are accepted.
     *
     * @param value raw config value, may be {@code null}.
     * @return the parsed threshold, or the default when {@code value} is {@code null}.
     * @throws IllegalArgumentException if the value is neither an Integer nor a String holding
     *                                  a valid int.
     */
    private static int parsePartitionSizeThreshold(Object value) {
        if (value == null) {
            return DEFAULT_SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE;
        }
        if (value instanceof Integer) {
            return (Integer) value;
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt(((String) value).trim());
            } catch (NumberFormatException nfe) {
                throw new IllegalArgumentException(String.format("Invalid %s is provided to partition size anomaly finder, provided %s",
                                                                 SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE_CONFIG, value), nfe);
            }
        }
        throw new IllegalArgumentException(String.format("Invalid %s is provided to partition size anomaly finder, provided %s of type %s",
                                                         SELF_HEALING_PARTITION_SIZE_THRESHOLD_BYTE_CONFIG, value,
                                                         value.getClass().getName()));
    }
}

