/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.common.config.ConfigDef;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.TopicAnomalyFinder;
import com.linkedin.kafka.cruisecontrol.detector.TopicReplicationFactorAnomaly;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicReplicationFactorAnomalyFinder
implements TopicAnomalyFinder {
    private static final Logger LOG = LoggerFactory.getLogger(TopicAnomalyFinder.class);
    public static final String SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG = "self.healing.target.topic.replication.factor";
    public static final String TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK = "topic.excluded.from.replication.factor.check";
    public static final String DEFAULT_TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK = "";
    public static final String TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG = "topic.replication.topic.anomaly.class";
    public static final Class<?> DEFAULT_TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS = TopicReplicationFactorAnomaly.class;
    public static final String BAD_TOPICS_BY_REPLICATION_FACTOR_CONFIG = "bad.topics.by.replication.factor";
    public static final String TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG = "topic.replication.factor.margin";
    public static final short DEFAULT_TOPIC_REPLICATION_FACTOR_MARGIN = 1;
    public static final String TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS_CONFIG = "topic.min.isr.record.retention.time.ms";
    public static final long DEFAULT_TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS = 43200000L;
    public static final long DESCRIBE_TOPIC_CONFIG_TIMEOUT_MS = 100000L;
    private KafkaCruiseControl _kafkaCruiseControl;
    private short _targetReplicationFactor;
    private Pattern _topicExcludedFromCheck;
    private Class<?> _topicReplicationTopicAnomalyClass;
    private AdminClient _adminClient;
    private short _topicReplicationFactorMargin;
    private long _topicMinISRRecordRetentionTimeMs;
    private Map<String, TopicMinISREntry> _cachedTopicMinISR;

    public TopicReplicationFactorAnomalyFinder() {
    }

    TopicReplicationFactorAnomalyFinder(KafkaCruiseControl kafkaCruiseControl, short targetReplicationFactor, AdminClient adminClient) {
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._targetReplicationFactor = targetReplicationFactor;
        this._topicExcludedFromCheck = Pattern.compile(DEFAULT_TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK);
        this._topicReplicationTopicAnomalyClass = DEFAULT_TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS;
        this._topicReplicationFactorMargin = 1;
        this._topicMinISRRecordRetentionTimeMs = 43200000L;
        this._adminClient = adminClient;
        this._cachedTopicMinISR = new LinkedHashMap<String, TopicMinISREntry>();
    }

    @Override
    public Set<TopicAnomaly> topicAnomalies() {
        LOG.info("Start to detect topic replication factor anomaly.");
        Cluster cluster = this._kafkaCruiseControl.kafkaCluster();
        HashSet<String> topicsToCheck = new HashSet<String>();
        block0: for (String topic : cluster.topics()) {
            if (this._topicExcludedFromCheck.matcher(topic).matches()) continue;
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                if (partition.replicas().length == this._targetReplicationFactor) continue;
                topicsToCheck.add(topic);
                continue block0;
            }
        }
        this.refreshTopicMinISRCache();
        if (!topicsToCheck.isEmpty()) {
            this.maybeRetrieveAndCacheTopicMinISR(topicsToCheck);
            Map<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> badTopicsByReplicationFactor = this.populateBadTopicsByReplicationFactor(topicsToCheck, cluster);
            if (!badTopicsByReplicationFactor.isEmpty()) {
                return Collections.singleton(this.createTopicReplicationFactorAnomaly(badTopicsByReplicationFactor, this._targetReplicationFactor));
            }
        }
        return Collections.emptySet();
    }

    private void maybeRetrieveAndCacheTopicMinISR(Set<String> topicsToCheck) {
        HashSet topicResourcesToCheck = new HashSet(topicsToCheck.size());
        topicsToCheck.stream().filter(t -> !this._cachedTopicMinISR.containsKey(t)).forEach(t -> topicResourcesToCheck.add(new ConfigResource(ConfigResource.Type.TOPIC, t)));
        if (topicResourcesToCheck.isEmpty()) {
            return;
        }
        for (Map.Entry entry : this._adminClient.describeConfigs(topicResourcesToCheck).values().entrySet()) {
            try {
                short topicMinISR = Short.parseShort(((Config)((KafkaFuture)entry.getValue()).get(100000L, TimeUnit.MILLISECONDS)).get("min.insync.replicas").value());
                this._cachedTopicMinISR.put(((ConfigResource)entry.getKey()).name(), new TopicMinISREntry(topicMinISR, System.currentTimeMillis()));
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                LOG.warn("Skip attempt to fix replication factor of topic {} due to unable to retrieve its minISR config.", (Object)((ConfigResource)entry.getKey()).name());
            }
        }
    }

    private Map<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> populateBadTopicsByReplicationFactor(Set<String> topicsToCheck, Cluster cluster) {
        HashMap<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> topicsByReplicationFactor = new HashMap<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>>();
        for (String topic : topicsToCheck) {
            if (!this._cachedTopicMinISR.containsKey(topic)) continue;
            short topicMinISR = this._cachedTopicMinISR.get(topic).minISR();
            short targetReplicationFactor = (short)Math.max(this._targetReplicationFactor, topicMinISR + this._topicReplicationFactorMargin);
            int violatedPartitionCount = 0;
            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
                if (partitionInfo.replicas().length == targetReplicationFactor) continue;
                ++violatedPartitionCount;
            }
            if (violatedPartitionCount <= 0) continue;
            topicsByReplicationFactor.putIfAbsent(targetReplicationFactor, new HashSet());
            ((Set)topicsByReplicationFactor.get(targetReplicationFactor)).add(new TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry(topic, (double)violatedPartitionCount / (double)cluster.partitionCountForTopic(topic).intValue()));
        }
        return topicsByReplicationFactor;
    }

    private void refreshTopicMinISRCache() {
        Map.Entry<String, TopicMinISREntry> entry;
        long currentTimeMs = System.currentTimeMillis();
        Iterator<Map.Entry<String, TopicMinISREntry>> cacheIterator = this._cachedTopicMinISR.entrySet().iterator();
        while (cacheIterator.hasNext() && (entry = cacheIterator.next()).getValue().createTimeMs() + this._topicMinISRRecordRetentionTimeMs < currentTimeMs) {
            cacheIterator.remove();
        }
    }

    private TopicAnomaly createTopicReplicationFactorAnomaly(Map<Short, Set<TopicReplicationFactorAnomaly.TopicReplicationFactorAnomalyEntry>> badTopicsByReplicationFactor, short targetReplicationFactor) {
        HashMap<String, Object> configs = new HashMap<String, Object>(4);
        configs.put("kafka.cruise.control.object", this._kafkaCruiseControl);
        configs.put(BAD_TOPICS_BY_REPLICATION_FACTOR_CONFIG, badTopicsByReplicationFactor);
        configs.put(SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG, targetReplicationFactor);
        configs.put("anomaly.detection.time.ms.object", this._kafkaCruiseControl.timeMs());
        return KafkaCruiseControlConfigUtils.getConfiguredInstance(this._topicReplicationTopicAnomalyClass, TopicAnomaly.class, configs);
    }

    public void configure(Map<String, ?> configs) {
        this._kafkaCruiseControl = (KafkaCruiseControl)configs.get("kafka.cruise.control.object");
        if (this._kafkaCruiseControl == null) {
            throw new IllegalArgumentException("Topic replication factor anomaly finder is missing kafka.cruise.control.object");
        }
        try {
            this._targetReplicationFactor = Short.parseShort((String)configs.get(SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG));
            if (this._targetReplicationFactor <= 0) {
                throw new IllegalArgumentException(String.format("%s config of replication factor anomaly finder should be set to positive, provided %d.", SELF_HEALING_TARGET_TOPIC_REPLICATION_FACTOR_CONFIG, this._targetReplicationFactor));
            }
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("self.healing.target.topic.replication.factor is missing or misconfigured for topic replication factor anomaly finder.");
        }
        String topicExcludedFromCheck = (String)configs.get(TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK);
        this._topicExcludedFromCheck = Pattern.compile(topicExcludedFromCheck == null ? DEFAULT_TOPIC_EXCLUDED_FROM_REPLICATION_FACTOR_CHECK : topicExcludedFromCheck);
        String topicReplicationTopicAnomalyClass = (String)configs.get(TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG);
        if (topicReplicationTopicAnomalyClass == null) {
            this._topicReplicationTopicAnomalyClass = DEFAULT_TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS;
        } else {
            this._topicReplicationTopicAnomalyClass = (Class)ConfigDef.parseType((String)TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG, (Object)topicReplicationTopicAnomalyClass, (ConfigDef.Type)ConfigDef.Type.CLASS);
            if (this._topicReplicationTopicAnomalyClass == null || !TopicAnomaly.class.isAssignableFrom(this._topicReplicationTopicAnomalyClass)) {
                throw new IllegalArgumentException(String.format("Invalid %s is provided to replication factor anomaly finder, provided %s", TOPIC_REPLICATION_FACTOR_ANOMALY_CLASS_CONFIG, this._topicReplicationTopicAnomalyClass));
            }
        }
        try {
            this._topicReplicationFactorMargin = Short.parseShort((String)configs.get(TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG));
            if (this._topicReplicationFactorMargin < 0) {
                throw new IllegalArgumentException(String.format("%s config of replication factor anomaly finder should not be set to negative, provided %d.", TOPIC_REPLICATION_FACTOR_MARGIN_CONFIG, this._topicReplicationFactorMargin));
            }
        }
        catch (NullPointerException | NumberFormatException e) {
            this._topicReplicationFactorMargin = 1;
        }
        try {
            this._topicMinISRRecordRetentionTimeMs = Long.parseLong((String)configs.get(TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS_CONFIG));
            if (this._topicMinISRRecordRetentionTimeMs <= 0L) {
                throw new IllegalArgumentException(String.format("%s config of replication factor anomaly finder should be set to positive, provided %d.", TOPIC_MIN_ISR_RECORD_RETENTION_TIME_MS_CONFIG, this._topicMinISRRecordRetentionTimeMs));
            }
        }
        catch (NullPointerException | NumberFormatException e) {
            this._topicMinISRRecordRetentionTimeMs = 43200000L;
        }
        this._adminClient = this._kafkaCruiseControl.adminClient();
        this._cachedTopicMinISR = new LinkedHashMap<String, TopicMinISREntry>();
    }

    private static class TopicMinISREntry {
        private final short _minISR;
        private final long _createTimeMs;

        TopicMinISREntry(short minISR, long createTimeMs) {
            this._minISR = minISR;
            this._createTimeMs = createTimeMs;
        }

        short minISR() {
            return this._minISR;
        }

        long createTimeMs() {
            return this._createTimeMs;
        }
    }
}

