/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionTaskPlanner {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionTaskPlanner.class);
    // Pending inter-broker replica movement tasks grouped by broker id. Reassigned from the replica movement
    // strategy's applyStrategy(...) result each time proposals are added; a task is removed under its source
    // broker and under each destination broker when it is handed out for execution.
    private Map<Integer, SortedSet<ExecutionTask>> _interPartMoveTaskByBrokerId = new HashMap<Integer, SortedSet<ExecutionTask>>();
    // Pending intra-broker (between-disk) replica movement tasks grouped by the broker hosting the replica.
    private Map<Integer, SortedSet<ExecutionTask>> _intraPartMoveTaskByBrokerId = new HashMap<Integer, SortedSet<ExecutionTask>>();
    // All inter-broker replica movement tasks that have not been handed out yet.
    private final Set<ExecutionTask> _remainingInterBrokerReplicaMovements = new HashSet<ExecutionTask>();
    // All intra-broker replica movement tasks that have not been handed out yet.
    private final Set<ExecutionTask> _remainingIntraBrokerReplicaMovements = new HashSet<ExecutionTask>();
    // Leadership movement tasks that have not been handed out yet, keyed by execution id.
    private final Map<Long, ExecutionTask> _remainingLeadershipMovements = new HashMap<Long, ExecutionTask>();
    // Counter used to hand out an execution id to each newly created task.
    private long _executionId = 0L;
    // Strategy applied to inter-broker tasks when the caller does not supply one; built in the constructor.
    private ReplicaMovementStrategy _defaultReplicaMovementTaskStrategy;
    // Used to look up the current log directory of replicas before planning intra-broker movements.
    private final AdminClient _adminClient;
    private final KafkaCruiseControlConfig _config;
    // Lower bound for the alerting threshold of every task (value of "task.execution.alerting.threshold.ms").
    private final long _taskExecutionAlertingThresholdMs;
    // Divisors applied to a proposal's dataToMoveInMB() to derive a per-task alerting threshold.
    private final double _interBrokerReplicaMovementRateAlertingThreshold;
    private final double _intraBrokerReplicaMovementRateAlertingThreshold;

    /**
     * Creates a planner and builds the default replica movement strategy from configuration.
     *
     * <p>The strategies listed under {@code default.replica.movement.strategies} are instantiated by class name
     * and chained in the configured order, with a {@link BaseReplicaMovementStrategy} appended at the end. When
     * the list is missing or empty, a bare {@link BaseReplicaMovementStrategy} is used.
     *
     * @param adminClient admin client used to query the current log directories of replicas.
     * @param config      configuration supplying alerting thresholds and the default strategy class names.
     * @throws RuntimeException if a configured strategy class cannot be loaded, instantiated or chained.
     */
    public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig config) {
        this._config = config;
        this._taskExecutionAlertingThresholdMs = config.getLong("task.execution.alerting.threshold.ms");
        this._interBrokerReplicaMovementRateAlertingThreshold = config.getDouble("inter.broker.replica.movement.rate.alerting.threshold");
        this._intraBrokerReplicaMovementRateAlertingThreshold = config.getDouble("intra.broker.replica.movement.rate.alerting.threshold");
        this._adminClient = adminClient;
        // Typed as List<String>: iterating a raw List as String does not compile.
        List<String> defaultReplicaMovementStrategies = config.getList("default.replica.movement.strategies");
        if (defaultReplicaMovementStrategies == null || defaultReplicaMovementStrategies.isEmpty()) {
            this._defaultReplicaMovementTaskStrategy = new BaseReplicaMovementStrategy();
            return;
        }
        ReplicaMovementStrategy chainedStrategy = null;
        for (String replicaMovementStrategy : defaultReplicaMovementStrategies) {
            try {
                // Class#newInstance() is deprecated; go through the no-arg constructor explicitly.
                ReplicaMovementStrategy next =
                    (ReplicaMovementStrategy) Class.forName(replicaMovementStrategy).getDeclaredConstructor().newInstance();
                chainedStrategy = chainedStrategy == null ? next : chainedStrategy.chain(next);
            } catch (Exception e) {
                throw new RuntimeException("Error occurred while setting up the replica movement strategy: " + replicaMovementStrategy + ".", e);
            }
        }
        // The base strategy is always the last link of the chain.
        this._defaultReplicaMovementTaskStrategy = chainedStrategy.chain(new BaseReplicaMovementStrategy());
    }

    /**
     * Turns the given proposals into execution tasks and registers them with this planner.
     *
     * <p>Inter-broker replica tasks, intra-broker replica tasks and leadership tasks are derived in that order;
     * afterwards the resulting task sets are sanity checked and, if intra-broker tasks exist, the inter-broker
     * tasks are dropped.
     *
     * @param proposals               proposals to plan.
     * @param cluster                 cluster metadata the proposals are compared against.
     * @param replicaMovementStrategy strategy for ordering inter-broker tasks; {@code null} selects the default.
     * @throws IllegalStateException if intra-broker movements are mixed with inter-broker movements that add replicas.
     */
    public void addExecutionProposals(Collection<ExecutionProposal> proposals, Cluster cluster, ReplicaMovementStrategy replicaMovementStrategy) {
        LOG.trace("Cluster state before adding proposals: {}.", cluster);

        // Task creation, one pass per task type.
        maybeAddInterBrokerReplicaMovementTasks(proposals, cluster, replicaMovementStrategy);
        maybeAddIntraBrokerReplicaMovementTasks(proposals);
        maybeAddLeaderChangeTasks(proposals, cluster);

        // Validation and clean-up of the combined task sets.
        sanityCheckExecutionTasks();
        maybeDropReplicaSwapTasks();
    }

    /**
     * Verifies that intra-broker movements are not combined with inter-broker movements that add replicas.
     *
     * @throws IllegalStateException if at least one intra-broker task is pending and some pending inter-broker
     *                               task has a non-empty set of replicas to add.
     */
    private void sanityCheckExecutionTasks() {
        if (_remainingIntraBrokerReplicaMovements.isEmpty()) {
            return;
        }
        boolean anyReplicaAdded = _remainingInterBrokerReplicaMovements.stream()
            .anyMatch(pending -> !pending.proposal().replicasToAdd().isEmpty());
        if (anyReplicaAdded) {
            throw new IllegalStateException("Intra-broker partition movement should not mingle with inter-broker partition movement.");
        }
    }

    /**
     * Discards every pending inter-broker replica movement task when at least one intra-broker task is pending;
     * otherwise does nothing.
     */
    private void maybeDropReplicaSwapTasks() {
        if (_remainingIntraBrokerReplicaMovements.isEmpty()) {
            return;
        }
        _interPartMoveTaskByBrokerId.clear();
        _remainingInterBrokerReplicaMovements.clear();
    }

    /**
     * Creates an inter-broker replica movement task for every proposal whose partition exists in the cluster and
     * whose inter-broker movement is not already completed, then regroups all pending inter-broker tasks by broker
     * using the movement strategy.
     *
     * @param proposals               proposals to inspect.
     * @param cluster                 cluster metadata used to look up the current partition state.
     * @param replicaMovementStrategy strategy to apply (chained with {@link BaseReplicaMovementStrategy});
     *                                {@code null} selects the default strategy built in the constructor.
     */
    private void maybeAddInterBrokerReplicaMovementTasks(Collection<ExecutionProposal> proposals, Cluster cluster, ReplicaMovementStrategy replicaMovementStrategy) {
        for (ExecutionProposal proposal : proposals) {
            TopicPartition tp = proposal.topicPartition();
            PartitionInfo partitionInfo = cluster.partition(tp);
            if (partitionInfo == null) {
                LOG.trace("Ignored the attempt to move non-existing partition for topic partition: {}", tp);
                continue;
            }
            if (proposal.isInterBrokerMovementCompleted(partitionInfo)) {
                continue;
            }
            // The id must be captured from the counter; it was previously left unassigned before use.
            long replicaActionExecutionId = this._executionId++;
            // Alert after the time implied by the configured movement rate, but never before the global threshold.
            long executionAlertingThresholdMs = Math.max(
                Math.round((double) proposal.dataToMoveInMB() / this._interBrokerReplicaMovementRateAlertingThreshold),
                this._taskExecutionAlertingThresholdMs);
            ExecutionTask executionTask = new ExecutionTask(replicaActionExecutionId, proposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION, executionAlertingThresholdMs);
            this._remainingInterBrokerReplicaMovements.add(executionTask);
            LOG.trace("Added action {} as replica proposal {}", replicaActionExecutionId, proposal);
        }
        ReplicaMovementStrategy strategy = replicaMovementStrategy == null
            ? this._defaultReplicaMovementTaskStrategy
            : replicaMovementStrategy.chain(new BaseReplicaMovementStrategy());
        this._interPartMoveTaskByBrokerId = strategy.applyStrategy(this._remainingInterBrokerReplicaMovements, cluster);
    }

    /**
     * Creates an intra-broker (between-disk) replica movement task for every replica whose current log directory,
     * as reported by the admin client, differs from the log directory requested by its proposal.
     *
     * <p>Replicas whose log directory lookup fails or times out are skipped with a warning. If the calling thread
     * is interrupted while waiting, the interrupt flag is restored and the remaining lookups are abandoned; tasks
     * are still created for the replicas already resolved.
     *
     * @param proposals proposals to inspect.
     */
    private void maybeAddIntraBrokerReplicaMovementTasks(Collection<ExecutionProposal> proposals) {
        Set<TopicPartitionReplica> replicasToCheckLogdir = new HashSet<>();
        for (ExecutionProposal proposal : proposals) {
            proposal.replicasToMoveBetweenDisksByBroker().keySet().forEach(broker ->
                replicasToCheckLogdir.add(new TopicPartitionReplica(proposal.topic(), proposal.partitionId(), broker.intValue())));
        }
        if (replicasToCheckLogdir.isEmpty()) {
            return;
        }

        Map<TopicPartitionReplica, String> currentLogdirByReplica = new HashMap<>(replicasToCheckLogdir.size());
        Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> logDirsByReplicas =
            this._adminClient.describeReplicaLogDirs(replicasToCheckLogdir).values();
        // The timeout is the same for every lookup, so read it once.
        long logdirResponseTimeoutMs = this._config.getLong("logdir.response.timeout.ms");
        for (Map.Entry<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> entry : logDirsByReplicas.entrySet()) {
            try {
                DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = entry.getValue().get(logdirResponseTimeoutMs, TimeUnit.MILLISECONDS);
                currentLogdirByReplica.put(entry.getKey(), info.getCurrentReplicaLogDir());
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop blocking on further futures.
                Thread.currentThread().interrupt();
                LOG.warn("Encounter exception {} when fetching logdir information for replica {}.", e.getMessage(), entry.getKey());
                break;
            } catch (ExecutionException | TimeoutException e) {
                LOG.warn("Encounter exception {} when fetching logdir information for replica {}.", e.getMessage(), entry.getKey());
            }
        }

        for (ExecutionProposal proposal : proposals) {
            proposal.replicasToMoveBetweenDisksByBroker().values().forEach(r -> {
                String currentLogdir = currentLogdirByReplica.get(new TopicPartitionReplica(proposal.topic(), proposal.partitionId(), r.brokerId().intValue()));
                // A missing entry means the lookup failed; an equal logdir means there is nothing to move.
                if (currentLogdir == null || currentLogdir.equals(r.logdir())) {
                    return;
                }
                long replicaActionExecutionId = this._executionId++;
                long executionAlertingThresholdMs = Math.max(
                    Math.round((double) proposal.dataToMoveInMB() / this._intraBrokerReplicaMovementRateAlertingThreshold),
                    this._taskExecutionAlertingThresholdMs);
                ExecutionTask task = new ExecutionTask(replicaActionExecutionId, proposal, r.brokerId(), ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION, executionAlertingThresholdMs);
                this._intraPartMoveTaskByBrokerId.computeIfAbsent(r.brokerId(), brokerId -> new TreeSet<>()).add(task);
                this._remainingIntraBrokerReplicaMovements.add(task);
            });
        }
    }

    /**
     * Creates a leadership movement task for every proposal that has a leader action, whose partition currently
     * has a known leader, and whose current leader differs from the proposed new leader.
     *
     * @param proposals proposals to inspect.
     * @param cluster   cluster metadata used to look up the current leader of each partition.
     */
    private void maybeAddLeaderChangeTasks(Collection<ExecutionProposal> proposals, Cluster cluster) {
        for (ExecutionProposal proposal : proposals) {
            if (!proposal.hasLeaderAction()) {
                continue;
            }
            Node currentLeader = cluster.leaderFor(proposal.topicPartition());
            if (currentLeader == null || currentLeader.id() == proposal.newLeader().brokerId().intValue()) {
                continue;
            }
            // The id must be captured from the counter; it was previously left unassigned before use.
            long leaderActionExecutionId = this._executionId++;
            ExecutionTask leaderActionTask = new ExecutionTask(leaderActionExecutionId, proposal, ExecutionTask.TaskType.LEADER_ACTION, this._taskExecutionAlertingThresholdMs);
            this._remainingLeadershipMovements.put(leaderActionExecutionId, leaderActionTask);
            LOG.trace("Added action {} as leader proposal {}", leaderActionExecutionId, proposal);
        }
    }

    /**
     * @return the planner's own set of inter-broker replica movement tasks that have not been handed out yet
     *         (the internal set itself, not a copy).
     */
    public Set<ExecutionTask> remainingInterBrokerReplicaMovements() {
        return _remainingInterBrokerReplicaMovements;
    }

    /**
     * @return the planner's own set of intra-broker replica movement tasks that have not been handed out yet
     *         (the internal set itself, not a copy).
     */
    public Set<ExecutionTask> remainingIntraBrokerReplicaMovements() {
        return _remainingIntraBrokerReplicaMovements;
    }

    /**
     * @return a view of the leadership movement tasks that have not been handed out yet, backed by the
     *         planner's internal map.
     */
    public Collection<ExecutionTask> remainingLeadershipMovements() {
        Collection<ExecutionTask> pendingLeadershipTasks = _remainingLeadershipMovements.values();
        return pendingLeadershipTasks;
    }

    /**
     * Hands out up to {@code numTasks} pending leadership movement tasks and removes them from the planner.
     *
     * @param numTasks maximum number of tasks to return; a non-positive value yields an empty list.
     * @return the tasks taken, in the iteration order of the pending task map.
     */
    public List<ExecutionTask> getLeadershipMovementTasks(int numTasks) {
        List<ExecutionTask> taken = new ArrayList<>();
        Iterator<ExecutionTask> pending = _remainingLeadershipMovements.values().iterator();
        while (taken.size() < numTasks && pending.hasNext()) {
            ExecutionTask next = pending.next();
            taken.add(next);
            // Removing through the iterator also drops the entry from the backing map.
            pending.remove();
        }
        return taken;
    }

    /**
     * Selects pending inter-broker replica movement tasks that can be started now and removes them from the planner.
     *
     * <p>Selection proceeds in rounds. Within a round, each broker (as source or destination) takes part in at
     * most one newly selected task; rounds repeat until a full pass over {@code readyBrokers} selects nothing.
     * A task is selected only if its source broker and all destination brokers have a positive count in
     * {@code readyBrokers} and its partition is neither in {@code inProgressPartitions} nor already selected by
     * this call.
     *
     * @param readyBrokers         broker id to number of additional movements the broker may take part in.
     *                             This map is modified: the counts of the source and destination brokers are
     *                             decremented for every selected task.
     * @param inProgressPartitions partitions that must not be selected.
     * @return the selected tasks, in selection order.
     */
    public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integer> readyBrokers, Set<TopicPartition> inProgressPartitions) {
        LOG.trace("Getting inter-broker replica movement tasks for brokers with concurrency {}", readyBrokers);
        ArrayList<ExecutionTask> executableReplicaMovements = new ArrayList<ExecutionTask>();
        boolean newTaskAdded = true;
        // Brokers that already received a task in the current round (reset at the start of every round).
        HashSet<Integer> brokerInvolved = new HashSet<Integer>();
        // Partitions selected by this call, across all rounds.
        HashSet<TopicPartition> partitionsInvolved = new HashSet<TopicPartition>();
        while (newTaskAdded) {
            newTaskAdded = false;
            brokerInvolved.clear();
            block1: for (Map.Entry<Integer, Integer> brokerEntry : readyBrokers.entrySet()) {
                int brokerId = brokerEntry.getKey();
                if (brokerInvolved.contains(brokerId)) continue;
                SortedSet<ExecutionTask> proposalsForBroker = this._interPartMoveTaskByBrokerId.get(brokerId);
                LOG.trace("Execution task for broker {} are {}", (Object)brokerId, proposalsForBroker);
                if (proposalsForBroker == null) continue;
                for (ExecutionTask task : proposalsForBroker) {
                    int sourceBroker = task.proposal().oldLeader().brokerId();
                    Set<Integer> destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
                    // Skip tasks touching a broker that already got a task in this round.
                    if (brokerInvolved.contains(sourceBroker) || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) continue;
                    TopicPartition tp = task.proposal().topicPartition();
                    // NOTE(review): isExecutableProposal unboxes readyBrokers.get(...) for the source and every
                    // destination broker, so a broker missing from readyBrokers would raise a
                    // NullPointerException here - confirm callers always pass every involved broker.
                    if (!this.isExecutableProposal(task.proposal(), readyBrokers) || inProgressPartitions.contains(tp) || partitionsInvolved.contains(tp)) continue;
                    partitionsInvolved.add(tp);
                    executableReplicaMovements.add(task);
                    brokerInvolved.add(sourceBroker);
                    brokerInvolved.addAll(destinationBrokers);
                    // Removes the task from the per-broker sets of its source and destination brokers, which may
                    // include the set currently being iterated; the labeled continue below leaves this inner
                    // loop right away, so that iterator is not advanced after the removal.
                    this.removeInterBrokerReplicaActionForExecution(task);
                    // Only values of existing keys are replaced while readyBrokers is being iterated.
                    readyBrokers.put(sourceBroker, readyBrokers.get(sourceBroker) - 1);
                    for (int broker : destinationBrokers) {
                        readyBrokers.put(broker, readyBrokers.get(broker) - 1);
                    }
                    newTaskAdded = true;
                    LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", new Object[]{task, brokerId, readyBrokers});
                    // One task per broker per round: move on to the next broker.
                    continue block1;
                }
            }
        }
        return executableReplicaMovements;
    }

    /**
     * Hands out pending intra-broker replica movement tasks and removes them from the planner.
     *
     * @param readyBrokers broker id to maximum number of tasks to take for that broker.
     * @return for every listed broker that has pending tasks, up to the given number of its tasks, taken in the
     *         order of the broker's sorted task set.
     */
    public List<ExecutionTask> getIntraBrokerReplicaMovementTasks(Map<Integer, Integer> readyBrokers) {
        LOG.trace("Getting intra-broker replica movement tasks for brokers with concurrency {}", readyBrokers);
        List<ExecutionTask> selected = new ArrayList<>();
        for (Map.Entry<Integer, Integer> slotsByBroker : readyBrokers.entrySet()) {
            int brokerId = slotsByBroker.getKey();
            int limit = slotsByBroker.getValue();
            if (!_intraPartMoveTaskByBrokerId.containsKey(brokerId)) {
                continue;
            }
            Iterator<ExecutionTask> pendingForBroker = _intraPartMoveTaskByBrokerId.get(brokerId).iterator();
            for (int taken = 0; taken < limit && pendingForBroker.hasNext(); taken++) {
                ExecutionTask next = pendingForBroker.next();
                selected.add(next);
                // Drop the task from both the per-broker set and the overall pending set.
                pendingForBroker.remove();
                _remainingIntraBrokerReplicaMovements.remove(next);
            }
        }
        return selected;
    }

    /**
     * Discards every pending task of every type. The execution id counter is left untouched.
     */
    public void clear() {
        // Per-broker groupings.
        _interPartMoveTaskByBrokerId.clear();
        _intraPartMoveTaskByBrokerId.clear();
        // Flat pending collections.
        _remainingInterBrokerReplicaMovements.clear();
        _remainingIntraBrokerReplicaMovements.clear();
        _remainingLeadershipMovements.clear();
    }

    /**
     * Checks whether every broker involved in the proposal still has capacity.
     *
     * @param proposal     proposal to check.
     * @param readyBrokers broker id to remaining number of movements the broker may take part in.
     * @return {@code true} if the old leader's broker and every broker receiving a new replica have a positive
     *         count in {@code readyBrokers}; {@code false} otherwise.
     */
    private boolean isExecutableProposal(ExecutionProposal proposal, Map<Integer, Integer> readyBrokers) {
        boolean sourceHasCapacity = readyBrokers.get(proposal.oldLeader().brokerId()) > 0;
        return sourceHasCapacity
            && proposal.replicasToAdd().stream().allMatch(destination -> readyBrokers.get(destination.brokerId()) > 0);
    }

    /**
     * Removes the given inter-broker task from the planner: from the per-broker set of the old leader's broker,
     * from the per-broker set of every broker receiving a new replica, and from the overall pending set.
     *
     * @param task task that is about to be executed.
     */
    private void removeInterBrokerReplicaActionForExecution(ExecutionTask task) {
        ExecutionProposal proposal = task.proposal();
        _interPartMoveTaskByBrokerId.get(proposal.oldLeader().brokerId()).remove(task);
        proposal.replicasToAdd().forEach(
            destination -> _interPartMoveTaskByBrokerId.get(destination.brokerId()).remove(task));
        _remainingInterBrokerReplicaMovements.remove(task);
    }
}

