package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.zk.KafkaZkClient;
import kafka.zk.ZkVersion;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.class */
public final class ExecutionUtils {
    public static final int DEFAULT_RETRY_BACKOFF_BASE = 2;
    public static final long METADATA_REFRESH_BACKOFF = 100;
    public static final long METADATA_EXPIRY_MS = Long.MAX_VALUE;
    public static final long PERMANENT_TIMESTAMP = 0;
    public static final String EXECUTION_STARTED = "execution-started";
    public static final String KAFKA_ASSIGNER_MODE = "kafka_assigner";
    public static final String EXECUTION_STOPPED = "execution-stopped";
    public static final String GAUGE_EXECUTION_STOPPED = "execution-stopped";
    public static final String GAUGE_EXECUTION_STOPPED_BY_USER = "execution-stopped-by-user";
    public static final String GAUGE_EXECUTION_STARTED_IN_KAFKA_ASSIGNER_MODE = "execution-started-kafka_assigner";
    public static final String GAUGE_EXECUTION_STARTED_IN_NON_KAFKA_ASSIGNER_MODE = "execution-started-non-kafka_assigner";
    public static final String GAUGE_EXECUTION_INTER_BROKER_PARTITION_MOVEMENTS_PER_BROKER_CAP = "inter-broker-partition-movements-per-broker-cap";
    public static final String GAUGE_EXECUTION_INTRA_BROKER_PARTITION_MOVEMENTS_PER_BROKER_CAP = "intra-broker-partition-movements-per-broker-cap";
    public static final String GAUGE_EXECUTION_LEADERSHIP_MOVEMENTS_GLOBAL_CAP = "leadership-movements-global-cap";
    public static final long EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS = 5;
    public static final long EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS = 0;
    public static final long EXECUTION_TASK_FUTURE_ERROR_VERIFICATION_TIMEOUT_MS = 10000;
    static long LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS;
    static int LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS;
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionUtils.class);
    static final Map<ConcurrencyType, Integer> ADDITIVE_INCREASE = new HashMap(ConcurrencyType.cachedValues().size());
    static final Map<ConcurrencyType, Integer> MULTIPLICATIVE_DECREASE = new HashMap(ConcurrencyType.cachedValues().size());
    static final Map<ConcurrencyType, Integer> MAX_CONCURRENCY = new HashMap(ConcurrencyType.cachedValues().size());
    static final Map<ConcurrencyType, Integer> MIN_CONCURRENCY = new HashMap(ConcurrencyType.cachedValues().size());
    static final Map<String, Double> CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME = new HashMap(5);

    private ExecutionUtils() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void init(KafkaCruiseControlConfig kafkaCruiseControlConfig) {
        CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.put(RawMetricType.BROKER_LOG_FLUSH_TIME_MS_999TH.name(), kafkaCruiseControlConfig.getDouble(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_LOG_FLUSH_TIME_MS_CONFIG));
        CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.put(RawMetricType.BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH.name(), kafkaCruiseControlConfig.getDouble(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_FOLLOWER_FETCH_LOCAL_TIME_MS_CONFIG));
        CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.put(RawMetricType.BROKER_PRODUCE_LOCAL_TIME_MS_999TH.name(), kafkaCruiseControlConfig.getDouble(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_PRODUCE_LOCAL_TIME_MS_CONFIG));
        CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.put(RawMetricType.BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH.name(), kafkaCruiseControlConfig.getDouble(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_CONSUMER_FETCH_LOCAL_TIME_MS_CONFIG));
        CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.put(RawMetricType.BROKER_REQUEST_QUEUE_SIZE.name(), kafkaCruiseControlConfig.getDouble(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_REQUEST_QUEUE_SIZE_CONFIG));
        ADDITIVE_INCREASE.put(ConcurrencyType.INTER_BROKER_REPLICA, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA_CONFIG));
        ADDITIVE_INCREASE.put(ConcurrencyType.LEADERSHIP, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP_CONFIG));
        MULTIPLICATIVE_DECREASE.put(ConcurrencyType.INTER_BROKER_REPLICA, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA_CONFIG));
        MULTIPLICATIVE_DECREASE.put(ConcurrencyType.LEADERSHIP, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP_CONFIG));
        MAX_CONCURRENCY.put(ConcurrencyType.INTER_BROKER_REPLICA, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG));
        MAX_CONCURRENCY.put(ConcurrencyType.LEADERSHIP, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG));
        MIN_CONCURRENCY.put(ConcurrencyType.INTER_BROKER_REPLICA, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG));
        MIN_CONCURRENCY.put(ConcurrencyType.LEADERSHIP, kafkaCruiseControlConfig.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG));
        LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS = kafkaCruiseControlConfig.getLong(ExecutorConfig.LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG).longValue();
        LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS = kafkaCruiseControlConfig.getInt(ExecutorConfig.LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS_CONFIG).intValue();
    }

    private static String toMetricName(Short sh) {
        return KafkaMetricDef.brokerMetricDef().metricInfo(sh.shortValue()).name();
    }

    static boolean withinConcurrencyAdjusterLimit(Map<BrokerEntity, ValuesAndExtrapolations> map) {
        boolean z = true;
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap(CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.size());
        Iterator<String> it = CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.keySet().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), new StringBuilder());
        }
        for (Map.Entry<BrokerEntity, ValuesAndExtrapolations> entry : map.entrySet()) {
            BrokerEntity key = entry.getKey();
            ValuesAndExtrapolations value = entry.getValue();
            if (value == null) {
                hashSet.add(key);
            } else {
                for (Short sh : value.metricValues().metricIds()) {
                    String metricName = toMetricName(sh);
                    Double d = CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.get(metricName);
                    if (d != null) {
                        double latest = value.metricValues().valuesFor(sh.shortValue()).latest();
                        if (latest > d.doubleValue()) {
                            ((StringBuilder) hashMap.get(metricName)).append(String.format("%d(%.2f) ", Integer.valueOf(key.brokerId()), Double.valueOf(latest)));
                        }
                    }
                }
            }
        }
        for (Map.Entry entry2 : hashMap.entrySet()) {
            StringBuilder sb = (StringBuilder) entry2.getValue();
            if (sb.length() > 0) {
                LOG.info("{} is over the acceptable limit for brokers with values: {}.", entry2.getKey(), sb);
                z = false;
            }
        }
        if (!hashSet.isEmpty()) {
            LOG.warn("Assuming {} are over the acceptable limit as no broker metrics exist to verify.", hashSet);
            z = false;
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Integer recommendedConcurrency(Map<BrokerEntity, ValuesAndExtrapolations> map, int i, ConcurrencyType concurrencyType) {
        Integer num = null;
        if (withinConcurrencyAdjusterLimit(map)) {
            int intValue = MAX_CONCURRENCY.get(concurrencyType).intValue();
            if (i < intValue) {
                num = Integer.valueOf(Math.min(intValue, i + ADDITIVE_INCREASE.get(concurrencyType).intValue()));
                LOG.info("Concurrency adjuster recommended an increase in {} movement concurrency to {}", concurrencyType, num);
            }
        } else {
            int intValue2 = MIN_CONCURRENCY.get(concurrencyType).intValue();
            if (i > intValue2) {
                num = Integer.valueOf(Math.max(intValue2, i / MULTIPLICATIVE_DECREASE.get(concurrencyType).intValue()));
                LOG.info("Concurrency adjuster recommended a decrease in {} movement concurrency to {}", concurrencyType, num);
            }
        }
        return num;
    }

    public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminClient) throws InterruptedException, ExecutionException, TimeoutException {
        return ongoingPartitionReassignments(adminClient).keySet();
    }

    public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(AdminClient adminClient) throws InterruptedException, ExecutionException, TimeoutException {
        Map<TopicPartition, PartitionReassignment> map = null;
        int i = 0;
        long j = LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS;
        do {
            try {
                map = (Map) adminClient.listPartitionReassignments().reassignments().get(j, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                LOG.info("Failed to list partition reassignments in {}ms (attempt={}). Consider increasing the value of {} config.", new Object[]{Long.valueOf(j), Integer.valueOf(i + 1), ExecutorConfig.LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG});
                i++;
                if (i == LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS) {
                    throw e;
                }
                j *= 2;
            }
        } while (map == null);
        return map;
    }

    static Optional<NewPartitionReassignment> cancelReassignmentValue() {
        return Optional.empty();
    }

    private static Optional<NewPartitionReassignment> reassignmentValue(List<Integer> list) {
        return Optional.of(new NewPartitionReassignment(list));
    }

    public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(AdminClient adminClient, List<ExecutionTask> list) {
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException(String.format("Tasks to execute (%s) cannot be null or empty.", list));
        }
        HashMap hashMap = new HashMap(list.size());
        for (ExecutionTask executionTask : list) {
            TopicPartition topicPartition = executionTask.proposal().topicPartition();
            ArrayList arrayList = new ArrayList(executionTask.proposal().newReplicas().size());
            Iterator<ReplicaPlacementInfo> it = executionTask.proposal().newReplicas().iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().brokerId());
            }
            switch (executionTask.state()) {
                case ABORTING:
                case ABORTED:
                case DEAD:
                    hashMap.put(topicPartition, cancelReassignmentValue());
                    LOG.debug("The ongoing reassignment will be cancelled for task {}.", executionTask);
                    break;
                case COMPLETED:
                    LOG.debug("Task {} has already been completed.", executionTask);
                    break;
                case IN_PROGRESS:
                    hashMap.put(topicPartition, reassignmentValue(arrayList));
                    LOG.debug("Task {} will be executed.", executionTask);
                    break;
                default:
                    throw new IllegalStateException(String.format("Unrecognized task state %s.", executionTask.state()));
            }
        }
        if (hashMap.isEmpty()) {
            throw new IllegalArgumentException("All tasks submitted for replica reassignment are already completed.");
        }
        return adminClient.alterPartitionReassignments(hashMap);
    }

    public static boolean isSubset(Set<TopicPartition> set, Collection<ExecutionTask> collection) {
        boolean z = true;
        Iterator<ExecutionTask> it = collection.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!set.contains(it.next().proposal().topicPartition())) {
                z = false;
                break;
            }
        }
        return z;
    }

    public static void processAlterPartitionReassignmentsResult(AlterPartitionReassignmentsResult alterPartitionReassignmentsResult, Set<TopicPartition> set, Set<TopicPartition> set2, Set<TopicPartition> set3) {
        if (alterPartitionReassignmentsResult != null) {
            for (Map.Entry entry : alterPartitionReassignmentsResult.values().entrySet()) {
                TopicPartition topicPartition = (TopicPartition) entry.getKey();
                try {
                    ((KafkaFuture) entry.getValue()).get(10000L, TimeUnit.MILLISECONDS);
                    LOG.debug("Replica reassignment for {} has been accepted.", topicPartition);
                } catch (InterruptedException | TimeoutException e) {
                    LOG.warn("Failed to process AlterPartitionReassignmentsResult of {}.", topicPartition, e);
                } catch (ExecutionException e2) {
                    if (Errors.INVALID_REPLICA_ASSIGNMENT.exception().getClass() == e2.getCause().getClass()) {
                        set2.add(topicPartition);
                        LOG.debug("Replica reassignment failed for {} due to dead destination broker(s).", topicPartition);
                    } else if (Errors.UNKNOWN_TOPIC_OR_PARTITION.exception().getClass() == e2.getCause().getClass()) {
                        set.add(topicPartition);
                        LOG.debug("Replica reassignment failed for {} due to its topic deletion.", topicPartition);
                    } else {
                        if (Errors.NO_REASSIGNMENT_IN_PROGRESS.exception().getClass() != e2.getCause().getClass()) {
                            throw new IllegalStateException(String.format("%s encountered an unknown execution exception.", topicPartition), e2);
                        }
                        set3.add(topicPartition);
                        LOG.debug("Rollback failed for {} due to lack of corresponding ongoing replica reassignment.", topicPartition);
                    }
                }
            }
        }
    }

    public static void deleteZNodesToForceStopLeadershipMoves(KafkaZkClient kafkaZkClient) {
        LOG.info("Deleting zNode for ongoing leadership changes {}.", kafkaZkClient.getPreferredReplicaElection());
        kafkaZkClient.deletePreferredReplicaElection(ZkVersion.MatchAnyVersion());
        LOG.info("Deleting controller zNode to re-elect a new controller. Old controller is {}.", kafkaZkClient.getControllerId());
        kafkaZkClient.deleteController(ZkVersion.MatchAnyVersion());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isInterBrokerReplicaActionDone(Cluster cluster, ExecutionTask executionTask) {
        PartitionInfo partition = cluster.partition(executionTask.proposal().topicPartition());
        switch (executionTask.state()) {
            case ABORTING:
                return executionTask.proposal().isInterBrokerMovementAborted(partition);
            case ABORTED:
            case COMPLETED:
            default:
                throw new IllegalStateException("Should never be here. State " + executionTask.state());
            case DEAD:
                return true;
            case IN_PROGRESS:
                return executionTask.proposal().isInterBrokerMovementCompleted(partition);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isIntraBrokerReplicaActionDone(Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, ExecutionTask executionTask) {
        if (map.containsKey(executionTask)) {
            return map.get(executionTask).getCurrentReplicaLogDir().equals(executionTask.proposal().replicasToMoveBetweenDisksByBroker().get(Integer.valueOf(executionTask.brokerId())).logdir());
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isLeadershipMovementDone(Cluster cluster, ExecutionTask executionTask) {
        switch (executionTask.state()) {
            case ABORTING:
            case DEAD:
                return true;
            case ABORTED:
            case COMPLETED:
            default:
                throw new IllegalStateException("Should never be here.");
            case IN_PROGRESS:
                TopicPartition topicPartition = executionTask.proposal().topicPartition();
                Node leaderFor = cluster.leaderFor(topicPartition);
                return (leaderFor != null && leaderFor.id() == executionTask.proposal().newLeader().brokerId().intValue()) || leaderFor == null || !isInIsr(executionTask.proposal().newLeader().brokerId(), cluster, topicPartition);
        }
    }

    private static boolean isInIsr(Integer num, Cluster cluster, TopicPartition topicPartition) {
        return Arrays.stream(cluster.partition(topicPartition).inSyncReplicas()).anyMatch(node -> {
            return node.id() == num.intValue();
        });
    }
}
