/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.async.progress.WaitingForOngoingExecutionToStop;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RunnableUtils {
    /** Logger bound to this class so that emitted messages are attributed to {@code RunnableUtils}. */
    private static final Logger LOG = LoggerFactory.getLogger(RunnableUtils.class);

    // Fixed values under the SELF_HEALING_ prefix. NOTE(review): presumably these are the arguments passed to
    // operations that are started by self-healing rather than by a user request; their usages are outside this
    // file -- confirm against callers. Several are intentionally null.
    public static final boolean SELF_HEALING_DRYRUN = false;
    public static final Set<Integer> SELF_HEALING_DESTINATION_BROKER_IDS = Collections.emptySet();
    public static final ReplicaMovementStrategy SELF_HEALING_REPLICA_MOVEMENT_STRATEGY = null;
    public static final Pattern SELF_HEALING_EXCLUDED_TOPICS = null;
    public static final Integer SELF_HEALING_CONCURRENT_MOVEMENTS = null;
    public static final Long SELF_HEALING_EXECUTION_PROGRESS_CHECK_INTERVAL_MS = null;
    public static final boolean SELF_HEALING_SKIP_HARD_GOAL_CHECK = false;
    public static final boolean SELF_HEALING_STOP_ONGOING_EXECUTION = false;
    public static final ModelCompletenessRequirements SELF_HEALING_MODEL_COMPLETENESS_REQUIREMENTS = null;
    public static final boolean SELF_HEALING_SKIP_URP_DEMOTION = true;
    public static final boolean SELF_HEALING_EXCLUDE_FOLLOWER_DEMOTION = true;
    public static final boolean SELF_HEALING_SKIP_RACK_AWARENESS_CHECK = false;
    public static final boolean SELF_HEALING_IS_TRIGGERED_BY_USER_REQUEST = false;

    /** Simple class names of the goals whose presence in a goal list is treated as Kafka Assigner mode. */
    private static final Set<String> KAFKA_ASSIGNER_GOALS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            KafkaAssignerEvenRackAwareGoal.class.getSimpleName(),
            KafkaAssignerDiskUsageDistributionGoal.class.getSimpleName())));

    /** Private constructor: prevents instantiation from outside this class. */
    private RunnableUtils() {
    }

    /**
     * Populates the two rack lookup maps from the nodes of the given cluster, then validates every requested
     * replication factor against the populated maps.
     *
     * @param topicsByReplicationFactor topics to change, keyed by their target replication factor.
     * @param cluster cluster whose nodes are used to fill the rack maps.
     * @param skipTopicRackAwarenessCheck if {@code true}, a replication factor larger than the number of racks is
     *                                    only logged instead of being rejected.
     * @param brokersByRack output map: rack to the ids of the nodes in that rack (entries are added in place).
     * @param rackByBroker output map: node id to its rack (entries are added in place).
     * @throws RuntimeException if a target replication factor exceeds the size of {@code rackByBroker}, or exceeds
     *                          the size of {@code brokersByRack} while the rack-awareness check is not skipped.
     */
    public static void populateRackInfoForReplicationFactorChange(Map<Short, Set<String>> topicsByReplicationFactor, Cluster cluster, boolean skipTopicRackAwarenessCheck, Map<String, List<Integer>> brokersByRack, Map<Integer, String> rackByBroker) {
        for (Node node : cluster.nodes()) {
            String rack = MonitorUtils.getRackHandleNull(node);
            // Typed computeIfAbsent: no raw ArrayList and no throwaway list for racks that are already present.
            brokersByRack.computeIfAbsent(rack, unused -> new ArrayList<>()).add(node.id());
            rackByBroker.put(node.id(), rack);
        }
        topicsByReplicationFactor.forEach((replicationFactor, topics) -> {
            if (replicationFactor > rackByBroker.size()) {
                throw new RuntimeException(String.format("Unable to change replication factor (RF) of topics %s to %d since there are only %d alive brokers in the cluster. Requested RF cannot be more than number of alive brokers.", topics, replicationFactor, rackByBroker.size()));
            }
            if (replicationFactor <= brokersByRack.size()) {
                // Enough racks for one replica per rack; nothing further to check for this entry.
                return;
            }
            if (!skipTopicRackAwarenessCheck) {
                throw new RuntimeException(String.format("Unable to change replication factor of topics %s to %d since there are only %d racks in the cluster, to skip the rack-awareness check, set %s to true in the request.", topics, replicationFactor, brokersByRack.size(), "skip_rack_awareness_check"));
            }
            LOG.info("Target replication factor for topics {} is {}, which is larger than number of racks in cluster. Rack-awareness property will be violated to add new replicas.", topics, replicationFactor);
        });
    }

    /**
     * Verifies that no topic appears under more than one target replication factor.
     *
     * @param topicsToChangeByReplicationFactor topics to change, keyed by target replication factor.
     * @throws IllegalStateException if at least one topic is listed under several replication factors.
     */
    private static void sanityCheckTargetReplicationFactorForTopic(Map<Short, Set<String>> topicsToChangeByReplicationFactor) {
        Set<String> seenTopics = new HashSet<>();
        Set<String> conflictingTopics = new HashSet<>();
        topicsToChangeByReplicationFactor.values().forEach(topicGroup -> topicGroup.forEach(topic -> {
            // add() returns false when the topic was already seen under another replication factor.
            if (!seenTopics.add(topic)) {
                conflictingTopics.add(topic);
            }
        }));
        if (conflictingTopics.isEmpty()) {
            return;
        }
        throw new IllegalStateException(String.format("Topics %s are requested with more than one target replication factor.", conflictingTopics));
    }

    /**
     * Resolves, for each requested replication factor, the topics of the cluster that match the associated pattern
     * and have at least one partition whose replica count differs from that replication factor.
     *
     * @param topicPatternByReplicationFactor topic name pattern keyed by target replication factor.
     * @param cluster cluster providing the topic names and their partitions.
     * @return topics that need a change, keyed by target replication factor; replication factors with no topic to
     *         change are omitted.
     * @throws IllegalStateException if a pattern matches no topic, if no matching topic needs a change, or if a
     *                               topic ends up under more than one target replication factor.
     */
    public static Map<Short, Set<String>> topicsForReplicationFactorChange(Map<Short, Pattern> topicPatternByReplicationFactor, Cluster cluster) {
        Map<Short, Set<String>> topicsToChangeByReplicationFactor = new HashMap<>(topicPatternByReplicationFactor.size());
        for (Map.Entry<Short, Pattern> entry : topicPatternByReplicationFactor.entrySet()) {
            short replicationFactor = entry.getKey();
            Pattern topicPattern = entry.getValue();
            Set<String> topics = cluster.topics().stream()
                    .filter(topic -> topicPattern.matcher(topic).matches())
                    .collect(Collectors.toSet());
            if (topics.isEmpty()) {
                throw new IllegalStateException(String.format("There is no topic in cluster matching pattern '%s'.", topicPattern));
            }
            // Keep only topics with at least one partition that is not already at the target replication factor.
            Set<String> topicsToChange = topics.stream()
                    .filter(topic -> cluster.partitionsForTopic(topic).stream()
                            .anyMatch(partition -> partition.replicas().length != replicationFactor))
                    .collect(Collectors.toSet());
            if (!topicsToChange.isEmpty()) {
                topicsToChangeByReplicationFactor.put(replicationFactor, topicsToChange);
            }
        }
        if (topicsToChangeByReplicationFactor.isEmpty()) {
            throw new IllegalStateException(String.format("All topics matching given pattern already have target replication factor. Requested topic pattern by replication factor: %s.", topicPatternByReplicationFactor));
        }
        RunnableUtils.sanityCheckTargetReplicationFactorForTopic(topicsToChangeByReplicationFactor);
        return topicsToChangeByReplicationFactor;
    }

    /**
     * Tells whether the given substates contain {@code ANALYZER} or {@code MONITOR}.
     *
     * @param substates substates to inspect.
     * @return {@code true} if at least one element is the analyzer or the monitor substate, {@code false} otherwise.
     */
    public static boolean shouldRefreshClusterAndGeneration(Set<CruiseControlState.SubState> substates) {
        for (CruiseControlState.SubState candidate : substates) {
            if (candidate == CruiseControlState.SubState.ANALYZER) {
                return true;
            }
            if (candidate == CruiseControlState.SubState.MONITOR) {
                return true;
            }
        }
        return false;
    }

    /**
     * Searches the cluster, topic by topic, for a partition that reports at least one offline replica.
     *
     * @param cluster cluster whose topics and partitions are scanned.
     * @return the first partition encountered with a non-empty offline replica array, or {@code null} if none.
     */
    public static PartitionInfo partitionWithOfflineReplicas(Cluster cluster) {
        for (String topicName : cluster.topics()) {
            for (PartitionInfo candidate : cluster.partitionsForTopic(topicName)) {
                boolean hasOfflineReplica = candidate.offlineReplicas().length > 0;
                if (hasOfflineReplica) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Tells whether any of the given goal names is one of the Kafka Assigner goals.
     *
     * @param goals goal names to check.
     * @return {@code true} if at least one name is contained in {@code KAFKA_ASSIGNER_GOALS}, {@code false} otherwise.
     */
    public static boolean isKafkaAssignerMode(Collection<String> goals) {
        for (String goalName : goals) {
            if (KAFKA_ASSIGNER_GOALS.contains(goalName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Rejects Kafka Assigner mode when the cluster model reports brokers having offline replicas on bad disks.
     *
     * @param goals goal names of the request; used to detect Kafka Assigner mode.
     * @param clusterModel cluster model queried for brokers having offline replicas on bad disks.
     * @throws IllegalStateException if the goals indicate Kafka Assigner mode and such brokers exist.
     */
    public static void sanityCheckBrokersHavingOfflineReplicasOnBadDisks(List<String> goals, ClusterModel clusterModel) {
        if (!RunnableUtils.isKafkaAssignerMode(goals)) {
            return;
        }
        if (clusterModel.brokersHavingOfflineReplicasOnBadDisks().isEmpty()) {
            return;
        }
        throw new IllegalStateException("Kafka Assigner mode is not supported when there are offline replicas on bad disks. Please run fix_offline_replicas before using Kafka Assigner mode.");
    }

    /**
     * If there is an ongoing execution, requests it to stop and blocks until the execution state becomes
     * {@code NO_TASK_IN_PROGRESS}. Returns immediately when there is no ongoing execution.
     *
     * <p>While waiting, the stop request is re-issued and the thread sleeps for
     * {@code kafkaCruiseControl.executionProgressCheckIntervalMs()} on every iteration.
     *
     * @param kafkaCruiseControl Cruise Control instance whose execution is stopped.
     * @param operationProgress progress tracker that receives a {@link WaitingForOngoingExecutionToStop} step.
     * @throws IllegalStateException if {@code modifyOngoingExecution(true)} returns {@code false}, or if the
     *                               calling thread is interrupted while waiting. In the latter case
     *                               {@code modifyOngoingExecution(false)} is called, the interrupt flag of the
     *                               thread is restored and the {@link InterruptedException} is attached as cause.
     */
    public static void maybeStopOngoingExecutionToModifyAndWait(KafkaCruiseControl kafkaCruiseControl, OperationProgress operationProgress) {
        if (!kafkaCruiseControl.hasOngoingExecution()) {
            LOG.info("There is already no ongoing Cruise Control execution. Skip stopping execution.");
            return;
        }
        if (!kafkaCruiseControl.modifyOngoingExecution(true)) {
            throw new IllegalStateException("Another request has asked for modifying the ongoing execution.");
        }
        LOG.info("Gracefully stopping the ongoing execution...");
        WaitingForOngoingExecutionToStop step = new WaitingForOngoingExecutionToStop();
        operationProgress.addStep(step);
        while (kafkaCruiseControl.executionState() != ExecutorState.State.NO_TASK_IN_PROGRESS) {
            try {
                kafkaCruiseControl.userTriggeredStopExecution(false);
                Thread.sleep(kafkaCruiseControl.executionProgressCheckIntervalMs());
            } catch (InterruptedException e) {
                kafkaCruiseControl.modifyOngoingExecution(false);
                // Catching InterruptedException clears the interrupt status; restore it so code further up the
                // stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for gracefully stopping the ongoing execution.", e);
            }
        }
        step.done();
    }

    /**
     * Builds the {@link OptimizationOptions} for an optimization run.
     *
     * <p>The recent-broker sets are obtained first (dropping {@code brokersToDrop} from them, see
     * {@code maybeDropFromRecentBrokers}); the excluded topics are then resolved through
     * {@code kafkaCruiseControl.excludedTopics(clusterModel, excludedTopicsPattern)}.
     *
     * @param clusterModel cluster model used to resolve the excluded topics.
     * @param isTriggeredByGoalViolation forwarded unchanged to the options.
     * @param kafkaCruiseControl Cruise Control instance providing recent brokers and excluded topics.
     * @param brokersToDrop brokers to drop from the recently removed/demoted broker sets.
     * @param dryRun forwarded to {@code maybeDropFromRecentBrokers}.
     * @param excludeRecentlyDemotedBrokers whether recently demoted brokers are excluded for leadership;
     *                                      an empty set is used otherwise.
     * @param excludeRecentlyRemovedBrokers whether recently removed brokers are excluded for replica moves;
     *                                      an empty set is used otherwise.
     * @param excludedTopicsPattern pattern passed to the excluded-topics lookup.
     * @param requestedDestinationBrokerIds forwarded unchanged to the options.
     * @param onlyMoveImmigrantReplicas forwarded unchanged to the options.
     * @return the assembled optimization options.
     */
    public static OptimizationOptions computeOptimizationOptions(ClusterModel clusterModel, boolean isTriggeredByGoalViolation, KafkaCruiseControl kafkaCruiseControl, Set<Integer> brokersToDrop, boolean dryRun, boolean excludeRecentlyDemotedBrokers, boolean excludeRecentlyRemovedBrokers, Pattern excludedTopicsPattern, Set<Integer> requestedDestinationBrokerIds, boolean onlyMoveImmigrantReplicas) {
        RecentBrokers recent = RunnableUtils.maybeDropFromRecentBrokers(kafkaCruiseControl, brokersToDrop, dryRun);

        Set<Integer> leadershipExclusions = Collections.emptySet();
        if (excludeRecentlyDemotedBrokers) {
            leadershipExclusions = recent.recentlyDemotedBrokers();
        }

        Set<Integer> replicaMoveExclusions = Collections.emptySet();
        if (excludeRecentlyRemovedBrokers) {
            replicaMoveExclusions = recent.recentlyRemovedBrokers();
        }

        Set<String> topicsToExclude = kafkaCruiseControl.excludedTopics(clusterModel, excludedTopicsPattern);
        LOG.debug("Topics excluded from partition movement: {}", topicsToExclude);
        return new OptimizationOptions(topicsToExclude,
                                       leadershipExclusions,
                                       replicaMoveExclusions,
                                       isTriggeredByGoalViolation,
                                       requestedDestinationBrokerIds,
                                       onlyMoveImmigrantReplicas);
    }

    /**
     * Returns the recently removed and recently demoted brokers without the given brokers.
     *
     * <p>In a dry run nothing is modified in Cruise Control: copies of the executor state's sets are made and the
     * given brokers are removed from the copies. Otherwise {@code dropRecentBrokers} is invoked twice on Cruise
     * Control (once with {@code true}, once with {@code false}) and the sets of the executor state obtained
     * before those calls are returned as they are.
     * NOTE(review): the boolean presumably selects removed vs. demoted brokers -- confirm against
     * {@code KafkaCruiseControl#dropRecentBrokers}.
     *
     * @param kafkaCruiseControl Cruise Control instance providing the executor state.
     * @param brokersToDrop brokers to drop from the recent broker sets.
     * @param dryRun {@code true} to leave Cruise Control untouched and work on copies.
     * @return holder of the resulting recently removed and recently demoted broker sets.
     */
    private static RecentBrokers maybeDropFromRecentBrokers(KafkaCruiseControl kafkaCruiseControl, Set<Integer> brokersToDrop, boolean dryRun) {
        // The executor state is always fetched before any drop is attempted.
        ExecutorState state = kafkaCruiseControl.executorState();
        if (dryRun) {
            Set<Integer> removedCopy = new HashSet<>(state.recentlyRemovedBrokers());
            removedCopy.removeAll(brokersToDrop);
            Set<Integer> demotedCopy = new HashSet<>(state.recentlyDemotedBrokers());
            demotedCopy.removeAll(brokersToDrop);
            return new RecentBrokers(removedCopy, demotedCopy);
        }
        kafkaCruiseControl.dropRecentBrokers(brokersToDrop, true);
        kafkaCruiseControl.dropRecentBrokers(brokersToDrop, false);
        return new RecentBrokers(state.recentlyRemovedBrokers(), state.recentlyDemotedBrokers());
    }

    /**
     * Verifies that the cluster model has something to fix: either brokers having offline replicas on bad disks,
     * or at least one dead broker that still holds replicas.
     *
     * @param clusterModel cluster model to inspect.
     * @throws IllegalStateException if neither condition holds.
     */
    public static void sanityCheckOfflineReplicaPresence(ClusterModel clusterModel) {
        if (!clusterModel.brokersHavingOfflineReplicasOnBadDisks().isEmpty()) {
            return;
        }
        for (Broker dead : clusterModel.deadBrokers()) {
            boolean holdsReplicas = !dead.replicas().isEmpty();
            if (holdsReplicas) {
                return;
            }
        }
        throw new IllegalStateException("Cluster has no offline replica on brokers " + clusterModel.brokers() + " to fix.");
    }

    /**
     * Holder of two broker id sets: the recently removed brokers and the recently demoted brokers.
     * The sets are stored and returned by reference; no copy is made.
     */
    public static class RecentBrokers {
        private final Set<Integer> _recentlyRemovedBrokers;
        private final Set<Integer> _recentlyDemotedBrokers;

        /**
         * @param recentlyRemovedBrokers recently removed brokers; must not be {@code null}.
         * @param recentlyDemotedBrokers recently demoted brokers; must not be {@code null}.
         * @throws IllegalArgumentException if either argument is {@code null}.
         */
        public RecentBrokers(Set<Integer> recentlyRemovedBrokers, Set<Integer> recentlyDemotedBrokers) {
            boolean bothPresent = recentlyRemovedBrokers != null && recentlyDemotedBrokers != null;
            if (!bothPresent) {
                throw new IllegalArgumentException("Attempt to set a null value for recent brokers.");
            }
            _recentlyRemovedBrokers = recentlyRemovedBrokers;
            _recentlyDemotedBrokers = recentlyDemotedBrokers;
        }

        /** @return the set of recently removed brokers given at construction. */
        public Set<Integer> recentlyRemovedBrokers() {
            return _recentlyRemovedBrokers;
        }

        /** @return the set of recently demoted brokers given at construction. */
        public Set<Integer> recentlyDemotedBrokers() {
            return _recentlyDemotedBrokers;
        }
    }
}

