package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.log.LogConfig;
import kafka.server.ConfigType;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.class */
/**
 * Sets and clears replication throttle configs on Kafka brokers and topics.
 *
 * <p>Broker-level rate configs ({@link #LEADER_THROTTLED_RATE}, {@link #FOLLOWER_THROTTLED_RATE})
 * and topic-level replica lists ({@link #LEADER_THROTTLED_REPLICAS},
 * {@link #FOLLOWER_THROTTLED_REPLICAS}) are read through the {@link KafkaZkClient} and written
 * back through {@link ExecutorUtils}. When the throttle rate passed to the constructor is
 * {@code null}, {@link #setThrottles} and {@link #clearThrottles} do nothing.
 */
class ReplicationThrottleHelper {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationThrottleHelper.class);
    static final String LEADER_THROTTLED_RATE = "leader.replication.throttled.rate";
    static final String FOLLOWER_THROTTLED_RATE = "follower.replication.throttled.rate";
    static final String LEADER_THROTTLED_REPLICAS = LogConfig.LeaderReplicationThrottledReplicasProp();
    static final String FOLLOWER_THROTTLED_REPLICAS = LogConfig.FollowerReplicationThrottledReplicasProp();
    /** Value Kafka accepts in a throttled-replicas config to mean "every replica of the topic". */
    private static final String WILDCARD_ALL_REPLICAS = "*";

    private final KafkaZkClient _kafkaZkClient;
    private final AdminZkClient _adminZkClient;
    private final Long _throttleRate;

    /**
     * @param kafkaZkClient client used to read entity configs; also wrapped in an
     *                      {@link AdminZkClient} used for config writes.
     * @param throttleRate  throttle rate in bytes per second, or {@code null} to disable throttling.
     */
    public ReplicationThrottleHelper(KafkaZkClient kafkaZkClient, Long throttleRate) {
        this._kafkaZkClient = kafkaZkClient;
        this._adminZkClient = new AdminZkClient(kafkaZkClient);
        this._throttleRate = throttleRate;
    }

    /**
     * Applies throttles for the given proposals: a leader and follower rate on every participating
     * broker that does not already have one, and leader/follower throttled-replica entries on every
     * affected topic. Does nothing when throttling is disabled.
     *
     * @param replicaMovementProposals proposals whose replica movements should be throttled.
     */
    public void setThrottles(List<ExecutionProposal> replicaMovementProposals) {
        if (!throttlingEnabled()) {
            return;
        }
        LOG.info("Setting a rebalance throttle of {} bytes/sec", this._throttleRate);
        Set<Integer> participatingBrokers = getParticipatingBrokers(replicaMovementProposals);
        Map<String, Set<String>> throttledReplicasByTopic = getThrottledReplicasByTopic(replicaMovementProposals);
        participatingBrokers.forEach(this::setLeaderThrottledRateIfUnset);
        participatingBrokers.forEach(this::setFollowerThrottledRateIfUnset);
        throttledReplicasByTopic.forEach(this::setLeaderThrottledReplicas);
        throttledReplicasByTopic.forEach(this::setFollowerThrottledReplicas);
    }

    /**
     * @return {@code true} if the task is an inter-broker replica action that is neither in progress
     *         nor pending.
     */
    boolean shouldRemoveThrottleForTask(ExecutionTask task) {
        return task.state() != ExecutionTaskState.IN_PROGRESS
            && task.state() != ExecutionTaskState.PENDING
            && task.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /**
     * @return {@code true} if the task is an inter-broker replica action that is in progress.
     */
    boolean taskIsInProgress(ExecutionTask task) {
        return task.state() == ExecutionTaskState.IN_PROGRESS
            && task.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /**
     * Removes throttles that were applied for tasks which no longer need them. Does nothing when
     * throttling is disabled.
     *
     * <p>Broker rates are removed only from brokers that participate in a removable task (see
     * {@link #shouldRemoveThrottleForTask}) and in no in-progress task (see
     * {@link #taskIsInProgress}). Topic-level throttled replicas are removed for every removable task.
     *
     * @param completedTasks  tasks to consider for throttle removal.
     * @param inProgressTasks tasks whose participating brokers must keep their throttle rates.
     */
    public void clearThrottles(List<ExecutionTask> completedTasks, List<ExecutionTask> inProgressTasks) {
        if (!throttlingEnabled()) {
            return;
        }
        List<ExecutionProposal> removableProposals = completedTasks.stream()
            .filter(this::shouldRemoveThrottleForTask)
            .map(ExecutionTask::proposal)
            .collect(Collectors.toList());
        List<ExecutionProposal> inProgressProposals = inProgressTasks.stream()
            .filter(this::taskIsInProgress)
            .map(ExecutionTask::proposal)
            .collect(Collectors.toList());

        Set<Integer> brokersToUnthrottle = new TreeSet<>(getParticipatingBrokers(removableProposals));
        brokersToUnthrottle.removeAll(getParticipatingBrokers(inProgressProposals));

        LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToUnthrottle);
        brokersToUnthrottle.forEach(this::removeThrottledRateFromBroker);
        getThrottledReplicasByTopic(removableProposals).forEach(this::removeThrottledReplicasFromTopic);
    }

    private boolean throttlingEnabled() {
        return this._throttleRate != null;
    }

    /**
     * Collects the ids of all brokers that appear in the old or new replica list of any proposal.
     *
     * @return a sorted set of broker ids.
     */
    private Set<Integer> getParticipatingBrokers(List<ExecutionProposal> proposals) {
        Set<Integer> brokerIds = new TreeSet<>();
        for (ExecutionProposal proposal : proposals) {
            proposal.oldReplicas().forEach(replica -> brokerIds.add(replica.brokerId()));
            proposal.newReplicas().forEach(replica -> brokerIds.add(replica.brokerId()));
        }
        return brokerIds;
    }

    /**
     * Builds, per topic, the sorted set of {@code partitionId:brokerId} entries covering the old
     * replicas and the replicas to add of every proposal.
     */
    private Map<String, Set<String>> getThrottledReplicasByTopic(List<ExecutionProposal> proposals) {
        Map<String, Set<String>> throttledReplicasByTopic = new HashMap<>();
        for (ExecutionProposal proposal : proposals) {
            int partitionId = proposal.partitionId();
            Set<String> throttledReplicas =
                throttledReplicasByTopic.computeIfAbsent(proposal.topic(), topic -> new TreeSet<>());
            proposal.oldReplicas().forEach(
                replica -> throttledReplicas.add(partitionId + ":" + replica.brokerId()));
            proposal.replicasToAdd().forEach(
                replica -> throttledReplicas.add(partitionId + ":" + replica.brokerId()));
        }
        return throttledReplicasByTopic;
    }

    private void setLeaderThrottledRateIfUnset(int brokerId) {
        setThrottledRateIfUnset(brokerId, LEADER_THROTTLED_RATE);
    }

    private void setFollowerThrottledRateIfUnset(int brokerId) {
        setThrottledRateIfUnset(brokerId, FOLLOWER_THROTTLED_RATE);
    }

    /**
     * Writes the configured throttle rate under {@code configKey} for the broker, unless the broker
     * already has a value for that key, in which case the existing value is left in place.
     */
    private void setThrottledRateIfUnset(int brokerId, String configKey) {
        assert this._throttleRate != null;
        assert configKey.equals(LEADER_THROTTLED_RATE) || configKey.equals(FOLLOWER_THROTTLED_RATE);

        Properties brokerConfig = this._kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(brokerId));
        // setProperty returns the previous value; the mutated copy is only written back when there was none.
        Object existingRate = brokerConfig.setProperty(configKey, String.valueOf(this._throttleRate));
        if (existingRate != null) {
            LOG.debug("Not setting {} for broker {} because pre-existing throttle of {} was already set",
                configKey, brokerId, existingRate);
            return;
        }
        LOG.debug("Setting {} to {} bytes/second for broker {}", configKey, this._throttleRate, brokerId);
        ExecutorUtils.changeBrokerConfig(this._adminZkClient, brokerId, brokerConfig);
    }

    private void setLeaderThrottledReplicas(String topic, Set<String> replicas) {
        setThrottledReplicas(topic, replicas, LEADER_THROTTLED_REPLICAS);
    }

    private void setFollowerThrottledReplicas(String topic, Set<String> replicas) {
        setThrottledReplicas(topic, replicas, FOLLOWER_THROTTLED_REPLICAS);
    }

    /**
     * Merges {@code replicas} into the topic's existing value for {@code configKey} and writes the
     * sorted, comma-separated result back.
     *
     * <p>If the existing value is the wildcard {@code *}, the config is left untouched: all replicas
     * are already throttled, and appending entries to the wildcard would not be a valid value.
     */
    private void setThrottledReplicas(String topic, Set<String> replicas, String configKey) {
        assert configKey.equals(LEADER_THROTTLED_REPLICAS) || configKey.equals(FOLLOWER_THROTTLED_REPLICAS);

        Properties topicConfig = this._kafkaZkClient.getEntityConfigs(ConfigType.Topic(), topic);
        String existingReplicas = topicConfig.getProperty(configKey);
        if (existingReplicas != null && WILDCARD_ALL_REPLICAS.equals(existingReplicas.trim())) {
            LOG.debug("Not changing {} for topic {} because all replicas are already throttled", configKey, topic);
            return;
        }

        Set<String> mergedReplicas = new TreeSet<>(replicas);
        if (existingReplicas != null) {
            for (String entry : existingReplicas.split(",")) {
                // An empty existing value splits into a single empty token; skip it to avoid a stray comma.
                if (!entry.isEmpty()) {
                    mergedReplicas.add(entry);
                }
            }
        }
        topicConfig.setProperty(configKey, String.join(",", mergedReplicas));
        ExecutorUtils.changeTopicConfig(this._adminZkClient, topic, topicConfig);
    }

    /**
     * Removes every entry contained in {@code replicas} from a comma-separated throttle config value.
     *
     * @param throttleConfig comma-separated config value.
     * @param replicas       entries to remove; must not be {@code null}.
     * @return the remaining entries joined by commas, in their original order (empty if none remain).
     */
    static String removeReplicasFromConfig(String throttleConfig, Set<String> replicas) {
        List<String> remaining = new ArrayList<>(Arrays.asList(throttleConfig.split(",")));
        remaining.removeIf(replicas::contains);
        return String.join(",", remaining);
    }

    /**
     * Removes {@code replicas} from the leader throttled-replicas entry of {@code topicConfig}.
     *
     * @return {@code true} if {@code topicConfig} was modified.
     */
    private boolean removeLeaderThrottledReplicasFromTopic(Properties topicConfig, String topic, Set<String> replicas) {
        String currentReplicas = topicConfig.getProperty(LEADER_THROTTLED_REPLICAS);
        if (currentReplicas == null) {
            return false;
        }
        replicas.forEach(replica -> LOG.debug("Removing leader throttles for topic {} on replica {}", topic, replica));
        return dropThrottledReplicas(topicConfig, LEADER_THROTTLED_REPLICAS, currentReplicas, replicas);
    }

    /**
     * Removes {@code replicas} from the follower throttled-replicas entry of {@code topicConfig}.
     *
     * @return {@code true} if {@code topicConfig} was modified.
     */
    private boolean removeFollowerThrottledReplicasFromTopic(Properties topicConfig, String topic, Set<String> replicas) {
        String currentReplicas = topicConfig.getProperty(FOLLOWER_THROTTLED_REPLICAS);
        if (currentReplicas == null) {
            return false;
        }
        replicas.forEach(replica -> LOG.debug("Removing follower throttles for topic {} and replica {}", topic, replica));
        return dropThrottledReplicas(topicConfig, FOLLOWER_THROTTLED_REPLICAS, currentReplicas, replicas);
    }

    /**
     * Rewrites {@code configKey} in {@code topicConfig} without the given replicas, removing the key
     * entirely when nothing remains.
     *
     * @return {@code true} if the stored value changed or the key was removed.
     */
    private static boolean dropThrottledReplicas(Properties topicConfig, String configKey,
                                                 String currentReplicas, Set<String> replicas) {
        String remainingReplicas = removeReplicasFromConfig(currentReplicas, replicas);
        if (remainingReplicas.isEmpty()) {
            topicConfig.remove(configKey);
            return true;
        }
        topicConfig.setProperty(configKey, remainingReplicas);
        return !remainingReplicas.equals(currentReplicas);
    }

    /**
     * Removes the given replicas from both throttled-replica configs of the topic. The topic config
     * is written back only if at least one of the two entries changed.
     */
    private void removeThrottledReplicasFromTopic(String topic, Set<String> replicas) {
        Properties topicConfig = this._kafkaZkClient.getEntityConfigs(ConfigType.Topic(), topic);
        boolean leaderChanged = removeLeaderThrottledReplicasFromTopic(topicConfig, topic, replicas);
        boolean followerChanged = removeFollowerThrottledReplicasFromTopic(topicConfig, topic, replicas);
        if (leaderChanged || followerChanged) {
            ExecutorUtils.changeTopicConfig(this._adminZkClient, topic, topicConfig);
        }
    }

    /**
     * Removes both throttled-replica configs from the topic. The topic config is written back only
     * if at least one of them was present.
     */
    private void removeAllThrottledReplicasFromTopic(String topic) {
        Properties topicConfig = this._kafkaZkClient.getEntityConfigs(ConfigType.Topic(), topic);
        Object removedLeaderReplicas = topicConfig.remove(LEADER_THROTTLED_REPLICAS);
        Object removedFollowerReplicas = topicConfig.remove(FOLLOWER_THROTTLED_REPLICAS);
        if (removedLeaderReplicas != null) {
            LOG.debug("Removing leader throttled replicas for topic {}", topic);
        }
        if (removedFollowerReplicas != null) {
            LOG.debug("Removing follower throttled replicas for topic {}", topic);
        }
        if (removedLeaderReplicas != null || removedFollowerReplicas != null) {
            ExecutorUtils.changeTopicConfig(this._adminZkClient, topic, topicConfig);
        }
    }

    /**
     * Removes the leader and follower throttle rates from the broker. The broker config is written
     * back only if at least one of them was present.
     */
    private void removeThrottledRateFromBroker(Integer brokerId) {
        Properties brokerConfig = this._kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(brokerId));
        Object removedLeaderRate = brokerConfig.remove(LEADER_THROTTLED_RATE);
        Object removedFollowerRate = brokerConfig.remove(FOLLOWER_THROTTLED_RATE);
        if (removedLeaderRate != null) {
            LOG.debug("Removing leader throttle on broker {}", brokerId);
        }
        if (removedFollowerRate != null) {
            LOG.debug("Removing follower throttle on broker {}", brokerId);
        }
        if (removedLeaderRate != null || removedFollowerRate != null) {
            ExecutorUtils.changeBrokerConfig(this._adminZkClient, brokerId.intValue(), brokerConfig);
        }
    }
}
