/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol;

import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal;
import com.linkedin.kafka.cruisecontrol.config.EnvConfigProvider;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import kafka.log.LogConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ReassignmentInProgressException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class KafkaCruiseControlUtils {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCruiseControlUtils.class);
    public static final String ADMIN_CLIENT_CONFIG = "admin.client.object";
    public static final double MAX_BALANCEDNESS_SCORE = 100.0;
    public static final int ZK_SESSION_TIMEOUT = 120000;
    public static final int ZK_CONNECTION_TIMEOUT = 120000;
    public static final long KAFKA_ZK_CLIENT_CLOSE_TIMEOUT_MS = 10000L;
    public static final long ADMIN_CLIENT_CLOSE_TIMEOUT_MS = 10000L;
    public static final String DATE_FORMAT = "YYYY-MM-dd_HH:mm:ss z";
    public static final String DATE_FORMAT2 = "dd/MM/yyyy HH:mm:ss";
    public static final String TIME_ZONE = "UTC";
    public static final int SEC_TO_MS = 1000;
    private static final int MIN_TO_MS = 60000;
    private static final int HOUR_TO_MS = 3600000;
    private static final int DAY_TO_MS = 86400000;
    public static final String OPERATION_LOGGER = "operationLogger";
    public static final int REQUEST_VERSION_UPDATE = -1;
    public static final String ENV_CONFIG_PROVIDER_NAME = "env";
    public static final String ENV_CONFIG_PROVIDER_CLASS_CONFIG = ".env.class";
    public static final long CLIENT_REQUEST_TIMEOUT_MS = 30000L;
    public static final String DEFAULT_CLEANUP_POLICY = "delete";

    private KafkaCruiseControlUtils() {
    }

    public static String currentUtcDate() {
        return KafkaCruiseControlUtils.utcDateFor(System.currentTimeMillis());
    }

    public static String utcDateFor(long timeMs) {
        Date date = new Date(timeMs);
        SimpleDateFormat formatter = new SimpleDateFormat(DATE_FORMAT);
        formatter.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
        return formatter.format(date);
    }

    public static String toDateString(long time) {
        return KafkaCruiseControlUtils.toDateString(time, DATE_FORMAT2, "");
    }

    public static String toDateString(long time, String dateFormat, String timeZone) {
        if (time < 0L) {
            throw new IllegalArgumentException(String.format("Attempt to convert negative time %d to date.", time));
        }
        SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);
        if (!timeZone.isEmpty()) {
            formatter.setTimeZone(TimeZone.getTimeZone(timeZone));
        }
        return formatter.format(new Date(time));
    }

    public static String toPrettyDuration(double durationMs) {
        if (durationMs < 0.0) {
            throw new IllegalArgumentException(String.format("Duration cannot be negative value, get %f", durationMs));
        }
        if (durationMs < 1000.0) {
            return String.format("%.2f milliseconds", durationMs);
        }
        if (durationMs < 60000.0) {
            return String.format("%.2f seconds", durationMs / 1000.0);
        }
        if (durationMs < 3600000.0) {
            return String.format("%.2f minutes", durationMs / 60000.0);
        }
        if (durationMs < 8.64E7) {
            return String.format("%.2f hours", durationMs / 3600000.0);
        }
        return String.format("%.2f days", durationMs / 8.64E7);
    }

    public static String getRequiredConfig(Map<String, ?> configs, String configName) {
        String value = (String)configs.get(configName);
        if (value == null || value.isEmpty()) {
            throw new ConfigException(String.format("Configuration %s must be provided.", configName));
        }
        return value;
    }

    public static boolean createTopic(AdminClient adminClient, NewTopic topicToBeCreated) {
        try {
            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topicToBeCreated));
            ((KafkaFuture)createTopicsResult.values().get(topicToBeCreated.name())).get(30000L, TimeUnit.MILLISECONDS);
            LOG.info("Topic {} has been created.", (Object)topicToBeCreated.name());
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e.getCause() instanceof TopicExistsException) {
                return false;
            }
            throw new IllegalStateException(String.format("Unable to create topic %s.", topicToBeCreated.name()), e);
        }
        return true;
    }

    public static NewTopic wrapTopic(String topic, int partitionCount, short replicationFactor, long retentionMs) {
        if (partitionCount <= 0 || replicationFactor <= 0 || retentionMs <= 0L) {
            throw new IllegalArgumentException(String.format("Partition count (%d), replication factor (%d), and retention ms (%d) must be positive for the topic (%s).", partitionCount, replicationFactor, retentionMs, topic));
        }
        NewTopic newTopic = new NewTopic(topic, partitionCount, replicationFactor);
        HashMap<String, String> config = new HashMap<String, String>(2);
        config.put(LogConfig.RetentionMsProp(), Long.toString(retentionMs));
        config.put(LogConfig.CleanupPolicyProp(), DEFAULT_CLEANUP_POLICY);
        newTopic.configs(config);
        return newTopic;
    }

    private static void maybeUpdateConfig(Set<AlterConfigOp> configsToAlter, Map<String, String> desiredConfig, Config currentConfig) {
        for (Map.Entry<String, String> entry : desiredConfig.entrySet()) {
            String configName = entry.getKey();
            String targetConfigValue = entry.getValue();
            ConfigEntry currentConfigEntry = currentConfig.get(configName);
            if (currentConfigEntry != null && currentConfigEntry.value().equals(targetConfigValue)) continue;
            configsToAlter.add(new AlterConfigOp(new ConfigEntry(configName, targetConfigValue), AlterConfigOp.OpType.SET));
        }
    }

    public static boolean maybeUpdateTopicConfig(AdminClient adminClient, NewTopic topicToUpdateConfigs) {
        Config topicConfig;
        String topicName = topicToUpdateConfigs.name();
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Collections.singleton(topicResource));
        try {
            topicConfig = (Config)((KafkaFuture)describeConfigsResult.values().get(topicResource)).get(30000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Config check for topic {} failed due to failure to describe its configs.", (Object)topicName, (Object)e);
            return false;
        }
        Map desiredConfig = topicToUpdateConfigs.configs();
        if (desiredConfig != null) {
            HashSet<AlterConfigOp> alterConfigOps = new HashSet<AlterConfigOp>(desiredConfig.size());
            KafkaCruiseControlUtils.maybeUpdateConfig(alterConfigOps, desiredConfig, topicConfig);
            if (!alterConfigOps.isEmpty()) {
                AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(Collections.singletonMap(topicResource, alterConfigOps));
                try {
                    ((KafkaFuture)alterConfigsResult.values().get(topicResource)).get(30000L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException | ExecutionException | TimeoutException e) {
                    LOG.warn("Config change for topic {} failed.", (Object)topicName, (Object)e);
                    return false;
                }
            }
        }
        return true;
    }

    public static boolean maybeIncreasePartitionCount(AdminClient adminClient, NewTopic topicToAddPartitions) {
        TopicDescription topicDescription;
        String topicName = topicToAddPartitions.name();
        try {
            topicDescription = (TopicDescription)((KafkaFuture)adminClient.describeTopics(Collections.singletonList(topicName)).values().get(topicName)).get(30000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Partition count increase check for topic {} failed due to failure to describe cluster.", (Object)topicName, (Object)e);
            return false;
        }
        if (topicDescription.partitions().size() < topicToAddPartitions.numPartitions()) {
            CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo((int)topicToAddPartitions.numPartitions())));
            try {
                ((KafkaFuture)createPartitionsResult.values().get(topicName)).get(30000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                LOG.warn("Partition count increase to {} for topic {} failed{}.", new Object[]{topicToAddPartitions.numPartitions(), topicName, e.getCause() instanceof ReassignmentInProgressException ? " due to ongoing reassignment" : "", e});
                return false;
            }
        }
        return true;
    }

    public static void sanityCheckOffsetFetch(Map<TopicPartition, Long> endOffsets, Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes) throws SamplingException {
        HashSet<TopicPartition> failedToFetchOffsets = new HashSet<TopicPartition>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
            if (entry.getValue() != null || endOffsets.get(entry.getKey()) != null) continue;
            failedToFetchOffsets.add(entry.getKey());
        }
        if (!failedToFetchOffsets.isEmpty()) {
            throw new SamplingException(String.format("Consumer failed to fetch offsets for %s. Consider decreasing reconnect.backoff.ms to mitigate consumption failures due to transient network issues.", failedToFetchOffsets));
        }
    }

    public static <K, V> boolean consumptionDone(Consumer<K, V> consumer, Map<TopicPartition, Long> offsets) {
        HashSet partitionsNotPaused = new HashSet(consumer.assignment());
        partitionsNotPaused.removeAll(consumer.paused());
        for (TopicPartition tp : partitionsNotPaused) {
            if (consumer.position(tp) >= offsets.get(tp)) continue;
            return false;
        }
        return true;
    }

    public static void sanityCheckGoals(List<String> goals, boolean skipHardGoalCheck, KafkaCruiseControlConfig config) {
        if (!(goals == null || goals.isEmpty() || skipHardGoalCheck || goals.size() == 1 && goals.get(0).equals(PreferredLeaderElectionGoal.class.getSimpleName()))) {
            KafkaCruiseControlUtils.sanityCheckNonExistingGoal(goals, AnalyzerUtils.getCaseInsensitiveGoalsByName(config));
            Set<String> hardGoals = KafkaCruiseControlUtils.hardGoals(config);
            if (!goals.containsAll(hardGoals)) {
                throw new IllegalArgumentException(String.format("Missing hard goals %s in the provided goals: %s. Add %s=true parameter to ignore this sanity check.", hardGoals, goals, "skip_hard_goal_check"));
            }
        }
    }

    public static Set<String> hardGoals(KafkaCruiseControlConfig config) {
        return config.getList("hard.goals").stream().map(goalName -> goalName.substring(goalName.lastIndexOf(".") + 1)).collect(Collectors.toSet());
    }

    public static void sanityCheckLoadMonitorReadiness(ModelCompletenessRequirements completenessRequirements, LoadMonitorTaskRunner.LoadMonitorTaskRunnerState loadMonitorTaskRunnerState) {
        if (completenessRequirements.minRequiredNumWindows() > 0 && loadMonitorTaskRunnerState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.LOADING) {
            throw new IllegalStateException("Unable to generate proposal since load monitor is in " + LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.LOADING + " state.");
        }
    }

    public static List<Goal> goalsByPriority(List<String> goals, KafkaCruiseControlConfig config) {
        if (goals == null || goals.isEmpty()) {
            return AnalyzerUtils.getGoalsByPriority(config);
        }
        Map<String, Goal> allGoals = AnalyzerUtils.getCaseInsensitiveGoalsByName(config);
        KafkaCruiseControlUtils.sanityCheckNonExistingGoal(goals, allGoals);
        return goals.stream().map(allGoals::get).collect(Collectors.toList());
    }

    private static void closeClientWithTimeout(Runnable clientCloseTask, long timeoutMs) {
        Thread t = new Thread(clientCloseTask);
        t.setDaemon(true);
        t.start();
        try {
            t.join(timeoutMs);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (t.isAlive()) {
            t.interrupt();
        }
    }

    public static void closeKafkaZkClientWithTimeout(KafkaZkClient kafkaZkClient) {
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient, 10000L);
    }

    public static void closeKafkaZkClientWithTimeout(KafkaZkClient kafkaZkClient, long timeoutMs) {
        KafkaCruiseControlUtils.closeClientWithTimeout(() -> ((KafkaZkClient)kafkaZkClient).close(), timeoutMs);
    }

    public static boolean containsAny(Set<Integer> a, Set<Integer> b) {
        return b.stream().mapToInt(i -> i).anyMatch(a::contains);
    }

    public static KafkaZkClient createKafkaZkClient(String connectString, String metricGroup, String metricType, boolean zkSecurityEnabled) {
        String zooKeeperClientName = String.format("%s-%s", metricGroup, metricType);
        return KafkaZkClient.apply((String)connectString, (boolean)zkSecurityEnabled, (int)120000, (int)120000, (int)Integer.MAX_VALUE, (Time)new SystemTime(), (String)metricGroup, (String)metricType, (Option)Option.apply((Object)zooKeeperClientName), (Option)Option.empty());
    }

    public static AdminClient createAdminClient(Map<String, Object> adminClientConfigs) {
        return AdminClient.create(adminClientConfigs);
    }

    public static void closeAdminClientWithTimeout(AdminClient adminClient) {
        KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient, 10000L);
    }

    public static void closeAdminClientWithTimeout(AdminClient adminClient, long timeoutMs) {
        KafkaCruiseControlUtils.closeClientWithTimeout(() -> {
            try {
                adminClient.close();
            }
            catch (Exception e) {
                throw new IllegalStateException("Failed to close the Admin Client.", e);
            }
        }, timeoutMs);
    }

    public static Map<String, Object> parseAdminClientConfigs(KafkaCruiseControlConfig configs) {
        HashMap<String, Object> adminClientConfigs = new HashMap<String, Object>();
        List bootstrapServers = configs.getList("bootstrap.servers");
        String bootstrapServersString = bootstrapServers.toString().replace(" ", "").replace("[", "").replace("]", "");
        adminClientConfigs.put("bootstrap.servers", bootstrapServersString);
        try {
            String securityProtocol = configs.getString("security.protocol");
            adminClientConfigs.put("security.protocol", securityProtocol);
            KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "sasl.mechanism");
            KafkaCruiseControlUtils.setClassConfigIfExists(configs, adminClientConfigs, "sasl.login.callback.handler.class");
            KafkaCruiseControlUtils.setPasswordConfigIfExists(configs, adminClientConfigs, "sasl.jaas.config");
            if (securityProtocol.equals(SecurityProtocol.SSL.name) || securityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.trustmanager.algorithm");
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.keymanager.algorithm");
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.keystore.type");
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.keystore.location");
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.truststore.type");
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.truststore.location");
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.secure.random.implementation");
                KafkaCruiseControlUtils.setStringConfigIfExists(configs, adminClientConfigs, "ssl.endpoint.identification.algorithm");
                KafkaCruiseControlUtils.setPasswordConfigIfExists(configs, adminClientConfigs, "ssl.keystore.password");
                KafkaCruiseControlUtils.setPasswordConfigIfExists(configs, adminClientConfigs, "ssl.key.password");
                KafkaCruiseControlUtils.setPasswordConfigIfExists(configs, adminClientConfigs, "ssl.truststore.password");
            }
        }
        catch (ConfigException configException) {
            // empty catch block
        }
        return adminClientConfigs;
    }

    public static MetadataResponse prepareMetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<MetadataResponse.TopicMetadata> topicMetadataList) {
        MetadataResponseData responseData = new MetadataResponseData();
        responseData.setThrottleTimeMs(0);
        brokers.forEach(broker -> responseData.brokers().add((ImplicitLinkedHashCollection.Element)new MetadataResponseData.MetadataResponseBroker().setNodeId(broker.id()).setHost(broker.host()).setPort(broker.port()).setRack(broker.rack())));
        responseData.setClusterId(clusterId);
        responseData.setControllerId(controllerId);
        responseData.setClusterAuthorizedOperations(Integer.MIN_VALUE);
        topicMetadataList.forEach(topicMetadata -> responseData.topics().add((ImplicitLinkedHashCollection.Element)KafkaCruiseControlUtils.prepareMetadataResponseTopic(topicMetadata)));
        return new MetadataResponse(responseData);
    }

    private static MetadataResponseData.MetadataResponseTopic prepareMetadataResponseTopic(MetadataResponse.TopicMetadata topicMetadata) {
        MetadataResponseData.MetadataResponseTopic metadataResponseTopic = new MetadataResponseData.MetadataResponseTopic();
        metadataResponseTopic.setErrorCode(topicMetadata.error().code()).setName(topicMetadata.topic()).setIsInternal(topicMetadata.isInternal()).setTopicAuthorizedOperations(topicMetadata.authorizedOperations());
        for (MetadataResponse.PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata()) {
            metadataResponseTopic.partitions().add(new MetadataResponseData.MetadataResponsePartition().setErrorCode(partitionMetadata.error.code()).setPartitionIndex(partitionMetadata.partition()).setLeaderId(partitionMetadata.leaderId.orElse(-1).intValue()).setLeaderEpoch(partitionMetadata.leaderEpoch.orElse(-1).intValue()).setReplicaNodes(partitionMetadata.replicaIds).setIsrNodes(partitionMetadata.inSyncReplicaIds).setOfflineReplicas(partitionMetadata.offlineReplicaIds));
        }
        return metadataResponseTopic;
    }

    private static void setPasswordConfigIfExists(KafkaCruiseControlConfig configs, Map<String, Object> props, String name) {
        try {
            props.put(name, configs.getPassword(name));
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    private static void setStringConfigIfExists(KafkaCruiseControlConfig configs, Map<String, Object> props, String name) {
        try {
            props.put(name, configs.getString(name));
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    private static void setClassConfigIfExists(KafkaCruiseControlConfig configs, Map<String, Object> props, String name) {
        try {
            props.put(name, configs.getClass(name));
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    public static boolean isPartitionUnderReplicated(Cluster cluster, TopicPartition tp) {
        PartitionInfo partitionInfo = cluster.partition(tp);
        return partitionInfo.inSyncReplicas().length != partitionInfo.replicas().length;
    }

    public static <E> void ensureDisjoint(Set<E> set1, Set<E> set2, String message) {
        HashSet<E> interSection = new HashSet<E>(set1);
        interSection.retainAll(set2);
        if (!interSection.isEmpty()) {
            throw new IllegalStateException(message);
        }
    }

    public static void sanityCheckNonExistingGoal(List<String> goals, Map<String, Goal> supportedGoals) {
        HashSet nonExistingGoals = new HashSet();
        goals.stream().filter(goalName -> supportedGoals.get(goalName) == null).forEach(nonExistingGoals::add);
        if (!nonExistingGoals.isEmpty()) {
            throw new IllegalArgumentException("Goals " + nonExistingGoals + " are not supported. Supported: " + supportedGoals.keySet());
        }
    }

    public static Map<String, Double> balancednessCostByGoal(List<Goal> goals, double priorityWeight, double strictnessWeight) {
        if (goals.isEmpty()) {
            throw new IllegalArgumentException("At least one goal must be provided to get the balancedness cost.");
        }
        if (priorityWeight <= 0.0 || strictnessWeight <= 0.0) {
            throw new IllegalArgumentException(String.format("Balancedness weights must be positive (priority:%f, strictness:%f).", priorityWeight, strictnessWeight));
        }
        HashMap<String, Double> balancednessCostByGoal = new HashMap<String, Double>(goals.size());
        double weightSum = 0.0;
        double previousGoalPriorityWeight = 1.0 / priorityWeight;
        for (int i = goals.size() - 1; i >= 0; --i) {
            Goal goal = goals.get(i);
            double currentGoalPriorityWeight = priorityWeight * previousGoalPriorityWeight;
            double cost = currentGoalPriorityWeight * (goal.isHardGoal() ? strictnessWeight : 1.0);
            weightSum += cost;
            balancednessCostByGoal.put(goal.name(), cost);
            previousGoalPriorityWeight = currentGoalPriorityWeight;
        }
        for (Map.Entry entry : balancednessCostByGoal.entrySet()) {
            entry.setValue(100.0 * (Double)entry.getValue() / weightSum);
        }
        return balancednessCostByGoal;
    }

    public static KafkaCruiseControlConfig readConfig(String propertiesFile) throws IOException {
        Properties props = new Properties();
        try (FileInputStream propStream = new FileInputStream(propertiesFile);){
            props.put("config.providers", ENV_CONFIG_PROVIDER_NAME);
            props.put("config.providers.env.class", EnvConfigProvider.class.getName());
            props.load(propStream);
        }
        return new KafkaCruiseControlConfig(props);
    }
}

