/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskPlanner;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskState;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionTaskManager {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionTaskManager.class);
    private final Map<Integer, Integer> _inProgressInterBrokerReplicaMovementsByBrokerId = new HashMap<Integer, Integer>();
    private final Map<Integer, Integer> _inProgressIntraBrokerReplicaMovementsByBrokerId = new HashMap<Integer, Integer>();
    private final Set<TopicPartition> _inProgressPartitionsForInterBrokerMovement = new HashSet<TopicPartition>();
    private final ExecutionTaskTracker _executionTaskTracker;
    private final ExecutionTaskPlanner _executionTaskPlanner;
    private final int _defaultInterBrokerPartitionMovementConcurrency;
    private Integer _requestedInterBrokerPartitionMovementConcurrency;
    private final int _defaultIntraBrokerPartitionMovementConcurrency;
    private Integer _requestedIntraBrokerPartitionMovementConcurrency;
    private final int _defaultLeadershipMovementConcurrency;
    private final int _maxNumClusterMovementConcurrency;
    private Integer _requestedLeadershipMovementConcurrency;
    private final Set<Integer> _brokersToSkipConcurrencyCheck;
    private boolean _isKafkaAssignerMode;

    public ExecutionTaskManager(AdminClient adminClient, MetricRegistry dropwizardMetricRegistry, Time time, KafkaCruiseControlConfig config) {
        this._executionTaskTracker = new ExecutionTaskTracker(dropwizardMetricRegistry, time);
        this._executionTaskPlanner = new ExecutionTaskPlanner(adminClient, config);
        this._defaultInterBrokerPartitionMovementConcurrency = config.getInt("num.concurrent.partition.movements.per.broker");
        this._defaultIntraBrokerPartitionMovementConcurrency = config.getInt("num.concurrent.intra.broker.partition.movements");
        this._defaultLeadershipMovementConcurrency = config.getInt("num.concurrent.leader.movements");
        this._maxNumClusterMovementConcurrency = config.getInt("max.num.cluster.movements");
        this._brokersToSkipConcurrencyCheck = new HashSet<Integer>();
        this._isKafkaAssignerMode = false;
        this._requestedInterBrokerPartitionMovementConcurrency = null;
        this._requestedIntraBrokerPartitionMovementConcurrency = null;
        this._requestedLeadershipMovementConcurrency = null;
    }

    public synchronized void setRequestedInterBrokerPartitionMovementConcurrency(Integer requestedInterBrokerPartitionMovementConcurrency) {
        if (requestedInterBrokerPartitionMovementConcurrency != null && requestedInterBrokerPartitionMovementConcurrency >= this._maxNumClusterMovementConcurrency) {
            throw new IllegalArgumentException("Attempt to set inter-broker partition movement concurrency [" + requestedInterBrokerPartitionMovementConcurrency + "] to greater than or equal to the maximum number of allowed movements in cluster [" + this._maxNumClusterMovementConcurrency + "].");
        }
        this._requestedInterBrokerPartitionMovementConcurrency = requestedInterBrokerPartitionMovementConcurrency;
    }

    public synchronized void setRequestedIntraBrokerPartitionMovementConcurrency(Integer requestedIntraBrokerPartitionMovementConcurrency) {
        if (requestedIntraBrokerPartitionMovementConcurrency != null && requestedIntraBrokerPartitionMovementConcurrency >= this._maxNumClusterMovementConcurrency) {
            throw new IllegalArgumentException("Attempt to set intra-broker partition movement concurrency [" + requestedIntraBrokerPartitionMovementConcurrency + "] to greater than or equal to the maximum number of allowed movements in cluster [" + this._maxNumClusterMovementConcurrency + "].");
        }
        this._requestedIntraBrokerPartitionMovementConcurrency = requestedIntraBrokerPartitionMovementConcurrency;
    }

    public synchronized void setRequestedLeadershipMovementConcurrency(Integer requestedLeadershipMovementConcurrency) {
        if (requestedLeadershipMovementConcurrency != null && requestedLeadershipMovementConcurrency > this._maxNumClusterMovementConcurrency) {
            throw new IllegalArgumentException("Attempt to set leadership movement concurrency [" + requestedLeadershipMovementConcurrency + "] to greater than the maximum number of allowed movements in cluster [" + this._maxNumClusterMovementConcurrency + "].");
        }
        this._requestedLeadershipMovementConcurrency = requestedLeadershipMovementConcurrency;
    }

    public synchronized int interBrokerPartitionMovementConcurrency() {
        return this._requestedInterBrokerPartitionMovementConcurrency == null ? this._defaultInterBrokerPartitionMovementConcurrency : this._requestedInterBrokerPartitionMovementConcurrency;
    }

    public synchronized int intraBrokerPartitionMovementConcurrency() {
        return this._requestedIntraBrokerPartitionMovementConcurrency == null ? this._defaultIntraBrokerPartitionMovementConcurrency : this._requestedIntraBrokerPartitionMovementConcurrency;
    }

    public synchronized int leadershipMovementConcurrency() {
        return this._requestedLeadershipMovementConcurrency == null ? this._defaultLeadershipMovementConcurrency : this._requestedLeadershipMovementConcurrency;
    }

    public synchronized List<ExecutionTask> getInterBrokerReplicaMovementTasks() {
        Map<Integer, Integer> brokersReadyForReplicaMovement = this.brokersReadyForReplicaMovement(this._inProgressInterBrokerReplicaMovementsByBrokerId, this.interBrokerPartitionMovementConcurrency());
        return this._executionTaskPlanner.getInterBrokerReplicaMovementTasks(brokersReadyForReplicaMovement, this._inProgressPartitionsForInterBrokerMovement);
    }

    public synchronized List<ExecutionTask> getIntraBrokerReplicaMovementTasks() {
        Map<Integer, Integer> brokersReadyForReplicaMovement = this.brokersReadyForReplicaMovement(this._inProgressIntraBrokerReplicaMovementsByBrokerId, this.intraBrokerPartitionMovementConcurrency());
        return this._executionTaskPlanner.getIntraBrokerReplicaMovementTasks(brokersReadyForReplicaMovement);
    }

    private int unthrottledConcurrency(Set<Integer> brokersWithReplicaMoves, int throttledConcurrency) {
        int numUnthrottledBrokers = (int)brokersWithReplicaMoves.stream().filter(this._brokersToSkipConcurrencyCheck::contains).count();
        if (numUnthrottledBrokers == 0) {
            return Integer.MAX_VALUE;
        }
        int unthrottledConcurrency = this._maxNumClusterMovementConcurrency / numUnthrottledBrokers;
        LOG.debug("Unthrottled concurrency is {} for {} brokers.", (Object)unthrottledConcurrency, (Object)numUnthrottledBrokers);
        return unthrottledConcurrency;
    }

    private Map<Integer, Integer> brokersReadyForReplicaMovement(Map<Integer, Integer> inProgressReplicaMovementsByBrokerId, int throttledConcurrency) {
        HashMap<Integer, Integer> readyBrokers = new HashMap<Integer, Integer>(inProgressReplicaMovementsByBrokerId.size());
        int unthrottledConcurrency = this.unthrottledConcurrency(inProgressReplicaMovementsByBrokerId.keySet(), throttledConcurrency);
        inProgressReplicaMovementsByBrokerId.forEach((bid, inProgressReplicaMovements) -> {
            int brokerConcurrency = this._brokersToSkipConcurrencyCheck.contains(bid) ? unthrottledConcurrency : throttledConcurrency;
            readyBrokers.put((Integer)bid, Math.max(0, brokerConcurrency - inProgressReplicaMovements));
        });
        return readyBrokers;
    }

    public synchronized List<ExecutionTask> getLeadershipMovementTasks() {
        return this._executionTaskPlanner.getLeadershipMovementTasks(this.leadershipMovementConcurrency());
    }

    public synchronized void addExecutionProposals(Collection<ExecutionProposal> proposals, Collection<Integer> brokersToSkipConcurrencyCheck, Cluster cluster, ReplicaMovementStrategy replicaMovementStrategy) {
        this._executionTaskPlanner.addExecutionProposals(proposals, cluster, replicaMovementStrategy);
        for (ExecutionProposal p : proposals) {
            p.replicasToMoveBetweenDisksByBroker().keySet().forEach(broker -> this._inProgressIntraBrokerReplicaMovementsByBrokerId.putIfAbsent((Integer)broker, 0));
            this._inProgressInterBrokerReplicaMovementsByBrokerId.putIfAbsent(p.oldLeader().brokerId(), 0);
            p.replicasToAdd().forEach(r -> this._inProgressInterBrokerReplicaMovementsByBrokerId.putIfAbsent(r.brokerId(), 0));
        }
        this._executionTaskTracker.setExecutionMode(this._isKafkaAssignerMode);
        this._executionTaskTracker.addTasksToTrace(this._executionTaskPlanner.remainingInterBrokerReplicaMovements(), ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
        this._executionTaskTracker.addTasksToTrace(this._executionTaskPlanner.remainingIntraBrokerReplicaMovements(), ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION);
        this._executionTaskTracker.addTasksToTrace(this._executionTaskPlanner.remainingLeadershipMovements(), ExecutionTask.TaskType.LEADER_ACTION);
        this._brokersToSkipConcurrencyCheck.clear();
        if (brokersToSkipConcurrencyCheck != null) {
            this._brokersToSkipConcurrencyCheck.addAll(brokersToSkipConcurrencyCheck);
        }
    }

    public synchronized void setExecutionModeForTaskTracker(boolean isKafkaAssignerMode) {
        this._isKafkaAssignerMode = isKafkaAssignerMode;
    }

    public synchronized void markTasksInProgress(List<ExecutionTask> tasks) {
        for (ExecutionTask task : tasks) {
            this._executionTaskTracker.markTaskState(task, ExecutionTaskState.IN_PROGRESS);
            switch (task.type()) {
                case INTER_BROKER_REPLICA_ACTION: {
                    this._inProgressPartitionsForInterBrokerMovement.add(task.proposal().topicPartition());
                    int oldLeader = task.proposal().oldLeader().brokerId();
                    this._inProgressInterBrokerReplicaMovementsByBrokerId.put(oldLeader, this._inProgressInterBrokerReplicaMovementsByBrokerId.get(oldLeader) + 1);
                    task.proposal().replicasToAdd().forEach(r -> this._inProgressInterBrokerReplicaMovementsByBrokerId.put(r.brokerId(), this._inProgressInterBrokerReplicaMovementsByBrokerId.get(r.brokerId()) + 1));
                    break;
                }
                case INTRA_BROKER_REPLICA_ACTION: {
                    this._inProgressIntraBrokerReplicaMovementsByBrokerId.put(task.brokerId(), this._inProgressIntraBrokerReplicaMovementsByBrokerId.get(task.brokerId()) + 1);
                    break;
                }
            }
        }
    }

    public synchronized void markTaskDone(ExecutionTask task) {
        if (task.state() == ExecutionTaskState.IN_PROGRESS) {
            this._executionTaskTracker.markTaskState(task, ExecutionTaskState.COMPLETED);
            this.completeTask(task);
        } else if (task.state() == ExecutionTaskState.ABORTING) {
            this._executionTaskTracker.markTaskState(task, ExecutionTaskState.ABORTED);
            this.completeTask(task);
        }
    }

    public synchronized void markTaskAborting(ExecutionTask task) {
        if (task.state() == ExecutionTaskState.IN_PROGRESS) {
            this._executionTaskTracker.markTaskState(task, ExecutionTaskState.ABORTING);
        }
    }

    public synchronized void markTaskDead(ExecutionTask task) {
        if (task.state() != ExecutionTaskState.DEAD) {
            this._executionTaskTracker.markTaskState(task, ExecutionTaskState.DEAD);
            this.completeTask(task);
        }
    }

    private void completeTask(ExecutionTask task) {
        switch (task.type()) {
            case INTER_BROKER_REPLICA_ACTION: {
                this._inProgressPartitionsForInterBrokerMovement.remove(task.proposal().topicPartition());
                int oldLeader = task.proposal().oldLeader().brokerId();
                this._inProgressInterBrokerReplicaMovementsByBrokerId.put(oldLeader, this._inProgressInterBrokerReplicaMovementsByBrokerId.get(oldLeader) - 1);
                task.proposal().replicasToAdd().forEach(r -> this._inProgressInterBrokerReplicaMovementsByBrokerId.put(r.brokerId(), this._inProgressInterBrokerReplicaMovementsByBrokerId.get(r.brokerId()) - 1));
                break;
            }
            case INTRA_BROKER_REPLICA_ACTION: {
                this._inProgressIntraBrokerReplicaMovementsByBrokerId.put(task.brokerId(), this._inProgressIntraBrokerReplicaMovementsByBrokerId.get(task.brokerId()) - 1);
                break;
            }
        }
    }

    public synchronized int numRemainingInterBrokerPartitionMovements() {
        return this._executionTaskTracker.numRemainingInterBrokerPartitionMovements();
    }

    public synchronized long remainingInterBrokerDataToMoveInMB() {
        return this._executionTaskTracker.remainingInterBrokerDataToMoveInMB();
    }

    public synchronized int numFinishedInterBrokerPartitionMovements() {
        return this._executionTaskTracker.numFinishedInterBrokerPartitionMovements();
    }

    public synchronized long finishedInterBrokerDataMovementInMB() {
        return this._executionTaskTracker.finishedInterBrokerDataMovementInMB();
    }

    public synchronized Set<ExecutionTask> inExecutionTasks() {
        return this.inExecutionTasks(ExecutionTask.TaskType.cachedValues());
    }

    public synchronized Set<ExecutionTask> inExecutionTasks(Collection<ExecutionTask.TaskType> types) {
        return this._executionTaskTracker.inExecutionTasks(types);
    }

    public synchronized long inExecutionInterBrokerDataToMoveInMB() {
        return this._executionTaskTracker.inExecutionInterBrokerDataMovementInMB();
    }

    public synchronized int numRemainingLeadershipMovements() {
        return this._executionTaskTracker.numRemainingLeadershipMovements();
    }

    public synchronized int numFinishedLeadershipMovements() {
        return this._executionTaskTracker.numFinishedLeadershipMovements();
    }

    public synchronized int numRemainingIntraBrokerPartitionMovements() {
        return this._executionTaskTracker.numRemainingIntraBrokerPartitionMovements();
    }

    public synchronized long remainingIntraBrokerDataToMoveInMB() {
        return this._executionTaskTracker.remainingIntraBrokerDataToMoveInMB();
    }

    public synchronized int numFinishedIntraBrokerPartitionMovements() {
        return this._executionTaskTracker.numFinishedIntraBrokerPartitionMovements();
    }

    public synchronized long finishedIntraBrokerDataToMoveInMB() {
        return this._executionTaskTracker.finishedIntraBrokerDataToMoveInMB();
    }

    public long inExecutionIntraBrokerDataMovementInMB() {
        return this._executionTaskTracker.inExecutionIntraBrokerDataMovementInMB();
    }

    public synchronized void clear() {
        this._brokersToSkipConcurrencyCheck.clear();
        this._inProgressInterBrokerReplicaMovementsByBrokerId.clear();
        this._inProgressIntraBrokerReplicaMovementsByBrokerId.clear();
        this._inProgressPartitionsForInterBrokerMovement.clear();
        this._executionTaskPlanner.clear();
        this._executionTaskTracker.clear();
    }

    public synchronized void setStopRequested() {
        this._executionTaskTracker.setStopRequested();
    }

    public synchronized ExecutionTaskTracker.ExecutionTasksSummary getExecutionTasksSummary(Set<ExecutionTask.TaskType> taskTypesToGetFullList) {
        return this._executionTaskTracker.getExecutionTasksSummary(taskTypesToGetFullList);
    }
}

