/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ConcurrencyType;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.zk.KafkaZkClient;
import kafka.zk.ZkVersion;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ExecutionUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionUtils.class);

    // General-purpose numeric constants exposed to the rest of the executor code.
    public static final int DEFAULT_RETRY_BACKOFF_BASE = 2;
    public static final long METADATA_REFRESH_BACKOFF = 100L;
    public static final long METADATA_EXPIRY_MS = Long.MAX_VALUE;
    public static final long PERMANENT_TIMESTAMP = 0L;

    // String identifiers. NOTE(review): the GAUGE_-prefixed ones look like metric/gauge names -- confirm at the
    // registration site. GAUGE_EXECUTION_STOPPED carries the same value as EXECUTION_STOPPED.
    public static final String EXECUTION_STARTED = "execution-started";
    public static final String KAFKA_ASSIGNER_MODE = "kafka_assigner";
    public static final String EXECUTION_STOPPED = "execution-stopped";
    public static final String GAUGE_EXECUTION_STOPPED = "execution-stopped";
    public static final String GAUGE_EXECUTION_STOPPED_BY_USER = "execution-stopped-by-user";
    public static final String GAUGE_EXECUTION_STARTED_IN_KAFKA_ASSIGNER_MODE = "execution-started-kafka_assigner";
    public static final String GAUGE_EXECUTION_STARTED_IN_NON_KAFKA_ASSIGNER_MODE = "execution-started-non-kafka_assigner";
    public static final String GAUGE_EXECUTION_INTER_BROKER_PARTITION_MOVEMENTS_PER_BROKER_CAP = "inter-broker-partition-movements-per-broker-cap";
    public static final String GAUGE_EXECUTION_INTRA_BROKER_PARTITION_MOVEMENTS_PER_BROKER_CAP = "intra-broker-partition-movements-per-broker-cap";
    public static final String GAUGE_EXECUTION_LEADERSHIP_MOVEMENTS_GLOBAL_CAP = "leadership-movements-global-cap";

    public static final long EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS = 5L;
    public static final long EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS = 0L;

    // Concurrency-adjuster settings keyed by concurrency type. They start out empty and are filled in by
    // init(KafkaCruiseControlConfig); recommendedConcurrency(...) reads them.
    static final Map<ConcurrencyType, Integer> ADDITIVE_INCREASE = new HashMap<>(ConcurrencyType.cachedValues().size());
    static final Map<ConcurrencyType, Integer> MULTIPLICATIVE_DECREASE = new HashMap<>(ConcurrencyType.cachedValues().size());
    static final Map<ConcurrencyType, Integer> MAX_CONCURRENCY = new HashMap<>(ConcurrencyType.cachedValues().size());
    static final Map<ConcurrencyType, Integer> MIN_CONCURRENCY = new HashMap<>(ConcurrencyType.cachedValues().size());

    // Upper bound per broker metric name; also filled in by init(KafkaCruiseControlConfig) (five entries).
    static final Map<String, Double> CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME = new HashMap<>(5);

    // Wait applied per future when verifying the outcome of submitted reassignments.
    public static final long EXECUTION_TASK_FUTURE_ERROR_VERIFICATION_TIMEOUT_MS = 10000L;

    // Not final: both are assigned from the configuration in init(KafkaCruiseControlConfig).
    static long LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS;
    static int LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS;

    /**
     * Private constructor: this class only exposes static members and is not meant to be instantiated.
     */
    private ExecutionUtils() {
    }

    /**
     * Loads the concurrency-adjuster settings and the list-partition-reassignment retry settings from the given
     * configuration into the static maps and fields of this class.
     *
     * @param config Configuration to read the settings from.
     */
    static void init(KafkaCruiseControlConfig config) {
        // Per-metric limits consulted by withinConcurrencyAdjusterLimit(...), keyed by raw metric type name.
        Map<String, Double> limitByMetricName = CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME;
        limitByMetricName.put(RawMetricType.BROKER_LOG_FLUSH_TIME_MS_999TH.name(),
                              config.getDouble("concurrency.adjuster.limit.log.flush.time.ms"));
        limitByMetricName.put(RawMetricType.BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH.name(),
                              config.getDouble("concurrency.adjuster.limit.follower.fetch.local.time.ms"));
        limitByMetricName.put(RawMetricType.BROKER_PRODUCE_LOCAL_TIME_MS_999TH.name(),
                              config.getDouble("concurrency.adjuster.limit.produce.local.time.ms"));
        limitByMetricName.put(RawMetricType.BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH.name(),
                              config.getDouble("concurrency.adjuster.limit.consumer.fetch.local.time.ms"));
        limitByMetricName.put(RawMetricType.BROKER_REQUEST_QUEUE_SIZE.name(),
                              config.getDouble("concurrency.adjuster.limit.request.queue.size"));

        // Step sizes used when the adjuster recommends an increase or a decrease.
        Map<ConcurrencyType, Integer> increaseByType = ADDITIVE_INCREASE;
        increaseByType.put(ConcurrencyType.INTER_BROKER_REPLICA,
                           config.getInt("concurrency.adjuster.additive.increase.inter.broker.replica"));
        increaseByType.put(ConcurrencyType.LEADERSHIP,
                           config.getInt("concurrency.adjuster.additive.increase.leadership"));
        Map<ConcurrencyType, Integer> decreaseByType = MULTIPLICATIVE_DECREASE;
        decreaseByType.put(ConcurrencyType.INTER_BROKER_REPLICA,
                           config.getInt("concurrency.adjuster.multiplicative.decrease.inter.broker.replica"));
        decreaseByType.put(ConcurrencyType.LEADERSHIP,
                           config.getInt("concurrency.adjuster.multiplicative.decrease.leadership"));

        // Bounds within which the recommended concurrency is kept.
        Map<ConcurrencyType, Integer> maxByType = MAX_CONCURRENCY;
        maxByType.put(ConcurrencyType.INTER_BROKER_REPLICA,
                      config.getInt("concurrency.adjuster.max.partition.movements.per.broker"));
        maxByType.put(ConcurrencyType.LEADERSHIP,
                      config.getInt("concurrency.adjuster.max.leadership.movements"));
        Map<ConcurrencyType, Integer> minByType = MIN_CONCURRENCY;
        minByType.put(ConcurrencyType.INTER_BROKER_REPLICA,
                      config.getInt("concurrency.adjuster.min.partition.movements.per.broker"));
        minByType.put(ConcurrencyType.LEADERSHIP,
                      config.getInt("concurrency.adjuster.min.leadership.movements"));

        // Retry settings used by ongoingPartitionReassignments(...).
        LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS = config.getLong("list.partition.reassignment.timeout.ms");
        LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS = config.getInt("list.partition.reassignment.max.attempts");
    }

    /**
     * Resolves a broker metric id to its name through the broker metric definition.
     *
     * @param metricId Id of the broker metric; must not be {@code null} because it is unboxed.
     * @return Name of the metric registered under the given id.
     */
    private static String toMetricName(Short metricId) {
        return KafkaMetricDef.brokerMetricDef().metricInfo(metricId.shortValue()).name();
    }

    /**
     * Checks the latest value of every limited broker metric against its configured limit.
     *
     * <p>The result is {@code false} when at least one broker reports a value strictly greater than the limit of a
     * metric, or when at least one broker has a {@code null} metrics entry (such brokers are assumed to be over the
     * limit). Offending brokers are logged per metric.
     *
     * @param currentMetricsByBroker Current metrics by broker; a {@code null} value marks a broker without metrics.
     * @return {@code true} if every broker has metrics and no limited metric exceeds its limit, {@code false} otherwise.
     */
    static boolean withinConcurrencyAdjusterLimit(Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker) {
        Set<BrokerEntity> metriclessBrokers = new HashSet<>();
        // One (initially empty) detail buffer per limited metric; a non-empty buffer means a violation was seen.
        Map<String, StringBuilder> violationsByMetricName = new HashMap<>(CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.size());
        for (String limitedMetricName : CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.keySet()) {
            violationsByMetricName.put(limitedMetricName, new StringBuilder());
        }

        for (Map.Entry<BrokerEntity, ValuesAndExtrapolations> brokerMetrics : currentMetricsByBroker.entrySet()) {
            BrokerEntity broker = brokerMetrics.getKey();
            ValuesAndExtrapolations current = brokerMetrics.getValue();
            if (current == null) {
                metriclessBrokers.add(broker);
                continue;
            }
            for (Short metricId : current.metricValues().metricIds()) {
                String metricName = ExecutionUtils.toMetricName(metricId);
                Double limit = CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.get(metricName);
                if (limit == null) {
                    // Metric is not subject to a concurrency adjuster limit.
                    continue;
                }
                double metricValue = (double) current.metricValues().valuesFor(metricId.shortValue()).latest();
                if (metricValue > limit) {
                    violationsByMetricName.get(metricName).append(String.format("%d(%.2f) ", broker.brokerId(), metricValue));
                }
            }
        }

        boolean withinLimit = true;
        for (Map.Entry<String, StringBuilder> violation : violationsByMetricName.entrySet()) {
            StringBuilder brokersWithValues = violation.getValue();
            if (brokersWithValues.length() > 0) {
                LOG.info("{} is over the acceptable limit for brokers with values: {}.", violation.getKey(), brokersWithValues);
                withinLimit = false;
            }
        }
        if (!metriclessBrokers.isEmpty()) {
            LOG.warn("Assuming {} are over the acceptable limit as no broker metrics exist to verify.", metriclessBrokers);
            withinLimit = false;
        }
        return withinLimit;
    }

    /**
     * Recommends a new movement concurrency based on whether the broker metrics are within the adjuster limits.
     *
     * <p>Within the limits, the concurrency is raised by the additive-increase step, capped at the configured
     * maximum. Otherwise it is divided by the multiplicative-decrease factor, floored at the configured minimum.
     *
     * @param currentMetricsByBroker Current metrics by broker.
     * @param currentMovementConcurrency Concurrency currently in use.
     * @param concurrencyType Type of concurrency whose settings are applied.
     * @return The recommended concurrency, or {@code null} if the current value is already at (or beyond) the
     *         relevant bound and no change is recommended.
     */
    static Integer recommendedConcurrency(Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker, int currentMovementConcurrency, ConcurrencyType concurrencyType) {
        if (ExecutionUtils.withinConcurrencyAdjusterLimit(currentMetricsByBroker)) {
            int upperBound = MAX_CONCURRENCY.get(concurrencyType);
            if (currentMovementConcurrency >= upperBound) {
                return null;
            }
            Integer increased = Math.min(upperBound, currentMovementConcurrency + ADDITIVE_INCREASE.get(concurrencyType));
            LOG.info("Concurrency adjuster recommended an increase in {} movement concurrency to {}", concurrencyType, increased);
            return increased;
        }

        int lowerBound = MIN_CONCURRENCY.get(concurrencyType);
        if (currentMovementConcurrency <= lowerBound) {
            return null;
        }
        Integer decreased = Math.max(lowerBound, currentMovementConcurrency / MULTIPLICATIVE_DECREASE.get(concurrencyType));
        LOG.info("Concurrency adjuster recommended a decrease in {} movement concurrency to {}", concurrencyType, decreased);
        return decreased;
    }

    /**
     * Retrieves the partitions that have an ongoing reassignment.
     *
     * @param adminClient Admin client used to list the ongoing partition reassignments.
     * @return Key set of {@link #ongoingPartitionReassignments(AdminClient)}.
     * @throws InterruptedException see {@link #ongoingPartitionReassignments(AdminClient)}.
     * @throws ExecutionException see {@link #ongoingPartitionReassignments(AdminClient)}.
     * @throws TimeoutException see {@link #ongoingPartitionReassignments(AdminClient)}.
     */
    public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminClient) throws InterruptedException, ExecutionException, TimeoutException {
        Map<TopicPartition, PartitionReassignment> ongoing = ExecutionUtils.ongoingPartitionReassignments(adminClient);
        return ongoing.keySet();
    }

    /**
     * Lists the ongoing partition reassignments, retrying on timeout with a timeout that doubles on every attempt.
     *
     * <p>The first attempt waits {@code LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS}. After
     * {@code LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS} timed-out attempts the last {@link TimeoutException} is
     * rethrown. If the maximum is not positive (e.g. the class has not been initialized), the first timeout is
     * rethrown rather than retrying without bound.
     *
     * @param adminClient Admin client used to list the ongoing partition reassignments.
     * @return Ongoing reassignments by partition.
     * @throws InterruptedException if the calling thread is interrupted while waiting for the response.
     * @throws ExecutionException if the list request completes exceptionally.
     * @throws TimeoutException if every allowed attempt timed out.
     */
    public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(AdminClient adminClient) throws InterruptedException, ExecutionException, TimeoutException {
        Map<TopicPartition, PartitionReassignment> partitionReassignments = null;
        int attempts = 0;
        long timeoutMs = LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS;
        do {
            ListPartitionReassignmentsResult responseResult = adminClient.listPartitionReassignments();
            try {
                // A successful response is expected to be non-null; a null result triggers another request.
                partitionReassignments = responseResult.reassignments().get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                LOG.info("Failed to list partition reassignments in {}ms (attempt={}). Consider increasing the value of {} config.",
                         timeoutMs, attempts + 1, "list.partition.reassignment.timeout.ms");
                // ">=" (rather than "==") guarantees termination even when the configured maximum is <= 0.
                if (++attempts >= LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS) {
                    throw e;
                }
                // Exponential backoff, saturating instead of overflowing into a negative timeout.
                timeoutMs = timeoutMs > Long.MAX_VALUE / 2L ? Long.MAX_VALUE : timeoutMs * 2L;
            }
        } while (partitionReassignments == null);
        return partitionReassignments;
    }

    /**
     * Value that {@link #submitReplicaReassignmentTasks(AdminClient, List)} maps a partition to when the ongoing
     * reassignment of that partition is to be cancelled.
     *
     * @return An empty {@link Optional}.
     */
    static Optional<NewPartitionReassignment> cancelReassignmentValue() {
        return Optional.empty();
    }

    /**
     * Wraps the given target replicas into the value submitted for a partition that is to be reassigned.
     *
     * @param targetReplicas Broker ids of the replicas the partition is reassigned to.
     * @return A non-empty {@link Optional} holding a reassignment to the given replicas.
     */
    private static Optional<NewPartitionReassignment> reassignmentValue(List<Integer> targetReplicas) {
        NewPartitionReassignment reassignment = new NewPartitionReassignment(targetReplicas);
        return Optional.of(reassignment);
    }

    /**
     * Submits the given replica reassignment tasks through the admin client.
     *
     * <p>Per task state:
     * <ul>
     *   <li>{@code ABORTING}, {@code ABORTED}, {@code DEAD}: the ongoing reassignment of the partition is cancelled.</li>
     *   <li>{@code COMPLETED}: the task is skipped.</li>
     *   <li>{@code IN_PROGRESS}: the partition is reassigned to the new replicas of the task's proposal.</li>
     * </ul>
     *
     * @param adminClient Admin client used to alter the partition reassignments.
     * @param tasks Tasks to submit; must be neither {@code null} nor empty.
     * @return The result of the alter-partition-reassignments request.
     * @throws IllegalArgumentException if {@code tasks} is {@code null} or empty, or if every task is already completed.
     * @throws IllegalStateException if a task is in any other state.
     */
    public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(AdminClient adminClient, List<ExecutionTask> tasks) {
        if (tasks == null || tasks.isEmpty()) {
            throw new IllegalArgumentException(String.format("Tasks to execute (%s) cannot be null or empty.", tasks));
        }

        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignmentByPartition = new HashMap<>(tasks.size());
        for (ExecutionTask task : tasks) {
            TopicPartition tp = task.proposal().topicPartition();
            List<Integer> targetBrokerIds = new ArrayList<>(task.proposal().newReplicas().size());
            for (ReplicaPlacementInfo placement : task.proposal().newReplicas()) {
                targetBrokerIds.add(placement.brokerId());
            }

            switch (task.state()) {
                case ABORTING:
                case ABORTED:
                case DEAD:
                    reassignmentByPartition.put(tp, ExecutionUtils.cancelReassignmentValue());
                    LOG.debug("The ongoing reassignment will be cancelled for task {}.", task);
                    break;
                case COMPLETED:
                    // Nothing to submit for a task that has already finished.
                    LOG.debug("Task {} has already been completed.", task);
                    break;
                case IN_PROGRESS:
                    reassignmentByPartition.put(tp, ExecutionUtils.reassignmentValue(targetBrokerIds));
                    LOG.debug("Task {} will be executed.", task);
                    break;
                default:
                    throw new IllegalStateException(String.format("Unrecognized task state %s.", task.state()));
            }
        }

        if (reassignmentByPartition.isEmpty()) {
            throw new IllegalArgumentException("All tasks submitted for replica reassignment are already completed.");
        }
        return adminClient.alterPartitionReassignments(reassignmentByPartition);
    }

    /**
     * Checks whether the partition of every given task is contained in the given set.
     *
     * @param set Set of partitions to check against.
     * @param subset Tasks whose proposal partitions are looked up in {@code set}.
     * @return {@code true} if {@code set} contains the partition of each task (trivially {@code true} for no tasks),
     *         {@code false} as soon as one partition is missing.
     */
    public static boolean isSubset(Set<TopicPartition> set, Collection<ExecutionTask> subset) {
        // allMatch short-circuits on the first partition that is not in the set.
        return subset.stream().allMatch(task -> set.contains(task.proposal().topicPartition()));
    }

    /**
     * Waits for the per-partition outcome of an alter-partition-reassignments request and sorts failed partitions
     * into the given sets based on the cause of the failure.
     *
     * <p>Each future is awaited for at most {@link #EXECUTION_TASK_FUTURE_ERROR_VERIFICATION_TIMEOUT_MS}. A timeout
     * or an interruption while waiting is logged and the remaining partitions are still processed; if the thread
     * was interrupted, its interrupt status is restored before this method returns.
     *
     * @param result Result to process; nothing is done if it is {@code null}.
     * @param deleted Receives partitions that failed with the {@code UNKNOWN_TOPIC_OR_PARTITION} error.
     * @param dead Receives partitions that failed with the {@code INVALID_REPLICA_ASSIGNMENT} error.
     * @param noReassignmentToCancel Receives partitions that failed with the {@code NO_REASSIGNMENT_IN_PROGRESS} error.
     * @throws IllegalStateException if a future fails with any other (or no) cause.
     */
    public static void processAlterPartitionReassignmentsResult(AlterPartitionReassignmentsResult result, Set<TopicPartition> deleted, Set<TopicPartition> dead, Set<TopicPartition> noReassignmentToCancel) {
        if (result == null) {
            return;
        }
        // The interrupt is re-asserted only at the end so that the remaining futures can still be awaited.
        boolean interrupted = false;
        try {
            for (Map.Entry<TopicPartition, KafkaFuture<Void>> entry : result.values().entrySet()) {
                TopicPartition tp = entry.getKey();
                try {
                    entry.getValue().get(EXECUTION_TASK_FUTURE_ERROR_VERIFICATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    LOG.debug("Replica reassignment for {} has been accepted.", tp);
                } catch (ExecutionException ee) {
                    // A missing cause cannot be classified and is reported as an unknown failure below.
                    Throwable cause = ee.getCause();
                    Class<?> causeClass = cause == null ? null : cause.getClass();
                    if (Errors.INVALID_REPLICA_ASSIGNMENT.exception().getClass() == causeClass) {
                        dead.add(tp);
                        LOG.debug("Replica reassignment failed for {} due to dead destination broker(s).", tp);
                    } else if (Errors.UNKNOWN_TOPIC_OR_PARTITION.exception().getClass() == causeClass) {
                        deleted.add(tp);
                        LOG.debug("Replica reassignment failed for {} due to its topic deletion.", tp);
                    } else if (Errors.NO_REASSIGNMENT_IN_PROGRESS.exception().getClass() == causeClass) {
                        noReassignmentToCancel.add(tp);
                        LOG.debug("Rollback failed for {} due to lack of corresponding ongoing replica reassignment.", tp);
                    } else {
                        throw new IllegalStateException(String.format("%s encountered an unknown execution exception.", tp), ee);
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.warn("Failed to process AlterPartitionReassignmentsResult of {}.", tp, e);
                } catch (TimeoutException e) {
                    LOG.warn("Failed to process AlterPartitionReassignmentsResult of {}.", tp, e);
                }
            }
        } finally {
            if (interrupted) {
                // Do not swallow the interruption: let callers observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Deletes the preferred replica election zNode and then the controller zNode, logging the current content of
     * each before deleting it. Both deletions match any zNode version.
     *
     * @param kafkaZkClient ZooKeeper client used to read and delete the zNodes.
     */
    public static void deleteZNodesToForceStopLeadershipMoves(KafkaZkClient kafkaZkClient) {
        // Step 1: drop the zNode that tracks the ongoing leadership changes.
        Object ongoingElection = kafkaZkClient.getPreferredReplicaElection();
        LOG.info("Deleting zNode for ongoing leadership changes {}.", ongoingElection);
        kafkaZkClient.deletePreferredReplicaElection(ZkVersion.MatchAnyVersion());

        // Step 2: drop the controller zNode so that a new controller gets elected.
        Object oldController = kafkaZkClient.getControllerId();
        LOG.info("Deleting controller zNode to re-elect a new controller. Old controller is {}.", oldController);
        kafkaZkClient.deleteController(ZkVersion.MatchAnyVersion());
    }

    /**
     * Checks whether the inter-broker replica action of the given task is done.
     *
     * @param cluster Cluster used to look up the partition of the task's proposal.
     * @param task Task to check.
     * @return For an {@code IN_PROGRESS} task, whether the proposal reports the movement as completed; for an
     *         {@code ABORTING} task, whether the proposal reports it as aborted; {@code true} for a {@code DEAD} task.
     * @throws IllegalStateException if the task is in any other state.
     */
    static boolean isInterBrokerReplicaActionDone(Cluster cluster, ExecutionTask task) {
        PartitionInfo currentPartitionInfo = cluster.partition(task.proposal().topicPartition());
        boolean done;
        switch (task.state()) {
            case IN_PROGRESS:
                done = task.proposal().isInterBrokerMovementCompleted(currentPartitionInfo);
                break;
            case ABORTING:
                done = task.proposal().isInterBrokerMovementAborted(currentPartitionInfo);
                break;
            case DEAD:
                done = true;
                break;
            default:
                throw new IllegalStateException("Should never be here. State " + task.state());
        }
        return done;
    }

    /**
     * Checks whether the intra-broker replica action of the given task is done, i.e. whether the current log
     * directory reported for the task equals the log directory the proposal targets on the task's broker.
     *
     * @param logdirInfoByTask Replica log directory information by task.
     * @param task Task to check.
     * @return {@code false} if there is no entry for the task, otherwise whether the current log directory equals
     *         the target one.
     */
    static boolean isIntraBrokerReplicaActionDone(Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logdirInfoByTask, ExecutionTask task) {
        if (!logdirInfoByTask.containsKey(task)) {
            return false;
        }
        String currentLogdir = logdirInfoByTask.get(task).getCurrentReplicaLogDir();
        String targetLogdir = task.proposal().replicasToMoveBetweenDisksByBroker().get(task.brokerId()).logdir();
        return currentLogdir.equals(targetLogdir);
    }

    /**
     * Checks whether the leadership movement of the given task is done.
     *
     * <p>An {@code IN_PROGRESS} task is done when the partition has no leader, when the leader already is the new
     * leader of the proposal, or when the new leader is not in the in-sync replicas of the partition. An
     * {@code ABORTING} or {@code DEAD} task is always done.
     *
     * @param cluster Cluster used to look up the leader and the in-sync replicas.
     * @param task Task to check.
     * @return {@code true} if the leadership movement is done, {@code false} otherwise.
     * @throws IllegalStateException if the task is in any other state.
     */
    static boolean isLeadershipMovementDone(Cluster cluster, ExecutionTask task) {
        switch (task.state()) {
            case IN_PROGRESS: {
                TopicPartition tp = task.proposal().topicPartition();
                Node currentLeader = cluster.leaderFor(tp);
                if (currentLeader == null) {
                    return true;
                }
                Integer targetLeaderId = task.proposal().newLeader().brokerId();
                if (currentLeader.id() == targetLeaderId.intValue()) {
                    return true;
                }
                // The movement counts as done when the new leader is not in the ISR.
                return !ExecutionUtils.isInIsr(targetLeaderId, cluster, tp);
            }
            case ABORTING:
            case DEAD:
                return true;
            default:
                throw new IllegalStateException("Should never be here.");
        }
    }

    /**
     * Checks whether the broker with the given id is among the in-sync replicas of the given partition.
     *
     * @param leader Id of the broker to look for.
     * @param cluster Cluster used to look up the partition.
     * @param tp Partition whose in-sync replicas are searched.
     * @return {@code true} if an in-sync replica has the given broker id, {@code false} otherwise.
     */
    private static boolean isInIsr(Integer leader, Cluster cluster, TopicPartition tp) {
        for (Node inSyncReplica : cluster.partition(tp).inSyncReplicas()) {
            if (inSyncReplica.id() == leader.intValue()) {
                return true;
            }
        }
        return false;
    }
}

