package com.linkedin.kafka.cruisecontrol.executor;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorManager;
import com.linkedin.kafka.cruisecontrol.detector.notifier.SlackSelfHealingNotifier;
import com.linkedin.kafka.cruisecontrol.exception.OngoingExecutionException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor.class */
public class Executor {
    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
    private static final Logger OPERATION_LOG = LoggerFactory.getLogger(KafkaCruiseControlUtils.OPERATION_LOGGER);
    private static final String ZK_EXECUTOR_METRIC_GROUP = "CruiseControlExecutor";
    private static final String ZK_EXECUTOR_METRIC_TYPE = "Executor";
    private final ExecutionTaskManager _executionTaskManager;
    private final MetadataClient _metadataClient;
    private final long _defaultExecutionProgressCheckIntervalMs;
    private Long _requestedExecutionProgressCheckIntervalMs;
    private final ExecutorService _proposalExecutor;
    private final KafkaZkClient _kafkaZkClient;
    private final AdminClient _adminClient;
    private final double _leaderMovementTimeoutMs;
    private static final int NO_STOP_EXECUTION = 0;
    private static final int STOP_EXECUTION = 1;
    private static final int FORCE_STOP_EXECUTION = 2;
    private final AtomicInteger _stopSignal;
    private final Time _time;
    private volatile boolean _hasOngoingExecution;
    private volatile ExecutorState _executorState;
    private volatile String _uuid;
    private volatile Supplier<String> _reasonSupplier;
    private final ExecutorNotifier _executorNotifier;
    private final AtomicInteger _numExecutionStopped;
    private final AtomicInteger _numExecutionStoppedByUser;
    private final AtomicBoolean _executionStoppedByUser;
    private final AtomicBoolean _ongoingExecutionIsBeingModified;
    private final AtomicInteger _numExecutionStartedInKafkaAssignerMode;
    private final AtomicInteger _numExecutionStartedInNonKafkaAssignerMode;
    private volatile boolean _isKafkaAssignerMode;
    private volatile boolean _skipInterBrokerReplicaConcurrencyAdjustment;
    private final long _demotionHistoryRetentionTimeMs;
    private final long _removalHistoryRetentionTimeMs;
    private final ConcurrentMap<Integer, Long> _latestDemoteStartTimeMsByBrokerId;
    private final ConcurrentMap<Integer, Long> _latestRemoveStartTimeMsByBrokerId;
    private final ScheduledExecutorService _executionHistoryScannerExecutor;
    private UserTaskManager _userTaskManager;
    private final AnomalyDetectorManager _anomalyDetectorManager;
    private final ConcurrencyAdjuster _concurrencyAdjuster;
    private final ScheduledExecutorService _concurrencyAdjusterExecutor;
    private final ConcurrentMap<ConcurrencyType, Boolean> _concurrencyAdjusterEnabled;
    private final long _minExecutionProgressCheckIntervalMs;
    public final long _slowTaskAlertingBackoffTimeMs;
    private final KafkaCruiseControlConfig _config;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ConcurrencyAdjuster.class */
    public class ConcurrencyAdjuster implements Runnable {
        private LoadMonitor _loadMonitor = null;

        ConcurrencyAdjuster() {
        }

        public synchronized void initAdjustment(LoadMonitor loadMonitor, Integer num, Integer num2) {
            this._loadMonitor = loadMonitor;
            Executor.this.setRequestedInterBrokerPartitionMovementConcurrency(num);
            Executor.this.setRequestedLeadershipMovementConcurrency(num2);
        }

        private boolean canRefreshConcurrency(ConcurrencyType concurrencyType) {
            if (!Executor.this._concurrencyAdjusterEnabled.get(concurrencyType).booleanValue() || this._loadMonitor == null) {
                return false;
            }
            switch (concurrencyType) {
                case LEADERSHIP:
                    return Executor.this._executorState.state() == ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS;
                case INTER_BROKER_REPLICA:
                    return Executor.this._executorState.state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS && !Executor.this._skipInterBrokerReplicaConcurrencyAdjustment;
                default:
                    throw new IllegalArgumentException("Unsupported concurrency type " + concurrencyType + " is provided.");
            }
        }

        private synchronized void refreshInterBrokerReplicaConcurrency() {
            Integer recommendedConcurrency;
            if (!canRefreshConcurrency(ConcurrencyType.INTER_BROKER_REPLICA) || (recommendedConcurrency = ExecutionUtils.recommendedConcurrency(this._loadMonitor.currentBrokerMetricValues(), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), ConcurrencyType.INTER_BROKER_REPLICA)) == null) {
                return;
            }
            Executor.this.setRequestedInterBrokerPartitionMovementConcurrency(recommendedConcurrency);
        }

        private synchronized void refreshLeadershipConcurrency() {
            Integer recommendedConcurrency;
            if (!canRefreshConcurrency(ConcurrencyType.LEADERSHIP) || (recommendedConcurrency = ExecutionUtils.recommendedConcurrency(this._loadMonitor.currentBrokerMetricValues(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), ConcurrencyType.LEADERSHIP)) == null) {
                return;
            }
            Executor.this.setRequestedLeadershipMovementConcurrency(recommendedConcurrency);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                refreshInterBrokerReplicaConcurrency();
                refreshLeadershipConcurrency();
            } catch (Throwable th) {
                Executor.LOG.warn("Received exception when trying to adjust reassignment concurrency.", th);
            }
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ExecutionHistoryScanner.class */
    private class ExecutionHistoryScanner implements Runnable {
        private ExecutionHistoryScanner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Executor.this.removeExpiredDemotionHistory();
                Executor.this.removeExpiredRemovalHistory();
            } catch (Throwable th) {
                Executor.LOG.warn("Received exception when trying to expire execution history.", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ProposalExecutionRunnable.class */
    public class ProposalExecutionRunnable implements Runnable {
        private final LoadMonitor _loadMonitor;
        private final Set<Integer> _recentlyDemotedBrokers;
        private final Set<Integer> _recentlyRemovedBrokers;
        private final Long _replicationThrottle;
        private Throwable _executionException = null;
        private final boolean _isTriggeredByUserRequest;
        private long _lastSlowTaskReportingTimeMs;
        private static final boolean FORCE_PAUSE_SAMPLING = true;

        ProposalExecutionRunnable(LoadMonitor loadMonitor, Collection<Integer> collection, Collection<Integer> collection2, Long l, boolean z) {
            this._loadMonitor = loadMonitor;
            if (z && Executor.this._userTaskManager == null) {
                Executor.this.processExecuteProposalsFailure();
                Executor.this._hasOngoingExecution = false;
                Executor.this._stopSignal.set(0);
                Executor.this._executionStoppedByUser.set(false);
                Executor.LOG.error("Failed to initialize proposal execution.");
                throw new IllegalStateException("User task manager cannot be null.");
            }
            if (collection != null) {
                collection.forEach(num -> {
                    Long l2 = Executor.this._latestDemoteStartTimeMsByBrokerId.get(num);
                    if (l2 == null || l2.longValue() != 0) {
                        Executor.this._latestDemoteStartTimeMsByBrokerId.put(num, Long.valueOf(Executor.this._time.milliseconds()));
                    }
                });
            }
            if (collection2 != null) {
                collection2.forEach(num2 -> {
                    Long l2 = Executor.this._latestRemoveStartTimeMsByBrokerId.get(num2);
                    if (l2 == null || l2.longValue() != 0) {
                        Executor.this._latestRemoveStartTimeMsByBrokerId.put(num2, Long.valueOf(Executor.this._time.milliseconds()));
                    }
                });
            }
            this._recentlyDemotedBrokers = Executor.this.recentlyDemotedBrokers();
            this._recentlyRemovedBrokers = Executor.this.recentlyRemovedBrokers();
            this._replicationThrottle = l;
            this._isTriggeredByUserRequest = z;
            this._lastSlowTaskReportingTimeMs = -1L;
        }

        @Override // java.lang.Runnable
        public void run() {
            Executor.LOG.info("Starting executing balancing proposals.");
            execute(initExecution());
            Executor.LOG.info("Execution finished.");
        }

        private UserTaskManager.UserTaskInfo initExecution() {
            UserTaskManager.UserTaskInfo userTaskInfo = null;
            if (this._isTriggeredByUserRequest) {
                userTaskInfo = Executor.this._userTaskManager.markTaskExecutionBegan(Executor.this._uuid);
            }
            String str = Executor.this._reasonSupplier.get();
            Executor.this._executorState = ExecutorState.executionStarting(Executor.this._uuid, str, this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
            Executor.OPERATION_LOG.info("Task [{}] execution starts. The reason of execution is {}.", Executor.this._uuid, str);
            Executor.this._ongoingExecutionIsBeingModified.set(false);
            return userTaskInfo;
        }

        private void adjustSamplingModeBeforeExecution() throws InterruptedException {
            while (this._loadMonitor.samplingMode() != MetricSampler.SamplingMode.BROKER_METRICS_ONLY) {
                try {
                    this._loadMonitor.pauseMetricSampling(String.format("Paused-By-Cruise-Control-Before-Starting-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate()), true);
                    this._loadMonitor.setSamplingMode(MetricSampler.SamplingMode.BROKER_METRICS_ONLY);
                    break;
                } catch (IllegalStateException e) {
                    Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
                    Executor.LOG.debug("Waiting for the load monitor to be ready to adjust sampling mode.", e);
                }
            }
            this._loadMonitor.resumeMetricSampling(String.format("Resumed-By-Cruise-Control-Before-Starting-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate()));
        }

        private void execute(UserTaskManager.UserTaskInfo userTaskInfo) {
            try {
                adjustSamplingModeBeforeExecution();
                if (Executor.this._executorState.state() == ExecutorState.State.STARTING_EXECUTION) {
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    interBrokerMoveReplicas();
                    updateOngoingExecutionState();
                }
                if (Executor.this._executorState.state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS) {
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    intraBrokerMoveReplicas();
                    updateOngoingExecutionState();
                }
                boolean z = false;
                if (Executor.this._executorState.state() == ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS) {
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    z = moveLeaderships();
                    updateOngoingExecutionState();
                }
                if (Executor.this._executorState.state() == ExecutorState.State.STOPPING_EXECUTION && Executor.this._stopSignal.get() == 2 && z) {
                    ExecutionUtils.deleteZNodesToForceStopLeadershipMoves(Executor.this._kafkaZkClient);
                }
            } catch (Throwable th) {
                Executor.LOG.error("Executor got exception during execution", th);
                this._executionException = th;
            } finally {
                notifyFinishedTask(userTaskInfo);
                clearCompletedExecution();
            }
        }

        private void notifyFinishedTask(UserTaskManager.UserTaskInfo userTaskInfo) {
            if (userTaskInfo != null) {
                Executor.this._userTaskManager.markTaskExecutionFinished(Executor.this._uuid, Executor.this._executorState.state() == ExecutorState.State.STOPPING_EXECUTION || this._executionException != null);
            } else {
                Executor.this._anomalyDetectorManager.markSelfHealingFinished(Executor.this._uuid);
            }
            Object[] objArr = new Object[2];
            objArr[0] = Executor.this._uuid;
            objArr[1] = userTaskInfo != null ? "user" + userTaskInfo.requestUrl() : "self-healing";
            String format = String.format("Task [%s] %s execution is ", objArr);
            if (Executor.this._executorState.state() == ExecutorState.State.STOPPING_EXECUTION) {
                Object[] objArr2 = new Object[2];
                objArr2[0] = format;
                objArr2[1] = Executor.this._executionStoppedByUser.get() ? "user" : SlackSelfHealingNotifier.DEFAULT_SLACK_SELF_HEALING_NOTIFIER_USER;
                notifyExecutionFinished(String.format("%sstopped by %s.", objArr2), true);
                return;
            }
            if (this._executionException != null) {
                notifyExecutionFinished(String.format("%sinterrupted with exception %s.", format, this._executionException.getMessage()), true);
            } else {
                notifyExecutionFinished(String.format("%sfinished.", format), false);
            }
        }

        private void notifyExecutionFinished(String str, boolean z) {
            if (z) {
                Executor.this._executorNotifier.sendAlert(str);
                Executor.OPERATION_LOG.warn(str);
            } else {
                Executor.this._executorNotifier.sendNotification(str);
                Executor.OPERATION_LOG.info(str);
            }
        }

        private void clearCompletedExecution() {
            Executor.this._executionTaskManager.clear();
            Executor.this._uuid = null;
            Executor.this._reasonSupplier = null;
            Executor.this._executorState = ExecutorState.noTaskInProgress(this._recentlyDemotedBrokers, this._recentlyRemovedBrokers);
            Executor.this._hasOngoingExecution = false;
            Executor.this._stopSignal.set(0);
            Executor.this._executionStoppedByUser.set(false);
            this._loadMonitor.setSamplingMode(MetricSampler.SamplingMode.ALL);
        }

        private void updateOngoingExecutionState() {
            if (Executor.this._stopSignal.get() != 0) {
                Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.STOPPING_EXECUTION, Executor.this._executionTaskManager.getExecutionTasksSummary(new HashSet(ExecutionTask.TaskType.cachedValues())), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                return;
            }
            switch (Executor.this._executorState.state()) {
                case LEADER_MOVEMENT_TASK_IN_PROGRESS:
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    return;
                case INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS:
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    return;
                case INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS:
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    return;
                default:
                    throw new IllegalStateException("Unexpected ongoing execution state " + Executor.this._executorState.state());
            }
        }

        private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
            ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(Executor.this._kafkaZkClient, this._replicationThrottle);
            int numRemainingInterBrokerPartitionMovements = Executor.this._executionTaskManager.numRemainingInterBrokerPartitionMovements();
            long remainingInterBrokerDataToMoveInMB = Executor.this._executionTaskManager.remainingInterBrokerDataToMoveInMB();
            Executor.LOG.info("Starting {} inter-broker partition movements.", Integer.valueOf(numRemainingInterBrokerPartitionMovements));
            int i = numRemainingInterBrokerPartitionMovements;
            while (true) {
                if ((i > 0 || !Executor.this.inExecutionTasks().isEmpty()) && Executor.this._stopSignal.get() == 0) {
                    List<ExecutionTask> interBrokerReplicaMovementTasks = Executor.this._executionTaskManager.getInterBrokerReplicaMovementTasks();
                    Executor.LOG.info("Executor will execute {} task(s)", Integer.valueOf(interBrokerReplicaMovementTasks.size()));
                    AlterPartitionReassignmentsResult alterPartitionReassignmentsResult = null;
                    if (!interBrokerReplicaMovementTasks.isEmpty()) {
                        replicationThrottleHelper.setThrottles((List) interBrokerReplicaMovementTasks.stream().map((v0) -> {
                            return v0.proposal();
                        }).collect(Collectors.toList()));
                        Executor.this._executionTaskManager.markTasksInProgress(interBrokerReplicaMovementTasks);
                        alterPartitionReassignmentsResult = ExecutionUtils.submitReplicaReassignmentTasks(Executor.this._adminClient, interBrokerReplicaMovementTasks);
                    }
                    List<ExecutionTask> waitForInterBrokerReplicaTasksToFinish = waitForInterBrokerReplicaTasksToFinish(alterPartitionReassignmentsResult);
                    i = Executor.this._executionTaskManager.numRemainingInterBrokerPartitionMovements();
                    int numFinishedInterBrokerPartitionMovements = Executor.this._executionTaskManager.numFinishedInterBrokerPartitionMovements();
                    long finishedInterBrokerDataMovementInMB = Executor.this._executionTaskManager.finishedInterBrokerDataMovementInMB();
                    Logger logger = Executor.LOG;
                    Object[] objArr = new Object[6];
                    objArr[0] = Integer.valueOf(numFinishedInterBrokerPartitionMovements);
                    objArr[1] = Integer.valueOf(numRemainingInterBrokerPartitionMovements);
                    objArr[2] = String.format("%.2f", Double.valueOf((numFinishedInterBrokerPartitionMovements * 100.0d) / numRemainingInterBrokerPartitionMovements));
                    objArr[3] = Long.valueOf(finishedInterBrokerDataMovementInMB);
                    objArr[4] = Long.valueOf(remainingInterBrokerDataToMoveInMB);
                    objArr[5] = remainingInterBrokerDataToMoveInMB == 0 ? 100 : String.format("%.2f", Double.valueOf((finishedInterBrokerDataMovementInMB * 100.0d) / remainingInterBrokerDataToMoveInMB));
                    logger.info("{}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", objArr);
                    replicationThrottleHelper.clearThrottles(waitForInterBrokerReplicaTasksToFinish, (List) interBrokerReplicaMovementTasks.stream().filter(executionTask -> {
                        return executionTask.state() == ExecutionTaskState.IN_PROGRESS;
                    }).collect(Collectors.toList()));
                }
            }
            if (Executor.this._stopSignal.get() == 0) {
                Executor.LOG.info("Inter-broker partition movements finished.");
                return;
            }
            ExecutionTaskTracker.ExecutionTasksSummary executionTasksSummary = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
            Map<ExecutionTaskState, Integer> map = executionTasksSummary.taskStat().get(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
            Executor.LOG.info("Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker partition movement {} tasks cancelled; for leadership movements {} task cancelled.", new Object[]{map.get(ExecutionTaskState.PENDING), map.get(ExecutionTaskState.IN_PROGRESS), map.get(ExecutionTaskState.ABORTING), map.get(ExecutionTaskState.ABORTED), map.get(ExecutionTaskState.DEAD), map.get(ExecutionTaskState.COMPLETED), Long.valueOf(executionTasksSummary.remainingInterBrokerDataToMoveInMB()), executionTasksSummary.taskStat().get(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION).get(ExecutionTaskState.PENDING), executionTasksSummary.taskStat().get(ExecutionTask.TaskType.LEADER_ACTION).get(ExecutionTaskState.PENDING)});
        }

        private void intraBrokerMoveReplicas() {
            int numRemainingIntraBrokerPartitionMovements = Executor.this._executionTaskManager.numRemainingIntraBrokerPartitionMovements();
            long remainingIntraBrokerDataToMoveInMB = Executor.this._executionTaskManager.remainingIntraBrokerDataToMoveInMB();
            Executor.LOG.info("Starting {} intra-broker partition movements.", Integer.valueOf(numRemainingIntraBrokerPartitionMovements));
            int i = numRemainingIntraBrokerPartitionMovements;
            while (true) {
                if ((i > 0 || !Executor.this.inExecutionTasks().isEmpty()) && Executor.this._stopSignal.get() == 0) {
                    List<ExecutionTask> intraBrokerReplicaMovementTasks = Executor.this._executionTaskManager.getIntraBrokerReplicaMovementTasks();
                    Executor.LOG.info("Executor will execute {} task(s)", Integer.valueOf(intraBrokerReplicaMovementTasks.size()));
                    if (!intraBrokerReplicaMovementTasks.isEmpty()) {
                        Executor.this._executionTaskManager.markTasksInProgress(intraBrokerReplicaMovementTasks);
                        ExecutorAdminUtils.executeIntraBrokerReplicaMovements(intraBrokerReplicaMovementTasks, Executor.this._adminClient, Executor.this._executionTaskManager, Executor.this._config);
                    }
                    waitForIntraBrokerReplicaTasksToFinish();
                    i = Executor.this._executionTaskManager.numRemainingIntraBrokerPartitionMovements();
                    int numFinishedIntraBrokerPartitionMovements = Executor.this._executionTaskManager.numFinishedIntraBrokerPartitionMovements();
                    long finishedIntraBrokerDataToMoveInMB = Executor.this._executionTaskManager.finishedIntraBrokerDataToMoveInMB();
                    Logger logger = Executor.LOG;
                    Object[] objArr = new Object[6];
                    objArr[0] = Integer.valueOf(numFinishedIntraBrokerPartitionMovements);
                    objArr[1] = Integer.valueOf(numRemainingIntraBrokerPartitionMovements);
                    objArr[2] = String.format("%.2f", Double.valueOf((numFinishedIntraBrokerPartitionMovements * 100.0d) / numRemainingIntraBrokerPartitionMovements));
                    objArr[3] = Long.valueOf(finishedIntraBrokerDataToMoveInMB);
                    objArr[4] = Long.valueOf(remainingIntraBrokerDataToMoveInMB);
                    objArr[5] = remainingIntraBrokerDataToMoveInMB == 0 ? 100 : String.format("%.2f", Double.valueOf((finishedIntraBrokerDataToMoveInMB * 100.0d) / remainingIntraBrokerDataToMoveInMB));
                    logger.info("{}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.", objArr);
                }
            }
            Set<ExecutionTask> inExecutionTasks = Executor.this.inExecutionTasks();
            while (true) {
                Set<ExecutionTask> set = inExecutionTasks;
                if (set.isEmpty()) {
                    break;
                }
                Executor.LOG.info("Waiting for {} tasks moving {} MB to finish", Integer.valueOf(set.size()), Long.valueOf(Executor.this._executionTaskManager.inExecutionIntraBrokerDataMovementInMB()));
                waitForIntraBrokerReplicaTasksToFinish();
                inExecutionTasks = Executor.this.inExecutionTasks();
            }
            if (Executor.this.inExecutionTasks().isEmpty()) {
                Executor.LOG.info("Intra-broker partition movements finished.");
            } else if (Executor.this._stopSignal.get() != 0) {
                ExecutionTaskTracker.ExecutionTasksSummary executionTasksSummary = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
                Map<ExecutionTaskState, Integer> map = executionTasksSummary.taskStat().get(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION);
                Executor.LOG.info("Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for leadership movements {} task cancelled.", new Object[]{map.get(ExecutionTaskState.PENDING), map.get(ExecutionTaskState.IN_PROGRESS), map.get(ExecutionTaskState.ABORTING), map.get(ExecutionTaskState.ABORTED), map.get(ExecutionTaskState.DEAD), map.get(ExecutionTaskState.COMPLETED), Long.valueOf(executionTasksSummary.remainingIntraBrokerDataToMoveInMB()), executionTasksSummary.taskStat().get(ExecutionTask.TaskType.LEADER_ACTION).get(ExecutionTaskState.PENDING)});
            }
        }

        private boolean moveLeaderships() {
            int numRemainingLeadershipMovements = Executor.this._executionTaskManager.numRemainingLeadershipMovements();
            Executor.LOG.info("Starting {} leadership movements.", Integer.valueOf(numRemainingLeadershipMovements));
            int i = 0;
            while (Executor.this._executionTaskManager.numRemainingLeadershipMovements() != 0 && Executor.this._stopSignal.get() == 0) {
                updateOngoingExecutionState();
                i += moveLeadershipInBatch();
                Executor.LOG.info("{}/{} ({}%) leadership movements completed.", new Object[]{Integer.valueOf(i), Integer.valueOf(numRemainingLeadershipMovements), Integer.valueOf((i * 100) / numRemainingLeadershipMovements)});
            }
            if (Executor.this.inExecutionTasks().isEmpty()) {
                Executor.LOG.info("Leadership movements finished.");
            } else if (Executor.this._stopSignal.get() != 0) {
                Map<ExecutionTaskState, Integer> map = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet()).taskStat().get(ExecutionTask.TaskType.LEADER_ACTION);
                Executor.LOG.info("Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed.", new Object[]{map.get(ExecutionTaskState.PENDING), map.get(ExecutionTaskState.IN_PROGRESS), map.get(ExecutionTaskState.ABORTING), map.get(ExecutionTaskState.ABORTED), map.get(ExecutionTaskState.DEAD), map.get(ExecutionTaskState.COMPLETED)});
            }
            return Executor.this._stopSignal.get() == 2;
        }

        private int moveLeadershipInBatch() {
            List<ExecutionTask> leadershipMovementTasks = Executor.this._executionTaskManager.getLeadershipMovementTasks();
            int size = leadershipMovementTasks.size();
            Executor.LOG.debug("Executing {} leadership movements in a batch.", Integer.valueOf(size));
            if (!leadershipMovementTasks.isEmpty() && Executor.this._stopSignal.get() == 0) {
                while (Executor.this.hasOngoingLeaderElection()) {
                    try {
                        Executor.LOG.error("Waiting for Kafka Controller to delete /admin/preferred_replica_election zNode. Are other admin tools triggering a PLE?");
                        Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
                    } catch (InterruptedException e) {
                        Executor.LOG.warn("Interrupted while waiting for Kafka Controller to delete /admin/preferred_replica_election zNode.");
                    }
                }
                Executor.this._executionTaskManager.markTasksInProgress(leadershipMovementTasks);
                ExecutorUtils.executePreferredLeaderElection(Executor.this._kafkaZkClient, leadershipMovementTasks);
                Executor.LOG.trace("Waiting for leadership movement batch to finish.");
                while (!Executor.this.inExecutionTasks().isEmpty() && Executor.this._stopSignal.get() == 0) {
                    waitForLeadershipTasksToFinish();
                }
            }
            return size;
        }

        private Cluster getClusterForExecutionProgressCheck() {
            try {
                Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
            } catch (InterruptedException e) {
            }
            if (Executor.LOG.isDebugEnabled()) {
                Executor.LOG.debug("Tasks in execution: {}", Executor.this.inExecutionTasks());
            }
            return Executor.this._metadataClient.refreshMetadata().cluster();
        }

        private List<ExecutionTask> waitForInterBrokerReplicaTasksToFinish(AlterPartitionReassignmentsResult alterPartitionReassignmentsResult) throws InterruptedException, ExecutionException, TimeoutException {
            boolean z;
            ArrayList arrayList = new ArrayList();
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            HashSet hashSet3 = new HashSet();
            HashSet hashSet4 = new HashSet();
            HashSet hashSet5 = new HashSet();
            HashSet hashSet6 = new HashSet(0);
            ExecutionUtils.processAlterPartitionReassignmentsResult(alterPartitionReassignmentsResult, hashSet4, hashSet5, hashSet6);
            if (!hashSet6.isEmpty()) {
                throw new IllegalStateException(String.format("Attempt to cancel reassignment of partitions %s during regular execution.", hashSet6));
            }
            do {
                Cluster clusterForExecutionProgressCheck = getClusterForExecutionProgressCheck();
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                ArrayList arrayList4 = new ArrayList();
                boolean z2 = Executor.this._time.milliseconds() - this._lastSlowTaskReportingTimeMs > Executor.this._slowTaskAlertingBackoffTimeMs;
                for (ExecutionTask executionTask : Executor.this.inExecutionTasks()) {
                    TopicPartition topicPartition = executionTask.proposal().topicPartition();
                    if (Executor.this._stopSignal.get() != 0) {
                        Executor.LOG.debug("Task {} is marked as dead to stop the execution with a rollback.", executionTask);
                        arrayList.add(executionTask);
                        hashSet.add(Long.valueOf(executionTask.executionId()));
                        Executor.this._executionTaskManager.markTaskDead(executionTask);
                        arrayList3.add(executionTask);
                    } else if (clusterForExecutionProgressCheck.partition(topicPartition) == null || hashSet4.contains(topicPartition)) {
                        handleProgressWithTopicDeletion(executionTask, arrayList, hashSet2);
                    } else if (ExecutionUtils.isInterBrokerReplicaActionDone(clusterForExecutionProgressCheck, executionTask)) {
                        handleProgressWithCompletion(executionTask, arrayList);
                    } else {
                        if (z2) {
                            executionTask.maybeReportExecutionTooSlow(Executor.this._time.milliseconds(), arrayList4);
                        }
                        if (maybeMarkTaskAsDead(clusterForExecutionProgressCheck, null, executionTask, hashSet5)) {
                            hashSet3.add(Long.valueOf(executionTask.executionId()));
                            arrayList2.add(executionTask);
                            arrayList.add(executionTask);
                        }
                    }
                }
                sendSlowExecutionAlert(arrayList4);
                handleDeadInterBrokerReplicaTasks(arrayList2, arrayList3);
                updateOngoingExecutionState();
                z = !Executor.this.inExecutionTasks().isEmpty() && arrayList.isEmpty();
                if (z) {
                    maybeReexecuteInterBrokerReplicaTasks(hashSet4, hashSet5);
                }
            } while (z);
            Logger logger = Executor.LOG;
            Object[] objArr = new Object[4];
            objArr[0] = arrayList;
            objArr[1] = hashSet.isEmpty() ? "" : String.format(". [Stopped: %s]", hashSet);
            objArr[2] = hashSet2.isEmpty() ? "" : String.format(". [Deleted: %s]", hashSet2);
            objArr[3] = hashSet3.isEmpty() ? "" : String.format(". [Dead: %s]", hashSet3);
            logger.info("Finished tasks: {}.{}{}{}", objArr);
            return arrayList;
        }

        private void handleProgressWithTopicDeletion(ExecutionTask executionTask, List<ExecutionTask> list, Set<Long> set) {
            Executor.LOG.debug("Task {} is marked as finished because the topic has been deleted.", executionTask);
            list.add(executionTask);
            set.add(Long.valueOf(executionTask.executionId()));
            Executor.this._executionTaskManager.markTaskAborting(executionTask);
            Executor.this._executionTaskManager.markTaskDone(executionTask);
        }

        private void handleProgressWithCompletion(ExecutionTask executionTask, List<ExecutionTask> list) {
            list.add(executionTask);
            Executor.this._executionTaskManager.markTaskDone(executionTask);
        }

        private void waitForLeadershipTasksToFinish() {
            ArrayList arrayList = new ArrayList();
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            HashSet hashSet3 = new HashSet();
            do {
                maybeReexecuteLeadershipTasks();
                Cluster clusterForExecutionProgressCheck = getClusterForExecutionProgressCheck();
                ArrayList arrayList2 = new ArrayList();
                boolean z = Executor.this._time.milliseconds() - this._lastSlowTaskReportingTimeMs > Executor.this._slowTaskAlertingBackoffTimeMs;
                for (ExecutionTask executionTask : Executor.this.inExecutionTasks()) {
                    TopicPartition topicPartition = executionTask.proposal().topicPartition();
                    if (Executor.this._stopSignal.get() == 2) {
                        Executor.LOG.debug("Task {} is marked as dead to force-stop the execution with a controller bounce.", executionTask);
                        arrayList.add(executionTask);
                        hashSet.add(Long.valueOf(executionTask.executionId()));
                        Executor.this._executionTaskManager.markTaskDead(executionTask);
                    } else if (clusterForExecutionProgressCheck.partition(topicPartition) == null) {
                        handleProgressWithTopicDeletion(executionTask, arrayList, hashSet2);
                    } else if (ExecutionUtils.isLeadershipMovementDone(clusterForExecutionProgressCheck, executionTask)) {
                        handleProgressWithCompletion(executionTask, arrayList);
                    } else {
                        if (z) {
                            executionTask.maybeReportExecutionTooSlow(Executor.this._time.milliseconds(), arrayList2);
                        }
                        if (maybeMarkTaskAsDead(clusterForExecutionProgressCheck, null, executionTask, null)) {
                            hashSet3.add(Long.valueOf(executionTask.executionId()));
                            arrayList.add(executionTask);
                        }
                    }
                }
                sendSlowExecutionAlert(arrayList2);
                updateOngoingExecutionState();
                if (Executor.this.inExecutionTasks().isEmpty()) {
                    break;
                }
            } while (arrayList.isEmpty());
            Logger logger = Executor.LOG;
            Object[] objArr = new Object[4];
            objArr[0] = arrayList;
            objArr[1] = hashSet.isEmpty() ? "" : String.format(". [Force-Stopped: %s]", hashSet);
            objArr[2] = hashSet2.isEmpty() ? "" : String.format(". [Deleted: %s]", hashSet2);
            objArr[3] = hashSet3.isEmpty() ? "" : String.format(". [Dead: %s]", hashSet3);
            logger.info("Finished tasks: {}.{}{}{}", objArr);
        }

        private void waitForIntraBrokerReplicaTasksToFinish() {
            ArrayList arrayList = new ArrayList();
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            do {
                maybeReexecuteIntraBrokerReplicaTasks();
                Cluster clusterForExecutionProgressCheck = getClusterForExecutionProgressCheck();
                Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logdirInfoForExecutionTask = ExecutorAdminUtils.getLogdirInfoForExecutionTask(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this._adminClient, Executor.this._config);
                ArrayList arrayList2 = new ArrayList();
                boolean z = Executor.this._time.milliseconds() - this._lastSlowTaskReportingTimeMs > Executor.this._slowTaskAlertingBackoffTimeMs;
                for (ExecutionTask executionTask : Executor.this.inExecutionTasks()) {
                    if (clusterForExecutionProgressCheck.partition(executionTask.proposal().topicPartition()) == null) {
                        handleProgressWithTopicDeletion(executionTask, arrayList, hashSet);
                    } else if (ExecutionUtils.isIntraBrokerReplicaActionDone(logdirInfoForExecutionTask, executionTask)) {
                        handleProgressWithCompletion(executionTask, arrayList);
                    } else {
                        if (z) {
                            executionTask.maybeReportExecutionTooSlow(Executor.this._time.milliseconds(), arrayList2);
                        }
                        if (maybeMarkTaskAsDead(clusterForExecutionProgressCheck, logdirInfoForExecutionTask, executionTask, null)) {
                            hashSet2.add(Long.valueOf(executionTask.executionId()));
                            arrayList.add(executionTask);
                        }
                    }
                }
                sendSlowExecutionAlert(arrayList2);
                updateOngoingExecutionState();
                if (Executor.this.inExecutionTasks().isEmpty()) {
                    break;
                }
            } while (arrayList.isEmpty());
            Logger logger = Executor.LOG;
            Object[] objArr = new Object[3];
            objArr[0] = arrayList;
            objArr[1] = hashSet.isEmpty() ? "" : String.format(". [Deleted: %s]", hashSet);
            objArr[2] = hashSet2.isEmpty() ? "" : String.format(". [Dead: %s]", hashSet2);
            logger.info("Finished tasks: {}.{}{}", objArr);
        }

        private void handleDeadInterBrokerReplicaTasks(List<ExecutionTask> list, List<ExecutionTask> list2) throws InterruptedException, ExecutionException, TimeoutException {
            ArrayList arrayList = new ArrayList(list);
            arrayList.addAll(list2);
            if (arrayList.isEmpty()) {
                return;
            }
            arrayList.stream().filter(executionTask -> {
                return executionTask.state() != ExecutionTaskState.DEAD;
            }).forEach(executionTask2 -> {
                throw new IllegalArgumentException(String.format("Unexpected state for task %s (expected: %s).", executionTask2, ExecutionTaskState.DEAD));
            });
            AlterPartitionReassignmentsResult submitReplicaReassignmentTasks = ExecutionUtils.submitReplicaReassignmentTasks(Executor.this._adminClient, arrayList);
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            HashSet hashSet3 = new HashSet();
            ExecutionUtils.processAlterPartitionReassignmentsResult(submitReplicaReassignmentTasks, hashSet, hashSet2, hashSet3);
            Executor.LOG.debug("Handling dead inter-broker replica tasks {} (deleted: {} dead: {} noReassignmentToCancel: {})", new Object[]{arrayList, hashSet, hashSet2, hashSet3});
            if (Executor.this._stopSignal.get() == 0) {
                Executor.LOG.info("Stop the execution due to {} dead tasks: {}.", Integer.valueOf(arrayList.size()), arrayList);
                Executor.this.stopExecution(false);
            }
            if (!list.isEmpty()) {
                return;
            }
            Set set = (Set) arrayList.stream().map(executionTask3 -> {
                return executionTask3.proposal().topicPartition();
            }).collect(Collectors.toSet());
            set.removeAll(hashSet);
            set.removeAll(hashSet2);
            set.removeAll(hashSet3);
            while (true) {
                HashSet hashSet4 = new HashSet(ExecutionUtils.partitionsBeingReassigned(Executor.this._adminClient));
                hashSet4.retainAll(set);
                if (hashSet4.isEmpty()) {
                    return;
                }
                try {
                    Executor.LOG.info("Waiting for the rollback of ongoing inter-broker replica reassignments for {}.", hashSet4);
                    Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
                } catch (InterruptedException e) {
                }
            }
        }

        private void sendSlowExecutionAlert(List<ExecutionTask> list) {
            if (list.isEmpty()) {
                return;
            }
            StringBuilder sb = new StringBuilder();
            sb.append("Slow tasks are detected:\n");
            for (ExecutionTask executionTask : list) {
                sb.append(String.format("\tID: %s\tstart_time:%s\tdetail:%s%n", Long.valueOf(executionTask.executionId()), KafkaCruiseControlUtils.toDateString(executionTask.startTimeMs(), KafkaCruiseControlUtils.DATE_FORMAT, KafkaCruiseControlUtils.TIME_ZONE), executionTask));
            }
            Executor.this._executorNotifier.sendAlert(sb.toString());
            this._lastSlowTaskReportingTimeMs = Executor.this._time.milliseconds();
        }

        private boolean maybeMarkTaskAsDead(Cluster cluster, Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, ExecutionTask executionTask, Set<TopicPartition> set) {
            if (executionTask.state() != ExecutionTaskState.IN_PROGRESS && executionTask.state() != ExecutionTaskState.ABORTING) {
                return false;
            }
            switch (executionTask.type()) {
                case LEADER_ACTION:
                    if (cluster.nodeById(executionTask.proposal().newLeader().brokerId().intValue()) == null) {
                        Executor.this._executionTaskManager.markTaskDead(executionTask);
                        Executor.LOG.warn("Killing execution for task {} because the target leader is down.", executionTask);
                        return true;
                    }
                    if (Executor.this._time.milliseconds() <= executionTask.startTimeMs() + Executor.this._leaderMovementTimeoutMs) {
                        return false;
                    }
                    Executor.this._executionTaskManager.markTaskDead(executionTask);
                    Executor.LOG.warn("Killing execution for task {} because it took longer than {} to finish.", executionTask, Double.valueOf(Executor.this._leaderMovementTimeoutMs));
                    return true;
                case INTER_BROKER_REPLICA_ACTION:
                    for (ReplicaPlacementInfo replicaPlacementInfo : executionTask.proposal().newReplicas()) {
                        if (cluster.nodeById(replicaPlacementInfo.brokerId().intValue()) == null || set.contains(executionTask.proposal().topicPartition())) {
                            Executor.this._executionTaskManager.markTaskDead(executionTask);
                            Executor.LOG.warn("Killing execution for task {} because the new replica {} is down.", executionTask, replicaPlacementInfo);
                            return true;
                        }
                    }
                    return false;
                case INTRA_BROKER_REPLICA_ACTION:
                    if (map.containsKey(executionTask)) {
                        return false;
                    }
                    Executor.this._executionTaskManager.markTaskDead(executionTask);
                    Executor.LOG.warn("Killing execution for task {} because the destination disk is down.", executionTask);
                    return true;
                default:
                    throw new IllegalStateException("Unknown task type " + executionTask.type());
            }
        }

        private void maybeReexecuteInterBrokerReplicaTasks(Set<TopicPartition> set, Set<TopicPartition> set2) {
            ArrayList arrayList = new ArrayList(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)));
            boolean z = false;
            try {
                z = !ExecutionUtils.isSubset(ExecutionUtils.partitionsBeingReassigned(Executor.this._adminClient), arrayList);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                Executor.LOG.warn("Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.", e);
            }
            if (z) {
                Executor.LOG.info("Reexecuting tasks {}", arrayList);
                AlterPartitionReassignmentsResult submitReplicaReassignmentTasks = ExecutionUtils.submitReplicaReassignmentTasks(Executor.this._adminClient, arrayList);
                HashSet hashSet = new HashSet();
                ExecutionUtils.processAlterPartitionReassignmentsResult(submitReplicaReassignmentTasks, set, set2, hashSet);
                if (!hashSet.isEmpty()) {
                    throw new IllegalStateException(String.format("Attempt to cancel reassignment of partitions %s during re-execution.", hashSet));
                }
            }
        }

        private void maybeReexecuteIntraBrokerReplicaTasks() {
            ArrayList arrayList = new ArrayList(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)));
            ExecutorAdminUtils.getLogdirInfoForExecutionTask(arrayList, Executor.this._adminClient, Executor.this._config).forEach((executionTask, replicaLogDirInfo) -> {
                String logdir = executionTask.proposal().replicasToMoveBetweenDisksByBroker().get(Integer.valueOf(executionTask.brokerId())).logdir();
                if (logdir.equals(replicaLogDirInfo.getCurrentReplicaLogDir()) || logdir.equals(replicaLogDirInfo.getFutureReplicaLogDir())) {
                    arrayList.remove(executionTask);
                }
            });
            if (arrayList.isEmpty()) {
                return;
            }
            Executor.LOG.info("Reexecuting tasks {}", arrayList);
            ExecutorAdminUtils.executeIntraBrokerReplicaMovements(arrayList, Executor.this._adminClient, Executor.this._executionTaskManager, Executor.this._config);
        }

        private void maybeReexecuteLeadershipTasks() {
            if (Executor.this.hasOngoingLeaderElection()) {
                return;
            }
            ArrayList arrayList = new ArrayList(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)));
            if (arrayList.isEmpty()) {
                return;
            }
            Executor.LOG.info("Reexecuting tasks {}", arrayList);
            ExecutorUtils.executePreferredLeaderElection(Executor.this._kafkaZkClient, arrayList);
        }
    }

    public Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Time time, MetricRegistry metricRegistry, AnomalyDetectorManager anomalyDetectorManager) {
        this(kafkaCruiseControlConfig, time, metricRegistry, null, null, anomalyDetectorManager);
    }

    Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Time time, MetricRegistry metricRegistry, MetadataClient metadataClient, ExecutorNotifier executorNotifier, AnomalyDetectorManager anomalyDetectorManager) {
        String string = kafkaCruiseControlConfig.getString(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
        this._numExecutionStopped = new AtomicInteger(0);
        this._numExecutionStoppedByUser = new AtomicInteger(0);
        this._executionStoppedByUser = new AtomicBoolean(false);
        this._ongoingExecutionIsBeingModified = new AtomicBoolean(false);
        this._numExecutionStartedInKafkaAssignerMode = new AtomicInteger(0);
        this._numExecutionStartedInNonKafkaAssignerMode = new AtomicInteger(0);
        this._isKafkaAssignerMode = false;
        this._skipInterBrokerReplicaConcurrencyAdjustment = false;
        ExecutionUtils.init(kafkaCruiseControlConfig);
        this._config = kafkaCruiseControlConfig;
        this._time = time;
        this._kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(string, ZK_EXECUTOR_METRIC_GROUP, ZK_EXECUTOR_METRIC_TYPE, kafkaCruiseControlConfig.getBoolean(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG).booleanValue());
        this._adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(kafkaCruiseControlConfig));
        this._executionTaskManager = new ExecutionTaskManager(this._adminClient, metricRegistry, time, kafkaCruiseControlConfig);
        registerGaugeSensors(metricRegistry);
        this._metadataClient = metadataClient != null ? metadataClient : new MetadataClient(kafkaCruiseControlConfig, new Metadata(100L, ExecutionUtils.METADATA_EXPIRY_MS, new LogContext(), new ClusterResourceListeners()), -1L, time);
        this._defaultExecutionProgressCheckIntervalMs = kafkaCruiseControlConfig.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG).longValue();
        this._leaderMovementTimeoutMs = kafkaCruiseControlConfig.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG).longValue();
        this._requestedExecutionProgressCheckIntervalMs = null;
        this._proposalExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG));
        this._latestDemoteStartTimeMsByBrokerId = new ConcurrentHashMap();
        this._latestRemoveStartTimeMsByBrokerId = new ConcurrentHashMap();
        this._executorState = ExecutorState.noTaskInProgress(recentlyDemotedBrokers(), recentlyRemovedBrokers());
        this._stopSignal = new AtomicInteger(0);
        this._hasOngoingExecution = false;
        this._uuid = null;
        this._reasonSupplier = null;
        this._executorNotifier = executorNotifier != null ? executorNotifier : (ExecutorNotifier) kafkaCruiseControlConfig.getConfiguredInstance(ExecutorConfig.EXECUTOR_NOTIFIER_CLASS_CONFIG, ExecutorNotifier.class);
        this._userTaskManager = null;
        if (anomalyDetectorManager == null) {
            throw new IllegalStateException("Anomaly detector manager cannot be null.");
        }
        this._anomalyDetectorManager = anomalyDetectorManager;
        this._demotionHistoryRetentionTimeMs = kafkaCruiseControlConfig.getLong(ExecutorConfig.DEMOTION_HISTORY_RETENTION_TIME_MS_CONFIG).longValue();
        this._removalHistoryRetentionTimeMs = kafkaCruiseControlConfig.getLong(ExecutorConfig.REMOVAL_HISTORY_RETENTION_TIME_MS_CONFIG).longValue();
        this._minExecutionProgressCheckIntervalMs = kafkaCruiseControlConfig.getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG).longValue();
        this._slowTaskAlertingBackoffTimeMs = kafkaCruiseControlConfig.getLong(ExecutorConfig.SLOW_TASK_ALERTING_BACKOFF_TIME_MS_CONFIG).longValue();
        this._concurrencyAdjusterEnabled = new ConcurrentHashMap(ConcurrencyType.cachedValues().size());
        boolean booleanValue = kafkaCruiseControlConfig.getBoolean(ExecutorConfig.CONCURRENCY_ADJUSTER_ENABLED_CONFIG).booleanValue();
        this._concurrencyAdjusterEnabled.put(ConcurrencyType.INTER_BROKER_REPLICA, Boolean.valueOf(booleanValue || kafkaCruiseControlConfig.getBoolean(ExecutorConfig.CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED_CONFIG).booleanValue()));
        this._concurrencyAdjusterEnabled.put(ConcurrencyType.LEADERSHIP, Boolean.valueOf(booleanValue || kafkaCruiseControlConfig.getBoolean(ExecutorConfig.CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED_CONFIG).booleanValue()));
        this._concurrencyAdjusterEnabled.put(ConcurrencyType.INTRA_BROKER_REPLICA, false);
        this._concurrencyAdjusterExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory(ConcurrencyAdjuster.class.getSimpleName(), true, null));
        long longValue = kafkaCruiseControlConfig.getLong(ExecutorConfig.CONCURRENCY_ADJUSTER_INTERVAL_MS_CONFIG).longValue();
        this._concurrencyAdjuster = new ConcurrencyAdjuster();
        this._concurrencyAdjusterExecutor.scheduleAtFixedRate(this._concurrencyAdjuster, longValue, longValue, TimeUnit.MILLISECONDS);
        this._executionHistoryScannerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory(ExecutionHistoryScanner.class.getSimpleName(), true, null));
        this._executionHistoryScannerExecutor.scheduleAtFixedRate(new ExecutionHistoryScanner(), 0L, 5L, TimeUnit.SECONDS);
    }

    public Set<ExecutionTask> inExecutionTasks() {
        return this._executionTaskManager.inExecutionTasks();
    }

    public synchronized void setRequestedExecutionProgressCheckIntervalMs(Long l) {
        if (l != null && l.longValue() < this._minExecutionProgressCheckIntervalMs) {
            throw new IllegalArgumentException("Attempt to set execution progress check interval [" + l + "ms] to smaller than the minimum execution progress check interval in cluster [" + this._minExecutionProgressCheckIntervalMs + "ms].");
        }
        this._requestedExecutionProgressCheckIntervalMs = l;
    }

    public long executionProgressCheckIntervalMs() {
        return this._requestedExecutionProgressCheckIntervalMs == null ? this._defaultExecutionProgressCheckIntervalMs : this._requestedExecutionProgressCheckIntervalMs.longValue();
    }

    private void registerGaugeSensors(MetricRegistry metricRegistry) {
        metricRegistry.register(MetricRegistry.name(ZK_EXECUTOR_METRIC_TYPE, new String[]{"execution-stopped"}), this::numExecutionStopped);
        metricRegistry.register(MetricRegistry.name(ZK_EXECUTOR_METRIC_TYPE, new String[]{ExecutionUtils.GAUGE_EXECUTION_STOPPED_BY_USER}), this::numExecutionStoppedByUser);
        metricRegistry.register(MetricRegistry.name(ZK_EXECUTOR_METRIC_TYPE, new String[]{ExecutionUtils.GAUGE_EXECUTION_STARTED_IN_KAFKA_ASSIGNER_MODE}), this::numExecutionStartedInKafkaAssignerMode);
        metricRegistry.register(MetricRegistry.name(ZK_EXECUTOR_METRIC_TYPE, new String[]{ExecutionUtils.GAUGE_EXECUTION_STARTED_IN_NON_KAFKA_ASSIGNER_MODE}), this::numExecutionStartedInNonKafkaAssignerMode);
        String name = MetricRegistry.name(ZK_EXECUTOR_METRIC_TYPE, new String[]{ExecutionUtils.GAUGE_EXECUTION_INTER_BROKER_PARTITION_MOVEMENTS_PER_BROKER_CAP});
        ExecutionTaskManager executionTaskManager = this._executionTaskManager;
        Objects.requireNonNull(executionTaskManager);
        metricRegistry.register(name, executionTaskManager::interBrokerPartitionMovementConcurrency);
        String name2 = MetricRegistry.name(ZK_EXECUTOR_METRIC_TYPE, new String[]{ExecutionUtils.GAUGE_EXECUTION_INTRA_BROKER_PARTITION_MOVEMENTS_PER_BROKER_CAP});
        ExecutionTaskManager executionTaskManager2 = this._executionTaskManager;
        Objects.requireNonNull(executionTaskManager2);
        metricRegistry.register(name2, executionTaskManager2::intraBrokerPartitionMovementConcurrency);
        String name3 = MetricRegistry.name(ZK_EXECUTOR_METRIC_TYPE, new String[]{ExecutionUtils.GAUGE_EXECUTION_LEADERSHIP_MOVEMENTS_GLOBAL_CAP});
        ExecutionTaskManager executionTaskManager3 = this._executionTaskManager;
        Objects.requireNonNull(executionTaskManager3);
        metricRegistry.register(name3, executionTaskManager3::leadershipMovementConcurrency);
    }

    private void removeExpiredDemotionHistory() {
        LOG.debug("Remove expired demotion history");
        this._latestDemoteStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long longValue = ((Long) entry.getValue()).longValue();
            return longValue != 0 && longValue + this._demotionHistoryRetentionTimeMs < this._time.milliseconds();
        });
    }

    private void removeExpiredRemovalHistory() {
        LOG.debug("Remove expired broker removal history");
        this._latestRemoveStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long longValue = ((Long) entry.getValue()).longValue();
            return longValue != 0 && longValue + this._removalHistoryRetentionTimeMs < this._time.milliseconds();
        });
    }

    public Set<Integer> recentlyDemotedBrokers() {
        return Collections.unmodifiableSet(this._latestDemoteStartTimeMsByBrokerId.keySet());
    }

    public Set<Integer> recentlyRemovedBrokers() {
        return Collections.unmodifiableSet(this._latestRemoveStartTimeMsByBrokerId.keySet());
    }

    public boolean dropRecentlyRemovedBrokers(Set<Integer> set) {
        return this._latestRemoveStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            return set.contains(entry.getKey());
        });
    }

    public boolean dropRecentlyDemotedBrokers(Set<Integer> set) {
        return this._latestDemoteStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            return set.contains(entry.getKey());
        });
    }

    public void addRecentlyRemovedBrokers(Set<Integer> set) {
        set.forEach(num -> {
            this._latestRemoveStartTimeMsByBrokerId.put(num, 0L);
        });
    }

    public void addRecentlyDemotedBrokers(Set<Integer> set) {
        set.forEach(num -> {
            this._latestDemoteStartTimeMsByBrokerId.put(num, 0L);
        });
    }

    public ExecutorState state() {
        return this._executorState;
    }

    public Boolean setConcurrencyAdjusterFor(ConcurrencyType concurrencyType, boolean z) {
        if (concurrencyType == ConcurrencyType.INTER_BROKER_REPLICA || concurrencyType == ConcurrencyType.LEADERSHIP) {
            return this._concurrencyAdjusterEnabled.put(concurrencyType, Boolean.valueOf(z));
        }
        throw new IllegalArgumentException(String.format("Concurrency adjuster for %s is not yet supported.", concurrencyType));
    }

    public synchronized void executeProposals(Collection<ExecutionProposal> collection, Set<Integer> set, Set<Integer> set2, LoadMonitor loadMonitor, Integer num, Integer num2, Integer num3, Long l, ReplicaMovementStrategy replicaMovementStrategy, Long l2, boolean z, String str, boolean z2, boolean z3) throws OngoingExecutionException {
        setExecutionMode(z2);
        sanityCheckExecuteProposals(loadMonitor, str);
        this._skipInterBrokerReplicaConcurrencyAdjustment = z3;
        try {
            initProposalExecution(collection, set, num, num2, num3, l, replicaMovementStrategy, z, loadMonitor);
            startExecution(loadMonitor, null, set2, l2, z);
        } catch (Exception e) {
            processExecuteProposalsFailure();
            throw e;
        }
    }

    private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String str) throws OngoingExecutionException {
        if (this._hasOngoingExecution) {
            throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution.");
        }
        if (loadMonitor == null) {
            throw new IllegalArgumentException("Load monitor cannot be null.");
        }
        if (this._executorState.state() != ExecutorState.State.GENERATING_PROPOSALS_FOR_EXECUTION) {
            throw new IllegalStateException(String.format("Unexpected executor state %s. Initializing proposal execution requires generating proposals for execution.", this._executorState.state()));
        }
        if (str == null || !str.equals(this._uuid)) {
            throw new IllegalStateException(String.format("Attempt to initialize proposal execution with a UUID %s that differs from the UUID used for generating proposals for execution %s.", str, this._uuid));
        }
    }

    private synchronized void initProposalExecution(Collection<ExecutionProposal> collection, Collection<Integer> collection2, Integer num, Integer num2, Integer num3, Long l, ReplicaMovementStrategy replicaMovementStrategy, boolean z, LoadMonitor loadMonitor) {
        this._executorState = ExecutorState.initializeProposalExecution(this._uuid, this._reasonSupplier.get(), recentlyDemotedBrokers(), recentlyRemovedBrokers(), z);
        this._executionTaskManager.setExecutionModeForTaskTracker(this._isKafkaAssignerMode);
        this._executionTaskManager.addExecutionProposals(collection, collection2, this._metadataClient.refreshMetadata().cluster(), replicaMovementStrategy);
        this._concurrencyAdjuster.initAdjustment(loadMonitor, num, num3);
        setRequestedIntraBrokerPartitionMovementConcurrency(num2);
        setRequestedExecutionProgressCheckIntervalMs(l);
    }

    public synchronized void executeDemoteProposals(Collection<ExecutionProposal> collection, Collection<Integer> collection2, LoadMonitor loadMonitor, Integer num, Integer num2, Long l, ReplicaMovementStrategy replicaMovementStrategy, Long l2, boolean z, String str) throws OngoingExecutionException {
        setExecutionMode(false);
        sanityCheckExecuteProposals(loadMonitor, str);
        this._skipInterBrokerReplicaConcurrencyAdjustment = true;
        try {
            initProposalExecution(collection, collection2, num, 0, num2, l, replicaMovementStrategy, z, loadMonitor);
            startExecution(loadMonitor, collection2, null, l2, z);
        } catch (Exception e) {
            processExecuteProposalsFailure();
            throw e;
        }
    }

    public void setRequestedInterBrokerPartitionMovementConcurrency(Integer num) {
        this._executionTaskManager.setRequestedInterBrokerPartitionMovementConcurrency(num);
    }

    public void setRequestedIntraBrokerPartitionMovementConcurrency(Integer num) {
        this._executionTaskManager.setRequestedIntraBrokerPartitionMovementConcurrency(num);
    }

    public void setRequestedLeadershipMovementConcurrency(Integer num) {
        this._executionTaskManager.setRequestedLeadershipMovementConcurrency(num);
    }

    private void setExecutionMode(boolean z) {
        this._isKafkaAssignerMode = z;
    }

    public void setUserTaskManager(UserTaskManager userTaskManager) {
        this._userTaskManager = userTaskManager;
    }

    private int numExecutionStopped() {
        return this._numExecutionStopped.get();
    }

    private int numExecutionStoppedByUser() {
        return this._numExecutionStoppedByUser.get();
    }

    private int numExecutionStartedInKafkaAssignerMode() {
        return this._numExecutionStartedInKafkaAssignerMode.get();
    }

    private int numExecutionStartedInNonKafkaAssignerMode() {
        return this._numExecutionStartedInNonKafkaAssignerMode.get();
    }

    private void startExecution(LoadMonitor loadMonitor, Collection<Integer> collection, Collection<Integer> collection2, Long l, boolean z) throws OngoingExecutionException {
        this._executionStoppedByUser.set(false);
        sanityCheckOngoingMovement();
        this._hasOngoingExecution = true;
        this._anomalyDetectorManager.maybeClearOngoingAnomalyDetectionTimeMs();
        this._anomalyDetectorManager.resetHasUnfixableGoals();
        this._stopSignal.set(0);
        this._executionStoppedByUser.set(false);
        if (this._isKafkaAssignerMode) {
            this._numExecutionStartedInKafkaAssignerMode.incrementAndGet();
        } else {
            this._numExecutionStartedInNonKafkaAssignerMode.incrementAndGet();
        }
        this._proposalExecutor.submit(new ProposalExecutionRunnable(loadMonitor, collection, collection2, l, z));
    }

    private void sanityCheckOngoingMovement() throws OngoingExecutionException {
        try {
            if (hasOngoingPartitionReassignments()) {
                throw new OngoingExecutionException("There are ongoing inter-broker partition movements.");
            }
            try {
                if (ExecutorAdminUtils.hasOngoingIntraBrokerReplicaMovement((Collection) this._metadataClient.cluster().nodes().stream().mapToInt((v0) -> {
                    return v0.id();
                }).boxed().collect(Collectors.toSet()), this._adminClient, this._config)) {
                    throw new OngoingExecutionException("There are ongoing intra-broker partition movements.");
                }
                if (hasOngoingLeaderElection()) {
                    throw new OngoingExecutionException("There are ongoing leadership movements.");
                }
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Failed to retrieve if there are already ongoing intra-broker replica reassignments.", e);
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e2) {
            throw new IllegalStateException("Failed to retrieve if there are already ongoing partition reassignments.", e2);
        }
    }

    private void processExecuteProposalsFailure() {
        this._executionTaskManager.clear();
        this._uuid = null;
        this._reasonSupplier = null;
        this._executorState = ExecutorState.noTaskInProgress(recentlyDemotedBrokers(), recentlyRemovedBrokers());
    }

    public synchronized void setGeneratingProposalsForExecution(String str, Supplier<String> supplier, boolean z) throws OngoingExecutionException {
        ExecutorState.State state = this._executorState.state();
        if (state != ExecutorState.State.NO_TASK_IN_PROGRESS) {
            throw new OngoingExecutionException(String.format("Cannot generate proposals while the executor is in %s state.", state));
        }
        if (str == null) {
            throw new IllegalArgumentException("UUID of the execution cannot be null.");
        }
        if (supplier == null) {
            throw new IllegalArgumentException("Reason supplier cannot be null.");
        }
        this._uuid = str;
        this._reasonSupplier = supplier;
        this._executorState = ExecutorState.generatingProposalsForExecution(this._uuid, this._reasonSupplier.get(), recentlyDemotedBrokers(), recentlyRemovedBrokers(), z);
    }

    public synchronized void failGeneratingProposalsForExecution(String str) {
        if (this._executorState.state() == ExecutorState.State.GENERATING_PROPOSALS_FOR_EXECUTION) {
            if (str == null || !str.equals(this._uuid)) {
                LOG.warn("UUID mismatch in attempt to report failure to generate proposals (received: {} expected: {})", str, this._uuid);
                return;
            }
            LOG.info("Failed to generate proposals for execution (UUID: {} reason: {}).", str, this._reasonSupplier.get());
            this._uuid = null;
            this._reasonSupplier = null;
            this._executorState = ExecutorState.noTaskInProgress(recentlyDemotedBrokers(), recentlyRemovedBrokers());
        }
    }

    public synchronized void userTriggeredStopExecution(boolean z) {
        if (stopExecution(z)) {
            LOG.info("User requested to stop the ongoing proposal execution" + (z ? " forcefully." : "."));
            this._numExecutionStoppedByUser.incrementAndGet();
            this._executionStoppedByUser.set(true);
        }
    }

    private synchronized boolean stopExecution(boolean z) {
        if (!(z && (this._stopSignal.compareAndSet(0, 2) || this._stopSignal.compareAndSet(1, 2))) && (z || !this._stopSignal.compareAndSet(0, 1))) {
            return false;
        }
        this._numExecutionStopped.incrementAndGet();
        this._executionTaskManager.setStopRequested();
        return true;
    }

    public synchronized void shutdown() {
        LOG.info("Shutting down executor.");
        if (this._hasOngoingExecution) {
            LOG.warn("Shutdown executor may take long because execution is still in progress.");
        }
        this._proposalExecutor.shutdown();
        try {
            this._proposalExecutor.awaitTermination(ExecutionUtils.METADATA_EXPIRY_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
        }
        this._metadataClient.close();
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(this._kafkaZkClient);
        KafkaCruiseControlUtils.closeAdminClientWithTimeout(this._adminClient);
        this._executionHistoryScannerExecutor.shutdownNow();
        this._concurrencyAdjusterExecutor.shutdownNow();
        LOG.info("Executor shutdown completed.");
    }

    public boolean modifyOngoingExecution(boolean z) {
        return this._ongoingExecutionIsBeingModified.compareAndSet(!z, z);
    }

    public boolean hasOngoingExecution() {
        return this._hasOngoingExecution;
    }

    public boolean hasOngoingPartitionReassignments() throws InterruptedException, ExecutionException, TimeoutException {
        return !ExecutionUtils.partitionsBeingReassigned(this._adminClient).isEmpty();
    }

    public boolean hasOngoingLeaderElection() {
        return !ExecutorUtils.ongoingLeaderElection(this._kafkaZkClient).isEmpty();
    }
}
