/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskState;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorUtils;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.log.LogConfig;
import kafka.server.ConfigType;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sets and clears Kafka replication throttles around inter-broker replica movements.
 *
 * <p>Two kinds of entity configs are touched:
 * <ul>
 *   <li>broker-level {@link #LEADER_THROTTLED_RATE} / {@link #FOLLOWER_THROTTLED_RATE}, and</li>
 *   <li>topic-level {@link #LEADER_THROTTLED_REPLICAS} / {@link #FOLLOWER_THROTTLED_REPLICAS},
 *       stored as a comma-separated list of {@code partitionId:brokerId} entries.</li>
 * </ul>
 * Configs are read through the {@link KafkaZkClient} and written through {@link ExecutorUtils}.
 * When the helper is constructed with a {@code null} throttle rate, {@link #setThrottles} and
 * {@link #clearThrottles} are no-ops.
 */
class ReplicationThrottleHelper {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationThrottleHelper.class);
    static final String LEADER_THROTTLED_RATE = "leader.replication.throttled.rate";
    static final String FOLLOWER_THROTTLED_RATE = "follower.replication.throttled.rate";
    static final String LEADER_THROTTLED_REPLICAS = LogConfig.LeaderReplicationThrottledReplicasProp();
    static final String FOLLOWER_THROTTLED_REPLICAS = LogConfig.FollowerReplicationThrottledReplicasProp();
    /** Throttled-replicas value that Kafka documents as "throttle every replica of this topic". */
    private static final String WILDCARD = "*";
    private final KafkaZkClient _kafkaZkClient;
    private final AdminZkClient _adminZkClient;
    /** Throttle rate in bytes/sec; {@code null} disables throttling altogether. */
    private final Long _throttleRate;

    /**
     * @param kafkaZkClient client used to read entity configs; also wrapped in an {@link AdminZkClient} for writes.
     * @param throttleRate  throttle rate in bytes/sec, or {@code null} to disable throttling.
     */
    ReplicationThrottleHelper(KafkaZkClient kafkaZkClient, Long throttleRate) {
        _kafkaZkClient = kafkaZkClient;
        _adminZkClient = new AdminZkClient(kafkaZkClient);
        _throttleRate = throttleRate;
    }

    /**
     * Applies throttles for the given replica movements: sets the leader and follower rate on every
     * participating broker that has none yet, then adds the moving replicas to the leader and follower
     * throttled-replica lists of each affected topic. Does nothing when throttling is disabled.
     *
     * @param replicaMovementProposals proposals about to be executed.
     */
    void setThrottles(List<ExecutionProposal> replicaMovementProposals) {
        if (!throttlingEnabled()) {
            return;
        }
        LOG.info("Setting a rebalance throttle of {} bytes/sec", _throttleRate);
        Set<Integer> participatingBrokers = getParticipatingBrokers(replicaMovementProposals);
        Map<String, Set<String>> throttledReplicas = getThrottledReplicasByTopic(replicaMovementProposals);
        participatingBrokers.forEach(this::setLeaderThrottledRateIfUnset);
        participatingBrokers.forEach(this::setFollowerThrottledRateIfUnset);
        throttledReplicas.forEach(this::setLeaderThrottledReplicas);
        throttledReplicas.forEach(this::setFollowerThrottledReplicas);
    }

    /**
     * @param task task to inspect.
     * @return {@code true} if the task is an inter-broker replica action that is neither pending nor in progress.
     */
    boolean shouldRemoveThrottleForTask(ExecutionTask task) {
        return task.state() != ExecutionTaskState.IN_PROGRESS
            && task.state() != ExecutionTaskState.PENDING
            && task.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /**
     * @param task task to inspect.
     * @return {@code true} if the task is an inter-broker replica action that is currently in progress.
     */
    boolean taskIsInProgress(ExecutionTask task) {
        return task.state() == ExecutionTaskState.IN_PROGRESS
            && task.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /**
     * Removes throttles that were applied for finished tasks. Broker-level rates are only removed from
     * brokers that do not participate in any still-running task; topic-level throttled replicas are
     * removed for every finished proposal. Does nothing when throttling is disabled.
     *
     * @param completedTasks  candidate finished tasks; filtered with {@link #shouldRemoveThrottleForTask}.
     * @param inProgressTasks candidate running tasks; filtered with {@link #taskIsInProgress}.
     */
    void clearThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inProgressTasks) {
        if (!throttlingEnabled()) {
            return;
        }
        List<ExecutionProposal> completedProposals = completedTasks.stream()
            .filter(this::shouldRemoveThrottleForTask)
            .map(ExecutionTask::proposal)
            .collect(Collectors.toList());
        List<ExecutionProposal> inProgressProposals = inProgressTasks.stream()
            .filter(this::taskIsInProgress)
            .map(ExecutionTask::proposal)
            .collect(Collectors.toList());

        // A broker keeps its rate throttle for as long as any in-progress movement still involves it.
        Set<Integer> brokersToRemoveThrottlesFrom = new TreeSet<>(getParticipatingBrokers(completedProposals));
        brokersToRemoveThrottlesFrom.removeAll(getParticipatingBrokers(inProgressProposals));
        LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToRemoveThrottlesFrom);
        brokersToRemoveThrottlesFrom.forEach(this::removeThrottledRateFromBroker);

        getThrottledReplicasByTopic(completedProposals).forEach(this::removeThrottledReplicasFromTopic);
    }

    private boolean throttlingEnabled() {
        return _throttleRate != null;
    }

    /**
     * @return sorted ids of every broker that appears in the old or new replica list of any proposal.
     */
    private Set<Integer> getParticipatingBrokers(List<ExecutionProposal> replicaMovementProposals) {
        Set<Integer> participatingBrokers = new TreeSet<>();
        for (ExecutionProposal proposal : replicaMovementProposals) {
            proposal.oldReplicas().stream().map(ReplicaPlacementInfo::brokerId).forEach(participatingBrokers::add);
            proposal.newReplicas().stream().map(ReplicaPlacementInfo::brokerId).forEach(participatingBrokers::add);
        }
        return participatingBrokers;
    }

    /**
     * @return for each topic, the sorted {@code partitionId:brokerId} entries covering the old replicas
     *         and the replicas to add of every proposal on that topic.
     */
    private Map<String, Set<String>> getThrottledReplicasByTopic(List<ExecutionProposal> replicaMovementProposals) {
        Map<String, Set<String>> throttledReplicasByTopic = new HashMap<>();
        for (ExecutionProposal proposal : replicaMovementProposals) {
            int partitionId = proposal.partitionId();
            Set<String> throttledReplicas =
                throttledReplicasByTopic.computeIfAbsent(proposal.topic(), topic -> new TreeSet<>());
            Stream.concat(proposal.oldReplicas().stream().map(ReplicaPlacementInfo::brokerId),
                          proposal.replicasToAdd().stream().map(ReplicaPlacementInfo::brokerId))
                  .forEach(brokerId -> throttledReplicas.add(partitionId + ":" + brokerId));
        }
        return throttledReplicasByTopic;
    }

    private void setLeaderThrottledRateIfUnset(int brokerId) {
        setThrottledRateIfUnset(brokerId, LEADER_THROTTLED_RATE);
    }

    private void setFollowerThrottledRateIfUnset(int brokerId) {
        setThrottledRateIfUnset(brokerId, FOLLOWER_THROTTLED_RATE);
    }

    /**
     * Writes the configured throttle rate under {@code configKey} for the broker unless a value is
     * already present, in which case the existing value is left alone.
     */
    private void setThrottledRateIfUnset(int brokerId, String configKey) {
        assert (_throttleRate != null);
        assert (configKey.equals(LEADER_THROTTLED_RATE) || configKey.equals(FOLLOWER_THROTTLED_RATE));
        Properties config = _kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(brokerId));
        // setProperty returns the previous value, which doubles as the "was it already set?" check.
        // The local Properties copy is only written back when there was no previous value.
        Object oldThrottleRate = config.setProperty(configKey, String.valueOf(_throttleRate));
        if (oldThrottleRate == null) {
            LOG.debug("Setting {} to {} bytes/second for broker {}", configKey, _throttleRate, brokerId);
            ExecutorUtils.changeBrokerConfig(_adminZkClient, brokerId, config);
        } else {
            LOG.debug("Not setting {} for broker {} because pre-existing throttle of {} was already set",
                      configKey, brokerId, oldThrottleRate);
        }
    }

    private void setLeaderThrottledReplicas(String topic, Set<String> replicas) {
        setThrottledReplicas(topic, replicas, LEADER_THROTTLED_REPLICAS);
    }

    private void setFollowerThrottledReplicas(String topic, Set<String> replicas) {
        setThrottledReplicas(topic, replicas, FOLLOWER_THROTTLED_REPLICAS);
    }

    /**
     * Merges {@code replicas} into the topic's existing throttled-replica list under {@code configKey}
     * and writes the topic config back. An existing wildcard value is left untouched.
     */
    private void setThrottledReplicas(String topic, Set<String> replicas, String configKey) {
        assert (configKey.equals(LEADER_THROTTLED_REPLICAS) || configKey.equals(FOLLOWER_THROTTLED_REPLICAS));
        Properties config = _kafkaZkClient.getEntityConfigs(ConfigType.Topic(), topic);
        String oldThrottledReplicas = config.getProperty(configKey);
        if (oldThrottledReplicas != null && WILDCARD.equals(oldThrottledReplicas.trim())) {
            // The wildcard already covers every replica; mixing it with explicit entries would
            // produce a value that is neither the wildcard nor a valid partition:broker list.
            LOG.debug("Not changing {} for topic {} because the existing wildcard already throttles all replicas",
                      configKey, topic);
            return;
        }
        Set<String> newThrottledReplicas = new TreeSet<>(replicas);
        if (oldThrottledReplicas != null) {
            newThrottledReplicas.addAll(splitThrottledReplicas(oldThrottledReplicas));
        }
        config.setProperty(configKey, String.join(",", newThrottledReplicas));
        ExecutorUtils.changeTopicConfig(_adminZkClient, topic, config);
    }

    /**
     * Splits a comma-separated throttled-replica config value into its entries, trimming surrounding
     * whitespace and dropping empty entries.
     */
    private static List<String> splitThrottledReplicas(String throttleConfig) {
        return Arrays.stream(throttleConfig.split(","))
            .map(String::trim)
            .filter(entry -> !entry.isEmpty())
            .collect(Collectors.toList());
    }

    /**
     * Removes the given replicas from a comma-separated throttled-replica config value.
     * Entries are compared after trimming whitespace; empty entries are discarded.
     *
     * @param throttleConfig existing config value; must not be {@code null}.
     * @param replicas       entries to remove.
     * @return the remaining entries joined with {@code ","} in their original order; empty if none remain.
     */
    static String removeReplicasFromConfig(String throttleConfig, Set<String> replicas) {
        return splitThrottledReplicas(throttleConfig).stream()
            .filter(entry -> !replicas.contains(entry))
            .collect(Collectors.joining(","));
    }

    /**
     * Removes {@code replicas} from the throttled-replica list stored under {@code configKey} in the
     * given (in-memory) topic config. The key is dropped entirely when no entry remains.
     *
     * @param role label used in the debug log only ("leader" or "follower").
     * @return {@code true} if {@code config} was modified.
     */
    private boolean removeThrottledReplicas(Properties config, String topic, Set<String> replicas,
                                            String configKey, String role) {
        String oldThrottledReplicas = config.getProperty(configKey);
        if (oldThrottledReplicas == null) {
            return false;
        }
        replicas.forEach(r -> LOG.debug("Removing {} throttles for topic {} on replica {}", role, topic, r));
        String newThrottledReplicas = removeReplicasFromConfig(oldThrottledReplicas, replicas);
        if (newThrottledReplicas.isEmpty()) {
            config.remove(configKey);
            return true;
        }
        if (newThrottledReplicas.equals(oldThrottledReplicas)) {
            return false;
        }
        config.setProperty(configKey, newThrottledReplicas);
        return true;
    }

    private boolean removeLeaderThrottledReplicasFromTopic(Properties config, String topic, Set<String> replicas) {
        return removeThrottledReplicas(config, topic, replicas, LEADER_THROTTLED_REPLICAS, "leader");
    }

    private boolean removeFollowerThrottledReplicasFromTopic(Properties config, String topic, Set<String> replicas) {
        return removeThrottledReplicas(config, topic, replicas, FOLLOWER_THROTTLED_REPLICAS, "follower");
    }

    /**
     * Removes {@code replicas} from both the leader and follower throttled-replica lists of the topic,
     * writing the topic config back only if either list actually changed.
     */
    private void removeThrottledReplicasFromTopic(String topic, Set<String> replicas) {
        Properties config = _kafkaZkClient.getEntityConfigs(ConfigType.Topic(), topic);
        // Evaluate both before testing so that neither removal is short-circuited away.
        boolean leaderChanged = removeLeaderThrottledReplicasFromTopic(config, topic, replicas);
        boolean followerChanged = removeFollowerThrottledReplicasFromTopic(config, topic, replicas);
        if (leaderChanged || followerChanged) {
            ExecutorUtils.changeTopicConfig(_adminZkClient, topic, config);
        }
    }

    /**
     * Drops both throttled-replica keys from the topic config, writing back only if at least one was present.
     */
    private void removeAllThrottledReplicasFromTopic(String topic) {
        Properties config = _kafkaZkClient.getEntityConfigs(ConfigType.Topic(), topic);
        Object oldLeaderThrottle = config.remove(LEADER_THROTTLED_REPLICAS);
        Object oldFollowerThrottle = config.remove(FOLLOWER_THROTTLED_REPLICAS);
        if (oldLeaderThrottle != null) {
            LOG.debug("Removing leader throttled replicas for topic {}", topic);
        }
        if (oldFollowerThrottle != null) {
            LOG.debug("Removing follower throttled replicas for topic {}", topic);
        }
        if (oldLeaderThrottle != null || oldFollowerThrottle != null) {
            ExecutorUtils.changeTopicConfig(_adminZkClient, topic, config);
        }
    }

    /**
     * Drops both throttled-rate keys from the broker config, writing back only if at least one was present.
     */
    private void removeThrottledRateFromBroker(Integer brokerId) {
        Properties config = _kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(brokerId));
        Object oldLeaderThrottle = config.remove(LEADER_THROTTLED_RATE);
        Object oldFollowerThrottle = config.remove(FOLLOWER_THROTTLED_RATE);
        if (oldLeaderThrottle != null) {
            LOG.debug("Removing leader throttle on broker {}", brokerId);
        }
        if (oldFollowerThrottle != null) {
            LOG.debug("Removing follower throttle on broker {}", brokerId);
        }
        if (oldLeaderThrottle != null || oldFollowerThrottle != null) {
            ExecutorUtils.changeBrokerConfig(_adminZkClient, brokerId, config);
        }
    }
}

