package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.common.config.ConfigDef;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils;
import com.linkedin.kafka.cruisecontrol.detector.TopicReplicationFactorAnomaly;
import com.linkedin.kafka.cruisecontrol.servlet.response.ClusterPartitionState;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/TopicReplicationFactorAnomalyFinder.class */
/**
 * Finds topics whose partitions do not have the expected replication factor.
 *
 * <p>A detection pass ({@link #topicAnomalies()}) works in three steps:
 * <ol>
 *   <li>collect every non-excluded topic having at least one partition whose replica count differs from the
 *       configured target replication factor;</li>
 *   <li>look up (and cache) the {@code min.insync.replicas} config of those topics through the admin client;</li>
 *   <li>for each topic whose minISR is known, compare every partition against
 *       {@code max(targetReplicationFactor, minISR + margin)} and report the topics that still violate it.</li>
 * </ol>
 *
 * <p>The minISR cache is a plain {@link LinkedHashMap}; nothing in this class synchronizes access to it.
 */
public class TopicReplicationFactorAnomalyFinder implements TopicAnomalyFinder {
    /** Config key of the target replication factor; required by {@link #configure(Map)} and must be positive. */
    public static final String SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG = "self.healing.target.topic.replication.factor";
    /** Config key of a regex; topics whose whole name matches it are skipped by the check. */
    public static final String TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK = "topic.excluded.from.replication.factor.check";
    /** Default exclusion regex. As a full-match pattern, the empty regex only matches the empty topic name. */
    public static final String DEFAULT_TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK = "";
    /** Config key of the {@link TopicAnomaly} implementation instantiated when an anomaly is reported. */
    public static final String TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG = "topic.replication.topic.anomaly.class";
    /** Key under which the "bad topics by expected replication factor" map is handed to the anomaly instance. */
    public static final String BAD_TOPICS_BY_REPLICATION_FACTOR_CONFIG = "bad.topics.by.replication.factor";
    /** Config key of the margin added on top of a topic's minISR when computing its expected replication factor. */
    public static final String TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG = "topic.replication.factor.margin";
    /** Margin used when {@link #TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG} is absent or not parseable. */
    public static final short DEFAULT_TOPIC_REPLICATION_FACTOR_MARGIN = 1;
    /** Config key of how long (in ms) a cached minISR value is kept before being evicted. */
    public static final String TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS_CONFIG = "topic.min.isr.record.retention.time.ms";
    /** Default cache retention: 43,200,000 ms (12 hours). */
    public static final long DEFAULT_TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS = 43200000;
    /** Maximum time (in ms) to wait for each topic's describe-config future to complete. */
    public static final long DESCRIBE_TOPIC_CONFIG_TIMEOUT_MS = 100000;
    private KafkaCruiseControl _kafkaCruiseControl;
    private short _targetReplicationFactor;
    private Pattern _topicExcludedFromCheck;
    private Class<?> _topicReplicationTopicAnomalyClass;
    private AdminClient _adminClient;
    private short _topicReplicationFactorMargin;
    private long _topicMinISRRecordRetentionTimeMs;
    // Topic name -> cached minISR. Insertion-ordered so that eviction can walk it oldest-first.
    private Map<String, TopicMinISREntry> _cachedTopicMinISR;
    // NOTE(review): the logger is registered under TopicAnomalyFinder rather than this class. Left unchanged so that
    // existing logging configuration keeps matching; confirm whether that is intended.
    private static final Logger LOG = LoggerFactory.getLogger(TopicAnomalyFinder.class);
    /** Anomaly class used when {@link #TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG} is not provided. */
    public static final Class<?> DEFAULT_TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS = TopicReplicationFactorAnomaly.class;

    /**
     * Immutable cache record: a topic's minISR together with the wall-clock time at which it was cached.
     *
     * <p>NOTE(review): the decompiler reported that this class was originally {@code private}; the visibility
     * shown in the decompiled output is kept as-is.
     */
    public static class TopicMinISREntry {
        private final short _minISR;
        private final long _createTimeMs;

        /**
         * @param s the topic's minISR value.
         * @param j the creation time of this record, in epoch milliseconds.
         */
        TopicMinISREntry(short s, long j) {
            this._minISR = s;
            this._createTimeMs = j;
        }

        /** @return the cached minISR value. */
        short minISR() {
            return this._minISR;
        }

        /** @return the time this record was created, in epoch milliseconds. */
        long createTimeMs() {
            return this._createTimeMs;
        }
    }

    /** No-arg constructor; the instance must be initialized through {@link #configure(Map)} before use. */
    public TopicReplicationFactorAnomalyFinder() {
    }

    /**
     * Builds a fully initialized finder without going through {@link #configure(Map)}. Everything not passed in
     * is set to its default value.
     *
     * @param kafkaCruiseControl source of the cluster metadata and of the detection timestamp.
     * @param s the target replication factor.
     * @param adminClient client used to describe topic configs.
     */
    TopicReplicationFactorAnomalyFinder(KafkaCruiseControl kafkaCruiseControl, short s, AdminClient adminClient) {
        _kafkaCruiseControl = kafkaCruiseControl;
        _targetReplicationFactor = s;
        _topicExcludedFromCheck = Pattern.compile(DEFAULT_TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK);
        _topicReplicationTopicAnomalyClass = DEFAULT_TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS;
        _topicReplicationFactorMargin = DEFAULT_TOPIC_REPLICATION_FACTOR_MARGIN;
        _topicMinISRRecordRetentionTimeMs = DEFAULT_TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS;
        _adminClient = adminClient;
        _cachedTopicMinISR = new LinkedHashMap<>();
    }

    /**
     * Runs one detection pass.
     *
     * @return a singleton set holding one anomaly that covers every violating topic, or an empty set when no
     *         topic violates its expected replication factor.
     */
    @Override // com.linkedin.kafka.cruisecontrol.detector.TopicAnomalyFinder
    public Set<TopicAnomaly> topicAnomalies() {
        LOG.info("Start to detect topic replication factor anomaly.");
        Cluster cluster = _kafkaCruiseControl.kafkaCluster();
        Set<String> topicsToCheck = new HashSet<>();
        for (String topic : cluster.topics()) {
            if (_topicExcludedFromCheck.matcher(topic).matches()) {
                continue;
            }
            // One partition off the target is enough to make the topic a candidate.
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                if (partition.replicas().length != _targetReplicationFactor) {
                    topicsToCheck.add(topic);
                    break;
                }
            }
        }
        // Evict stale minISR records on every pass, even when there is no candidate topic.
        refreshTopicMinISRCache();
        if (!topicsToCheck.isEmpty()) {
            maybeRetrieveAndCacheTopicMinISR(topicsToCheck);
            Map<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> badTopicsByReplicationFactor =
                populateBadTopicsByReplicationFactor(topicsToCheck, cluster);
            if (!badTopicsByReplicationFactor.isEmpty()) {
                return Collections.singleton(createTopicReplicationFactorAnomaly(badTopicsByReplicationFactor, _targetReplicationFactor));
            }
        }
        return Collections.emptySet();
    }

    /**
     * Fetches the minISR config of every given topic that is not cached yet and stores it in the cache.
     * A topic whose minISR cannot be retrieved or parsed is logged and left out of the cache, which makes
     * {@link #populateBadTopicsByReplicationFactor(Set, Cluster)} skip it for this pass.
     *
     * @param set names of the topics whose minISR is needed.
     */
    private void maybeRetrieveAndCacheTopicMinISR(Set<String> set) {
        Set<ConfigResource> resourcesToDescribe = new HashSet<>(set.size());
        for (String topic : set) {
            if (!_cachedTopicMinISR.containsKey(topic)) {
                resourcesToDescribe.add(new ConfigResource(ConfigResource.Type.TOPIC, topic));
            }
        }
        if (resourcesToDescribe.isEmpty()) {
            return;
        }
        for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : _adminClient.describeConfigs(resourcesToDescribe).values().entrySet()) {
            String topic = entry.getKey().name();
            try {
                Config topicConfig = entry.getValue().get(DESCRIBE_TOPIC_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                ConfigEntry minISRConfig = topicConfig.get(ClusterPartitionState.MIN_INSYNC_REPLICAS);
                if (minISRConfig == null || minISRConfig.value() == null) {
                    // Without this guard a missing entry would surface as an NPE and abort the whole pass.
                    LOG.warn("Skip attempt to fix replication factor of topic {} due to unable to retrieve its minISR config.", topic);
                    continue;
                }
                short minISR = Short.parseShort(minISRConfig.value());
                _cachedTopicMinISR.put(topic, new TopicMinISREntry(minISR, System.currentTimeMillis()));
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller and stop: any further blocking get() would be
                // interrupted again right away.
                Thread.currentThread().interrupt();
                LOG.warn("Skip attempt to fix replication factor of topic {} due to unable to retrieve its minISR config.", topic, e);
                return;
            } catch (ExecutionException | TimeoutException | NumberFormatException e) {
                LOG.warn("Skip attempt to fix replication factor of topic {} due to unable to retrieve its minISR config.", topic, e);
            }
        }
    }

    /**
     * Groups the violating topics by the replication factor they are expected to have.
     *
     * @param set candidate topic names; topics without a cached minISR are ignored.
     * @param cluster cluster metadata used to inspect each topic's partitions.
     * @return expected replication factor -> entries (topic name and ratio of violating partitions) of the
     *         topics having at least one partition that does not match it; empty if there is none.
     */
    private Map<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> populateBadTopicsByReplicationFactor(Set<String> set, Cluster cluster) {
        Map<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> badTopicsByReplicationFactor = new HashMap<>();
        for (String topic : set) {
            TopicMinISREntry cachedMinISR = _cachedTopicMinISR.get(topic);
            if (cachedMinISR == null) {
                continue;
            }
            // The expected replication factor is never below minISR + margin, even if the target is smaller.
            short expectedReplicationFactor =
                (short) Math.max(_targetReplicationFactor, cachedMinISR.minISR() + _topicReplicationFactorMargin);
            int violatedPartitionCount = 0;
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                if (partition.replicas().length != expectedReplicationFactor) {
                    violatedPartitionCount++;
                }
            }
            if (violatedPartitionCount > 0) {
                // Divide as double: integer division would truncate every partial violation ratio to 0.
                double violationRatio = (double) violatedPartitionCount / cluster.partitionCountForTopic(topic);
                badTopicsByReplicationFactor
                    .computeIfAbsent(expectedReplicationFactor, rf -> new HashSet<>())
                    .add(new TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry(topic, violationRatio));
            }
        }
        return badTopicsByReplicationFactor;
    }

    /**
     * Evicts cached minISR records older than the configured retention time.
     *
     * <p>Records are only ever inserted for topics that are absent from the cache and the cache iterates in
     * insertion order, so records are visited oldest-first and the scan can stop at the first record that is
     * still fresh (assuming the wall clock does not go backwards between insertions).
     */
    private void refreshTopicMinISRCache() {
        long nowMs = System.currentTimeMillis();
        Iterator<Map.Entry<String, TopicMinISREntry>> cacheIterator = _cachedTopicMinISR.entrySet().iterator();
        while (cacheIterator.hasNext()) {
            long expirationMs = cacheIterator.next().getValue().createTimeMs() + _topicMinISRRecordRetentionTimeMs;
            if (expirationMs >= nowMs) {
                break;
            }
            cacheIterator.remove();
        }
    }

    /**
     * Instantiates the configured anomaly class for the given violations.
     *
     * @param map expected replication factor -> violating topic entries.
     * @param s the target replication factor passed on to the anomaly.
     * @return the configured {@link TopicAnomaly} instance.
     */
    private TopicAnomaly createTopicReplicationFactorAnomaly(Map<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> map, short s) {
        Map<String, Object> anomalyConfigs = new HashMap<>(4);
        anomalyConfigs.put(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl);
        anomalyConfigs.put(BAD_TOPICS_BY_REPLICATION_FACTOR_CONFIG, map);
        anomalyConfigs.put(SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG, Short.valueOf(s));
        anomalyConfigs.put(AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, Long.valueOf(_kafkaCruiseControl.timeMs()));
        return (TopicAnomaly) KafkaCruiseControlConfigUtils.getConfiguredInstance(_topicReplicationTopicAnomalyClass, TopicAnomaly.class, anomalyConfigs);
    }

    /**
     * Initializes this finder from the given configs.
     *
     * <p>The Cruise Control object and the target replication factor are mandatory. The exclusion regex, the
     * anomaly class, the replication factor margin and the minISR retention time fall back to their defaults
     * when absent; the margin and the retention time also fall back (with a warning) when not parseable.
     *
     * @param map the configs; all values read here other than the Cruise Control object are cast to {@code String}.
     * @throws IllegalArgumentException if the Cruise Control object is missing, the target replication factor is
     *         missing, malformed or not positive, the anomaly class is not a {@link TopicAnomaly}, the margin is
     *         negative, or the retention time is not positive.
     */
    public void configure(Map<String, ?> map) {
        _kafkaCruiseControl = (KafkaCruiseControl) map.get(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG);
        if (_kafkaCruiseControl == null) {
            throw new IllegalArgumentException("Topic replication factor anomaly finder is missing kafka.cruise.control.object");
        }

        try {
            // parseShort(null) raises NumberFormatException too, so a missing value is reported the same way.
            _targetReplicationFactor = Short.parseShort((String) map.get(SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "self.healing.target.topic.replication.factor is missing or misconfigured for topic replication factor anomaly finder.", e);
        }
        if (_targetReplicationFactor <= 0) {
            throw new IllegalArgumentException(String.format("%s config of replication factor anomaly finder should be set to positive, provided %d.", SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG, Short.valueOf(_targetReplicationFactor)));
        }

        String excludedTopicsRegex = (String) map.get(TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK);
        _topicExcludedFromCheck = Pattern.compile(excludedTopicsRegex == null ? DEFAULT_TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK : excludedTopicsRegex);

        String anomalyClassName = (String) map.get(TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG);
        if (anomalyClassName == null) {
            _topicReplicationTopicAnomalyClass = DEFAULT_TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS;
        } else {
            _topicReplicationTopicAnomalyClass = (Class<?>) ConfigDef.parseType(TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG, anomalyClassName, ConfigDef.Type.CLASS);
            if (_topicReplicationTopicAnomalyClass == null || !TopicAnomaly.class.isAssignableFrom(_topicReplicationTopicAnomalyClass)) {
                throw new IllegalArgumentException(String.format("Invalid %s is provided to replication factor anomaly finder, provided %s", TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG, _topicReplicationTopicAnomalyClass));
            }
        }

        _topicReplicationFactorMargin = DEFAULT_TOPIC_REPLICATION_FACTOR_MARGIN;
        String rawMargin = (String) map.get(TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG);
        if (rawMargin != null) {
            try {
                _topicReplicationFactorMargin = Short.parseShort(rawMargin);
            } catch (NumberFormatException e) {
                // Best effort: keep the default, but make the misconfiguration visible.
                LOG.warn("Unable to parse {} config value {}, falling back to default {}.",
                         TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG, rawMargin, DEFAULT_TOPIC_REPLICATION_FACTOR_MARGIN);
            }
        }
        if (_topicReplicationFactorMargin < 0) {
            throw new IllegalArgumentException(String.format("%s config of replication factor anomaly finder should not be set to negative, provided %d.", TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG, Short.valueOf(_topicReplicationFactorMargin)));
        }

        _topicMinISRRecordRetentionTimeMs = DEFAULT_TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS;
        String rawRetentionTimeMs = (String) map.get(TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS_CONFIG);
        if (rawRetentionTimeMs != null) {
            try {
                _topicMinISRRecordRetentionTimeMs = Long.parseLong(rawRetentionTimeMs);
            } catch (NumberFormatException e) {
                // Best effort: keep the default, but make the misconfiguration visible.
                LOG.warn("Unable to parse {} config value {}, falling back to default {}.",
                         TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS_CONFIG, rawRetentionTimeMs, DEFAULT_TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS);
            }
        }
        if (_topicMinISRRecordRetentionTimeMs <= 0) {
            throw new IllegalArgumentException(String.format("%s config of replication factor anomaly finder should be set to positive, provided %d.", TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS_CONFIG, Long.valueOf(_topicMinISRRecordRetentionTimeMs)));
        }

        _adminClient = _kafkaCruiseControl.adminClient();
        _cachedTopicMinISR = new LinkedHashMap<>();
    }
}
