/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorManager;
import com.linkedin.kafka.cruisecontrol.exception.OngoingExecutionException;
import com.linkedin.kafka.cruisecontrol.executor.ConcurrencyType;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskManager;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskState;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionUtils;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorAdminUtils;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorNotifier;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorUtils;
import com.linkedin.kafka.cruisecontrol.executor.ReplicationThrottleHelper;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Executor {
    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
    private static final Logger OPERATION_LOG = LoggerFactory.getLogger((String)"operationLogger");
    private static final String ZK_EXECUTOR_METRIC_GROUP = "CruiseControlExecutor";
    private static final String ZK_EXECUTOR_METRIC_TYPE = "Executor";
    private final ExecutionTaskManager _executionTaskManager;
    private final MetadataClient _metadataClient;
    private final long _defaultExecutionProgressCheckIntervalMs;
    private Long _requestedExecutionProgressCheckIntervalMs;
    private final ExecutorService _proposalExecutor;
    private final KafkaZkClient _kafkaZkClient;
    private final AdminClient _adminClient;
    private final double _leaderMovementTimeoutMs;
    private static final int NO_STOP_EXECUTION = 0;
    private static final int STOP_EXECUTION = 1;
    private static final int FORCE_STOP_EXECUTION = 2;
    private final AtomicInteger _stopSignal;
    private final Time _time;
    private volatile boolean _hasOngoingExecution;
    private volatile ExecutorState _executorState;
    private volatile String _uuid;
    private volatile Supplier<String> _reasonSupplier;
    private final ExecutorNotifier _executorNotifier;
    private final AtomicInteger _numExecutionStopped;
    private final AtomicInteger _numExecutionStoppedByUser;
    private final AtomicBoolean _executionStoppedByUser;
    private final AtomicBoolean _ongoingExecutionIsBeingModified;
    private final AtomicInteger _numExecutionStartedInKafkaAssignerMode;
    private final AtomicInteger _numExecutionStartedInNonKafkaAssignerMode;
    private volatile boolean _isKafkaAssignerMode;
    private volatile boolean _skipInterBrokerReplicaConcurrencyAdjustment;
    private final long _demotionHistoryRetentionTimeMs;
    private final long _removalHistoryRetentionTimeMs;
    private final ConcurrentMap<Integer, Long> _latestDemoteStartTimeMsByBrokerId;
    private final ConcurrentMap<Integer, Long> _latestRemoveStartTimeMsByBrokerId;
    private final ScheduledExecutorService _executionHistoryScannerExecutor;
    private UserTaskManager _userTaskManager;
    private final AnomalyDetectorManager _anomalyDetectorManager;
    private final ConcurrencyAdjuster _concurrencyAdjuster;
    private final ScheduledExecutorService _concurrencyAdjusterExecutor;
    private final ConcurrentMap<ConcurrencyType, Boolean> _concurrencyAdjusterEnabled;
    private final long _minExecutionProgressCheckIntervalMs;
    public final long _slowTaskAlertingBackoffTimeMs;
    private final KafkaCruiseControlConfig _config;

    public Executor(KafkaCruiseControlConfig config, Time time, MetricRegistry dropwizardMetricRegistry, AnomalyDetectorManager anomalyDetectorManager) {
        this(config, time, dropwizardMetricRegistry, null, null, anomalyDetectorManager);
    }

    Executor(KafkaCruiseControlConfig config, Time time, MetricRegistry dropwizardMetricRegistry, MetadataClient metadataClient, ExecutorNotifier executorNotifier, AnomalyDetectorManager anomalyDetectorManager) {
        String zkUrl = config.getString(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
        this._numExecutionStopped = new AtomicInteger(0);
        this._numExecutionStoppedByUser = new AtomicInteger(0);
        this._executionStoppedByUser = new AtomicBoolean(false);
        this._ongoingExecutionIsBeingModified = new AtomicBoolean(false);
        this._numExecutionStartedInKafkaAssignerMode = new AtomicInteger(0);
        this._numExecutionStartedInNonKafkaAssignerMode = new AtomicInteger(0);
        this._isKafkaAssignerMode = false;
        this._skipInterBrokerReplicaConcurrencyAdjustment = false;
        ExecutionUtils.init(config);
        this._config = config;
        this._time = time;
        boolean zkSecurityEnabled = config.getBoolean("zookeeper.security.enabled");
        this._kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zkUrl, ZK_EXECUTOR_METRIC_GROUP, ZK_EXECUTOR_METRIC_TYPE, zkSecurityEnabled);
        this._adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
        this._executionTaskManager = new ExecutionTaskManager(this._adminClient, dropwizardMetricRegistry, time, config);
        this.registerGaugeSensors(dropwizardMetricRegistry);
        this._metadataClient = metadataClient != null ? metadataClient : new MetadataClient(config, new Metadata(100L, Long.MAX_VALUE, new LogContext(), new ClusterResourceListeners()), -1L, time);
        this._defaultExecutionProgressCheckIntervalMs = config.getLong("execution.progress.check.interval.ms");
        this._leaderMovementTimeoutMs = config.getLong("leader.movement.timeout.ms").longValue();
        this._requestedExecutionProgressCheckIntervalMs = null;
        this._proposalExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG));
        this._latestDemoteStartTimeMsByBrokerId = new ConcurrentHashMap<Integer, Long>();
        this._latestRemoveStartTimeMsByBrokerId = new ConcurrentHashMap<Integer, Long>();
        this._executorState = ExecutorState.noTaskInProgress(this.recentlyDemotedBrokers(), this.recentlyRemovedBrokers());
        this._stopSignal = new AtomicInteger(0);
        this._hasOngoingExecution = false;
        this._uuid = null;
        this._reasonSupplier = null;
        this._executorNotifier = executorNotifier != null ? executorNotifier : config.getConfiguredInstance("executor.notifier.class", ExecutorNotifier.class);
        this._userTaskManager = null;
        if (anomalyDetectorManager == null) {
            throw new IllegalStateException("Anomaly detector manager cannot be null.");
        }
        this._anomalyDetectorManager = anomalyDetectorManager;
        this._demotionHistoryRetentionTimeMs = config.getLong("demotion.history.retention.time.ms");
        this._removalHistoryRetentionTimeMs = config.getLong("removal.history.retention.time.ms");
        this._minExecutionProgressCheckIntervalMs = config.getLong("min.execution.progress.check.interval.ms");
        this._slowTaskAlertingBackoffTimeMs = config.getLong("slow.task.alerting.backoff.ms");
        this._concurrencyAdjusterEnabled = new ConcurrentHashMap<ConcurrencyType, Boolean>(ConcurrencyType.cachedValues().size());
        boolean allEnabled = config.getBoolean("concurrency.adjuster.enabled");
        this._concurrencyAdjusterEnabled.put(ConcurrencyType.INTER_BROKER_REPLICA, allEnabled || config.getBoolean("concurrency.adjuster.inter.broker.replica.enabled") != false);
        this._concurrencyAdjusterEnabled.put(ConcurrencyType.LEADERSHIP, allEnabled || config.getBoolean("concurrency.adjuster.leadership.enabled") != false);
        this._concurrencyAdjusterEnabled.put(ConcurrencyType.INTRA_BROKER_REPLICA, false);
        this._concurrencyAdjusterExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory(ConcurrencyAdjuster.class.getSimpleName(), true, null));
        long intervalMs = config.getLong("concurrency.adjuster.interval.ms");
        this._concurrencyAdjuster = new ConcurrencyAdjuster();
        this._concurrencyAdjusterExecutor.scheduleAtFixedRate(this._concurrencyAdjuster, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        this._executionHistoryScannerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory(ExecutionHistoryScanner.class.getSimpleName(), true, null));
        this._executionHistoryScannerExecutor.scheduleAtFixedRate(new ExecutionHistoryScanner(), 0L, 5L, TimeUnit.SECONDS);
    }

    public Set<ExecutionTask> inExecutionTasks() {
        return this._executionTaskManager.inExecutionTasks();
    }

    public synchronized void setRequestedExecutionProgressCheckIntervalMs(Long requestedExecutionProgressCheckIntervalMs) {
        if (requestedExecutionProgressCheckIntervalMs != null && requestedExecutionProgressCheckIntervalMs < this._minExecutionProgressCheckIntervalMs) {
            throw new IllegalArgumentException("Attempt to set execution progress check interval [" + requestedExecutionProgressCheckIntervalMs + "ms] to smaller than the minimum execution progress check interval in cluster [" + this._minExecutionProgressCheckIntervalMs + "ms].");
        }
        this._requestedExecutionProgressCheckIntervalMs = requestedExecutionProgressCheckIntervalMs;
    }

    public long executionProgressCheckIntervalMs() {
        return this._requestedExecutionProgressCheckIntervalMs == null ? this._defaultExecutionProgressCheckIntervalMs : this._requestedExecutionProgressCheckIntervalMs;
    }

    private void registerGaugeSensors(MetricRegistry dropwizardMetricRegistry) {
        String name = ZK_EXECUTOR_METRIC_TYPE;
        dropwizardMetricRegistry.register(MetricRegistry.name((String)name, (String[])new String[]{"execution-stopped"}), (Metric)((Gauge)this::numExecutionStopped));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)name, (String[])new String[]{"execution-stopped-by-user"}), (Metric)((Gauge)this::numExecutionStoppedByUser));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)name, (String[])new String[]{"execution-started-kafka_assigner"}), (Metric)((Gauge)this::numExecutionStartedInKafkaAssignerMode));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)name, (String[])new String[]{"execution-started-non-kafka_assigner"}), (Metric)((Gauge)this::numExecutionStartedInNonKafkaAssignerMode));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)name, (String[])new String[]{"inter-broker-partition-movements-per-broker-cap"}), (Metric)((Gauge)this._executionTaskManager::interBrokerPartitionMovementConcurrency));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)name, (String[])new String[]{"intra-broker-partition-movements-per-broker-cap"}), (Metric)((Gauge)this._executionTaskManager::intraBrokerPartitionMovementConcurrency));
        dropwizardMetricRegistry.register(MetricRegistry.name((String)name, (String[])new String[]{"leadership-movements-global-cap"}), (Metric)((Gauge)this._executionTaskManager::leadershipMovementConcurrency));
    }

    private void removeExpiredDemotionHistory() {
        LOG.debug("Remove expired demotion history");
        this._latestDemoteStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long startTime = (Long)entry.getValue();
            return startTime != 0L && startTime + this._demotionHistoryRetentionTimeMs < this._time.milliseconds();
        });
    }

    private void removeExpiredRemovalHistory() {
        LOG.debug("Remove expired broker removal history");
        this._latestRemoveStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long startTime = (Long)entry.getValue();
            return startTime != 0L && startTime + this._removalHistoryRetentionTimeMs < this._time.milliseconds();
        });
    }

    public Set<Integer> recentlyDemotedBrokers() {
        return Collections.unmodifiableSet(this._latestDemoteStartTimeMsByBrokerId.keySet());
    }

    public Set<Integer> recentlyRemovedBrokers() {
        return Collections.unmodifiableSet(this._latestRemoveStartTimeMsByBrokerId.keySet());
    }

    public boolean dropRecentlyRemovedBrokers(Set<Integer> brokersToDrop) {
        return this._latestRemoveStartTimeMsByBrokerId.entrySet().removeIf(entry -> brokersToDrop.contains(entry.getKey()));
    }

    public boolean dropRecentlyDemotedBrokers(Set<Integer> brokersToDrop) {
        return this._latestDemoteStartTimeMsByBrokerId.entrySet().removeIf(entry -> brokersToDrop.contains(entry.getKey()));
    }

    public void addRecentlyRemovedBrokers(Set<Integer> brokersToAdd) {
        brokersToAdd.forEach(brokerId -> this._latestRemoveStartTimeMsByBrokerId.put((Integer)brokerId, 0L));
    }

    public void addRecentlyDemotedBrokers(Set<Integer> brokersToAdd) {
        brokersToAdd.forEach(brokerId -> this._latestDemoteStartTimeMsByBrokerId.put((Integer)brokerId, 0L));
    }

    public ExecutorState state() {
        return this._executorState;
    }

    public Boolean setConcurrencyAdjusterFor(ConcurrencyType concurrencyType, boolean isConcurrencyAdjusterEnabled) {
        if (concurrencyType != ConcurrencyType.INTER_BROKER_REPLICA && concurrencyType != ConcurrencyType.LEADERSHIP) {
            throw new IllegalArgumentException(String.format("Concurrency adjuster for %s is not yet supported.", new Object[]{concurrencyType}));
        }
        return this._concurrencyAdjusterEnabled.put(concurrencyType, isConcurrencyAdjusterEnabled);
    }

    public synchronized void executeProposals(Collection<ExecutionProposal> proposals, Set<Integer> unthrottledBrokers, Set<Integer> removedBrokers, LoadMonitor loadMonitor, Integer requestedInterBrokerPartitionMovementConcurrency, Integer requestedIntraBrokerPartitionMovementConcurrency, Integer requestedLeadershipMovementConcurrency, Long requestedExecutionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, boolean isTriggeredByUserRequest, String uuid, boolean isKafkaAssignerMode, boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException {
        this.setExecutionMode(isKafkaAssignerMode);
        this.sanityCheckExecuteProposals(loadMonitor, uuid);
        this._skipInterBrokerReplicaConcurrencyAdjustment = skipInterBrokerReplicaConcurrencyAdjustment;
        try {
            this.initProposalExecution(proposals, unthrottledBrokers, requestedInterBrokerPartitionMovementConcurrency, requestedIntraBrokerPartitionMovementConcurrency, requestedLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor);
            this.startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
        }
        catch (Exception e) {
            this.processExecuteProposalsFailure();
            throw e;
        }
    }

    private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String uuid) throws OngoingExecutionException {
        if (this._hasOngoingExecution) {
            throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution.");
        }
        if (loadMonitor == null) {
            throw new IllegalArgumentException("Load monitor cannot be null.");
        }
        if (this._executorState.state() != ExecutorState.State.GENERATING_PROPOSALS_FOR_EXECUTION) {
            throw new IllegalStateException(String.format("Unexpected executor state %s. Initializing proposal execution requires generating proposals for execution.", new Object[]{this._executorState.state()}));
        }
        if (uuid == null || !uuid.equals(this._uuid)) {
            throw new IllegalStateException(String.format("Attempt to initialize proposal execution with a UUID %s that differs from the UUID used for generating proposals for execution %s.", uuid, this._uuid));
        }
    }

    private synchronized void initProposalExecution(Collection<ExecutionProposal> proposals, Collection<Integer> brokersToSkipConcurrencyCheck, Integer requestedInterBrokerPartitionMovementConcurrency, Integer requestedIntraBrokerPartitionMovementConcurrency, Integer requestedLeadershipMovementConcurrency, Long requestedExecutionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, boolean isTriggeredByUserRequest, LoadMonitor loadMonitor) {
        this._executorState = ExecutorState.initializeProposalExecution(this._uuid, this._reasonSupplier.get(), this.recentlyDemotedBrokers(), this.recentlyRemovedBrokers(), isTriggeredByUserRequest);
        this._executionTaskManager.setExecutionModeForTaskTracker(this._isKafkaAssignerMode);
        this._executionTaskManager.addExecutionProposals(proposals, brokersToSkipConcurrencyCheck, this._metadataClient.refreshMetadata().cluster(), replicaMovementStrategy);
        this._concurrencyAdjuster.initAdjustment(loadMonitor, requestedInterBrokerPartitionMovementConcurrency, requestedLeadershipMovementConcurrency);
        this.setRequestedIntraBrokerPartitionMovementConcurrency(requestedIntraBrokerPartitionMovementConcurrency);
        this.setRequestedExecutionProgressCheckIntervalMs(requestedExecutionProgressCheckIntervalMs);
    }

    public synchronized void executeDemoteProposals(Collection<ExecutionProposal> proposals, Collection<Integer> demotedBrokers, LoadMonitor loadMonitor, Integer concurrentSwaps, Integer requestedLeadershipMovementConcurrency, Long requestedExecutionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, boolean isTriggeredByUserRequest, String uuid) throws OngoingExecutionException {
        this.setExecutionMode(false);
        this.sanityCheckExecuteProposals(loadMonitor, uuid);
        this._skipInterBrokerReplicaConcurrencyAdjustment = true;
        try {
            this.initProposalExecution(proposals, demotedBrokers, concurrentSwaps, 0, requestedLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor);
            this.startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, isTriggeredByUserRequest);
        }
        catch (Exception e) {
            this.processExecuteProposalsFailure();
            throw e;
        }
    }

    public void setRequestedInterBrokerPartitionMovementConcurrency(Integer requestedInterBrokerPartitionMovementConcurrency) {
        this._executionTaskManager.setRequestedInterBrokerPartitionMovementConcurrency(requestedInterBrokerPartitionMovementConcurrency);
    }

    public void setRequestedIntraBrokerPartitionMovementConcurrency(Integer requestedIntraBrokerPartitionMovementConcurrency) {
        this._executionTaskManager.setRequestedIntraBrokerPartitionMovementConcurrency(requestedIntraBrokerPartitionMovementConcurrency);
    }

    public void setRequestedLeadershipMovementConcurrency(Integer requestedLeadershipMovementConcurrency) {
        this._executionTaskManager.setRequestedLeadershipMovementConcurrency(requestedLeadershipMovementConcurrency);
    }

    private void setExecutionMode(boolean isKafkaAssignerMode) {
        this._isKafkaAssignerMode = isKafkaAssignerMode;
    }

    public void setUserTaskManager(UserTaskManager userTaskManager) {
        this._userTaskManager = userTaskManager;
    }

    private int numExecutionStopped() {
        return this._numExecutionStopped.get();
    }

    private int numExecutionStoppedByUser() {
        return this._numExecutionStoppedByUser.get();
    }

    private int numExecutionStartedInKafkaAssignerMode() {
        return this._numExecutionStartedInKafkaAssignerMode.get();
    }

    private int numExecutionStartedInNonKafkaAssignerMode() {
        return this._numExecutionStartedInNonKafkaAssignerMode.get();
    }

    private void startExecution(LoadMonitor loadMonitor, Collection<Integer> demotedBrokers, Collection<Integer> removedBrokers, Long replicationThrottle, boolean isTriggeredByUserRequest) throws OngoingExecutionException {
        this._executionStoppedByUser.set(false);
        this.sanityCheckOngoingMovement();
        this._hasOngoingExecution = true;
        this._anomalyDetectorManager.maybeClearOngoingAnomalyDetectionTimeMs();
        this._anomalyDetectorManager.resetHasUnfixableGoals();
        this._stopSignal.set(0);
        this._executionStoppedByUser.set(false);
        if (this._isKafkaAssignerMode) {
            this._numExecutionStartedInKafkaAssignerMode.incrementAndGet();
        } else {
            this._numExecutionStartedInNonKafkaAssignerMode.incrementAndGet();
        }
        this._proposalExecutor.submit(new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, isTriggeredByUserRequest));
    }

    private void sanityCheckOngoingMovement() throws OngoingExecutionException {
        boolean hasOngoingIntraBrokerReplicaMovement;
        boolean hasOngoingPartitionReassignments;
        try {
            hasOngoingPartitionReassignments = this.hasOngoingPartitionReassignments();
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Failed to retrieve if there are already ongoing partition reassignments.", e);
        }
        if (hasOngoingPartitionReassignments) {
            throw new OngoingExecutionException("There are ongoing inter-broker partition movements.");
        }
        try {
            hasOngoingIntraBrokerReplicaMovement = ExecutorAdminUtils.hasOngoingIntraBrokerReplicaMovement(this._metadataClient.cluster().nodes().stream().mapToInt(Node::id).boxed().collect(Collectors.toSet()), this._adminClient, this._config);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Failed to retrieve if there are already ongoing intra-broker replica reassignments.", e);
        }
        if (hasOngoingIntraBrokerReplicaMovement) {
            throw new OngoingExecutionException("There are ongoing intra-broker partition movements.");
        }
        if (this.hasOngoingLeaderElection()) {
            throw new OngoingExecutionException("There are ongoing leadership movements.");
        }
    }

    private void processExecuteProposalsFailure() {
        this._executionTaskManager.clear();
        this._uuid = null;
        this._reasonSupplier = null;
        this._executorState = ExecutorState.noTaskInProgress(this.recentlyDemotedBrokers(), this.recentlyRemovedBrokers());
    }

    public synchronized void setGeneratingProposalsForExecution(String uuid, Supplier<String> reasonSupplier, boolean isTriggeredByUserRequest) throws OngoingExecutionException {
        ExecutorState.State currentExecutorState = this._executorState.state();
        if (currentExecutorState != ExecutorState.State.NO_TASK_IN_PROGRESS) {
            throw new OngoingExecutionException(String.format("Cannot generate proposals while the executor is in %s state.", new Object[]{currentExecutorState}));
        }
        if (uuid == null) {
            throw new IllegalArgumentException("UUID of the execution cannot be null.");
        }
        if (reasonSupplier == null) {
            throw new IllegalArgumentException("Reason supplier cannot be null.");
        }
        this._uuid = uuid;
        this._reasonSupplier = reasonSupplier;
        this._executorState = ExecutorState.generatingProposalsForExecution(this._uuid, this._reasonSupplier.get(), this.recentlyDemotedBrokers(), this.recentlyRemovedBrokers(), isTriggeredByUserRequest);
    }

    public synchronized void failGeneratingProposalsForExecution(String uuid) {
        if (this._executorState.state() == ExecutorState.State.GENERATING_PROPOSALS_FOR_EXECUTION) {
            if (uuid != null && uuid.equals(this._uuid)) {
                LOG.info("Failed to generate proposals for execution (UUID: {} reason: {}).", (Object)uuid, (Object)this._reasonSupplier.get());
                this._uuid = null;
                this._reasonSupplier = null;
                this._executorState = ExecutorState.noTaskInProgress(this.recentlyDemotedBrokers(), this.recentlyRemovedBrokers());
            } else {
                LOG.warn("UUID mismatch in attempt to report failure to generate proposals (received: {} expected: {})", (Object)uuid, (Object)this._uuid);
            }
        }
    }

    public synchronized void userTriggeredStopExecution(boolean forceExecutionStop) {
        if (this.stopExecution(forceExecutionStop)) {
            LOG.info("User requested to stop the ongoing proposal execution" + (forceExecutionStop ? " forcefully." : "."));
            this._numExecutionStoppedByUser.incrementAndGet();
            this._executionStoppedByUser.set(true);
        }
    }

    private synchronized boolean stopExecution(boolean forceExecutionStop) {
        if (forceExecutionStop && (this._stopSignal.compareAndSet(0, 2) || this._stopSignal.compareAndSet(1, 2)) || !forceExecutionStop && this._stopSignal.compareAndSet(0, 1)) {
            this._numExecutionStopped.incrementAndGet();
            this._executionTaskManager.setStopRequested();
            return true;
        }
        return false;
    }

    public synchronized void shutdown() {
        LOG.info("Shutting down executor.");
        if (this._hasOngoingExecution) {
            LOG.warn("Shutdown executor may take long because execution is still in progress.");
        }
        this._proposalExecutor.shutdown();
        try {
            this._proposalExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
        }
        this._metadataClient.close();
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(this._kafkaZkClient);
        KafkaCruiseControlUtils.closeAdminClientWithTimeout(this._adminClient);
        this._executionHistoryScannerExecutor.shutdownNow();
        this._concurrencyAdjusterExecutor.shutdownNow();
        LOG.info("Executor shutdown completed.");
    }

    public boolean modifyOngoingExecution(boolean modify) {
        return this._ongoingExecutionIsBeingModified.compareAndSet(!modify, modify);
    }

    public boolean hasOngoingExecution() {
        return this._hasOngoingExecution;
    }

    public boolean hasOngoingPartitionReassignments() throws InterruptedException, ExecutionException, TimeoutException {
        return !ExecutionUtils.partitionsBeingReassigned(this._adminClient).isEmpty();
    }

    public boolean hasOngoingLeaderElection() {
        return !ExecutorUtils.ongoingLeaderElection(this._kafkaZkClient).isEmpty();
    }

    private class ProposalExecutionRunnable
    implements Runnable {
        private final LoadMonitor _loadMonitor;
        private final Set<Integer> _recentlyDemotedBrokers;
        private final Set<Integer> _recentlyRemovedBrokers;
        private final Long _replicationThrottle;
        private Throwable _executionException;
        private final boolean _isTriggeredByUserRequest;
        private long _lastSlowTaskReportingTimeMs;
        private static final boolean FORCE_PAUSE_SAMPLING = true;

        ProposalExecutionRunnable(LoadMonitor loadMonitor, Collection<Integer> demotedBrokers, Collection<Integer> removedBrokers, Long replicationThrottle, boolean isTriggeredByUserRequest) {
            this._loadMonitor = loadMonitor;
            this._executionException = null;
            if (isTriggeredByUserRequest && Executor.this._userTaskManager == null) {
                Executor.this.processExecuteProposalsFailure();
                Executor.this._hasOngoingExecution = false;
                Executor.this._stopSignal.set(0);
                Executor.this._executionStoppedByUser.set(false);
                LOG.error("Failed to initialize proposal execution.");
                throw new IllegalStateException("User task manager cannot be null.");
            }
            if (demotedBrokers != null) {
                demotedBrokers.forEach(id -> {
                    Long demoteStartTime = (Long)Executor.this._latestDemoteStartTimeMsByBrokerId.get(id);
                    if (demoteStartTime == null || demoteStartTime != 0L) {
                        Executor.this._latestDemoteStartTimeMsByBrokerId.put((Integer)id, Executor.this._time.milliseconds());
                    }
                });
            }
            if (removedBrokers != null) {
                removedBrokers.forEach(id -> {
                    Long removeStartTime = (Long)Executor.this._latestRemoveStartTimeMsByBrokerId.get(id);
                    if (removeStartTime == null || removeStartTime != 0L) {
                        Executor.this._latestRemoveStartTimeMsByBrokerId.put((Integer)id, Executor.this._time.milliseconds());
                    }
                });
            }
            this._recentlyDemotedBrokers = Executor.this.recentlyDemotedBrokers();
            this._recentlyRemovedBrokers = Executor.this.recentlyRemovedBrokers();
            this._replicationThrottle = replicationThrottle;
            this._isTriggeredByUserRequest = isTriggeredByUserRequest;
            this._lastSlowTaskReportingTimeMs = -1L;
        }

        @Override
        public void run() {
            LOG.info("Starting executing balancing proposals.");
            UserTaskManager.UserTaskInfo userTaskInfo = this.initExecution();
            this.execute(userTaskInfo);
            LOG.info("Execution finished.");
        }

        private UserTaskManager.UserTaskInfo initExecution() {
            UserTaskManager.UserTaskInfo userTaskInfo = null;
            if (this._isTriggeredByUserRequest) {
                userTaskInfo = Executor.this._userTaskManager.markTaskExecutionBegan(Executor.this._uuid);
            }
            String reason = Executor.this._reasonSupplier.get();
            Executor.this._executorState = ExecutorState.executionStarting(Executor.this._uuid, reason, this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
            OPERATION_LOG.info("Task [{}] execution starts. The reason of execution is {}.", (Object)Executor.this._uuid, (Object)reason);
            Executor.this._ongoingExecutionIsBeingModified.set(false);
            return userTaskInfo;
        }

        private void adjustSamplingModeBeforeExecution() throws InterruptedException {
            while (this._loadMonitor.samplingMode() != MetricSampler.SamplingMode.BROKER_METRICS_ONLY) {
                try {
                    String reasonForPause = String.format("Paused-By-Cruise-Control-Before-Starting-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate());
                    this._loadMonitor.pauseMetricSampling(reasonForPause, true);
                    this._loadMonitor.setSamplingMode(MetricSampler.SamplingMode.BROKER_METRICS_ONLY);
                    break;
                }
                catch (IllegalStateException e) {
                    Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
                    LOG.debug("Waiting for the load monitor to be ready to adjust sampling mode.", (Throwable)e);
                }
            }
            this._loadMonitor.resumeMetricSampling(String.format("Resumed-By-Cruise-Control-Before-Starting-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate()));
        }

        private void execute(UserTaskManager.UserTaskInfo userTaskInfo) {
            try {
                this.adjustSamplingModeBeforeExecution();
                if (Executor.this._executorState.state() == ExecutorState.State.STARTING_EXECUTION) {
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    this.interBrokerMoveReplicas();
                    this.updateOngoingExecutionState();
                }
                if (Executor.this._executorState.state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS) {
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    this.intraBrokerMoveReplicas();
                    this.updateOngoingExecutionState();
                }
                boolean forceStoppedLeadershipMoves = false;
                if (Executor.this._executorState.state() == ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS) {
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                    forceStoppedLeadershipMoves = this.moveLeaderships();
                    this.updateOngoingExecutionState();
                }
                if (Executor.this._executorState.state() == ExecutorState.State.STOPPING_EXECUTION && Executor.this._stopSignal.get() == 2 && forceStoppedLeadershipMoves) {
                    ExecutionUtils.deleteZNodesToForceStopLeadershipMoves(Executor.this._kafkaZkClient);
                }
            }
            catch (Throwable t) {
                LOG.error("Executor got exception during execution", t);
                this._executionException = t;
            }
            finally {
                this.notifyFinishedTask(userTaskInfo);
                this.clearCompletedExecution();
            }
        }

        private void notifyFinishedTask(UserTaskManager.UserTaskInfo userTaskInfo) {
            if (userTaskInfo != null) {
                Executor.this._userTaskManager.markTaskExecutionFinished(Executor.this._uuid, Executor.this._executorState.state() == ExecutorState.State.STOPPING_EXECUTION || this._executionException != null);
            } else {
                Executor.this._anomalyDetectorManager.markSelfHealingFinished(Executor.this._uuid);
            }
            String prefix = String.format("Task [%s] %s execution is ", Executor.this._uuid, userTaskInfo != null ? "user" + userTaskInfo.requestUrl() : "self-healing");
            if (Executor.this._executorState.state() == ExecutorState.State.STOPPING_EXECUTION) {
                this.notifyExecutionFinished(String.format("%sstopped by %s.", prefix, Executor.this._executionStoppedByUser.get() ? "user" : "Cruise Control"), true);
            } else if (this._executionException != null) {
                this.notifyExecutionFinished(String.format("%sinterrupted with exception %s.", prefix, this._executionException.getMessage()), true);
            } else {
                this.notifyExecutionFinished(String.format("%sfinished.", prefix), false);
            }
        }

        private void notifyExecutionFinished(String message, boolean isWarning) {
            if (isWarning) {
                Executor.this._executorNotifier.sendAlert(message);
                OPERATION_LOG.warn(message);
            } else {
                Executor.this._executorNotifier.sendNotification(message);
                OPERATION_LOG.info(message);
            }
        }

        private void clearCompletedExecution() {
            Executor.this._executionTaskManager.clear();
            Executor.this._uuid = null;
            Executor.this._reasonSupplier = null;
            Executor.this._executorState = ExecutorState.noTaskInProgress(this._recentlyDemotedBrokers, this._recentlyRemovedBrokers);
            Executor.this._hasOngoingExecution = false;
            Executor.this._stopSignal.set(0);
            Executor.this._executionStoppedByUser.set(false);
            this._loadMonitor.setSamplingMode(MetricSampler.SamplingMode.ALL);
        }

        private void updateOngoingExecutionState() {
            block6: {
                block5: {
                    if (Executor.this._stopSignal.get() != 0) break block5;
                    switch (Executor.this._executorState.state()) {
                        case LEADER_MOVEMENT_TASK_IN_PROGRESS: {
                            Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                            break block6;
                        }
                        case INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS: {
                            Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                            break block6;
                        }
                        case INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS: {
                            Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
                            break block6;
                        }
                        default: {
                            throw new IllegalStateException("Unexpected ongoing execution state " + Executor.this._executorState.state());
                        }
                    }
                }
                HashSet<ExecutionTask.TaskType> taskTypesToGetFullList = new HashSet<ExecutionTask.TaskType>(ExecutionTask.TaskType.cachedValues());
                Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.STOPPING_EXECUTION, Executor.this._executionTaskManager.getExecutionTasksSummary(taskTypesToGetFullList), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, Executor.this._reasonSupplier.get(), this._recentlyDemotedBrokers, this._recentlyRemovedBrokers, this._isTriggeredByUserRequest);
            }
        }

        private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
            ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(Executor.this._kafkaZkClient, this._replicationThrottle);
            int numTotalPartitionMovements = Executor.this._executionTaskManager.numRemainingInterBrokerPartitionMovements();
            long totalDataToMoveInMB = Executor.this._executionTaskManager.remainingInterBrokerDataToMoveInMB();
            LOG.info("Starting {} inter-broker partition movements.", (Object)numTotalPartitionMovements);
            int partitionsToMove = numTotalPartitionMovements;
            while (!(partitionsToMove <= 0 && Executor.this.inExecutionTasks().isEmpty() || Executor.this._stopSignal.get() != 0)) {
                List<ExecutionTask> tasksToExecute = Executor.this._executionTaskManager.getInterBrokerReplicaMovementTasks();
                LOG.info("Executor will execute {} task(s)", (Object)tasksToExecute.size());
                AlterPartitionReassignmentsResult result = null;
                if (!tasksToExecute.isEmpty()) {
                    throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()));
                    Executor.this._executionTaskManager.markTasksInProgress(tasksToExecute);
                    result = ExecutionUtils.submitReplicaReassignmentTasks(Executor.this._adminClient, tasksToExecute);
                }
                List<ExecutionTask> completedTasks = this.waitForInterBrokerReplicaTasksToFinish(result);
                partitionsToMove = Executor.this._executionTaskManager.numRemainingInterBrokerPartitionMovements();
                int numFinishedPartitionMovements = Executor.this._executionTaskManager.numFinishedInterBrokerPartitionMovements();
                long finishedDataMovementInMB = Executor.this._executionTaskManager.finishedInterBrokerDataMovementInMB();
                LOG.info("{}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", new Object[]{numFinishedPartitionMovements, numTotalPartitionMovements, String.format("%.2f", (double)numFinishedPartitionMovements * 100.0 / (double)numTotalPartitionMovements), finishedDataMovementInMB, totalDataToMoveInMB, totalDataToMoveInMB == 0L ? Integer.valueOf(100) : String.format("%.2f", (double)finishedDataMovementInMB * 100.0 / (double)totalDataToMoveInMB)});
                throttleHelper.clearThrottles(completedTasks, tasksToExecute.stream().filter(t -> t.state() == ExecutionTaskState.IN_PROGRESS).collect(Collectors.toList()));
            }
            if (Executor.this._stopSignal.get() == 0) {
                LOG.info("Inter-broker partition movements finished.");
            } else {
                ExecutionTaskTracker.ExecutionTasksSummary executionTasksSummary = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
                Map<ExecutionTaskState, Integer> partitionMovementTasksByState = executionTasksSummary.taskStat().get((Object)ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
                LOG.info("Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker partition movement {} tasks cancelled; for leadership movements {} task cancelled.", new Object[]{partitionMovementTasksByState.get((Object)ExecutionTaskState.PENDING), partitionMovementTasksByState.get((Object)ExecutionTaskState.IN_PROGRESS), partitionMovementTasksByState.get((Object)ExecutionTaskState.ABORTING), partitionMovementTasksByState.get((Object)ExecutionTaskState.ABORTED), partitionMovementTasksByState.get((Object)ExecutionTaskState.DEAD), partitionMovementTasksByState.get((Object)ExecutionTaskState.COMPLETED), executionTasksSummary.remainingInterBrokerDataToMoveInMB(), executionTasksSummary.taskStat().get((Object)ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION).get((Object)ExecutionTaskState.PENDING), executionTasksSummary.taskStat().get((Object)ExecutionTask.TaskType.LEADER_ACTION).get((Object)ExecutionTaskState.PENDING)});
            }
        }

        private void intraBrokerMoveReplicas() {
            int numTotalPartitionMovements = Executor.this._executionTaskManager.numRemainingIntraBrokerPartitionMovements();
            long totalDataToMoveInMB = Executor.this._executionTaskManager.remainingIntraBrokerDataToMoveInMB();
            LOG.info("Starting {} intra-broker partition movements.", (Object)numTotalPartitionMovements);
            int partitionsToMove = numTotalPartitionMovements;
            while (!(partitionsToMove <= 0 && Executor.this.inExecutionTasks().isEmpty() || Executor.this._stopSignal.get() != 0)) {
                List<ExecutionTask> tasksToExecute = Executor.this._executionTaskManager.getIntraBrokerReplicaMovementTasks();
                LOG.info("Executor will execute {} task(s)", (Object)tasksToExecute.size());
                if (!tasksToExecute.isEmpty()) {
                    Executor.this._executionTaskManager.markTasksInProgress(tasksToExecute);
                    ExecutorAdminUtils.executeIntraBrokerReplicaMovements(tasksToExecute, Executor.this._adminClient, Executor.this._executionTaskManager, Executor.this._config);
                }
                this.waitForIntraBrokerReplicaTasksToFinish();
                partitionsToMove = Executor.this._executionTaskManager.numRemainingIntraBrokerPartitionMovements();
                int numFinishedPartitionMovements = Executor.this._executionTaskManager.numFinishedIntraBrokerPartitionMovements();
                long finishedDataToMoveInMB = Executor.this._executionTaskManager.finishedIntraBrokerDataToMoveInMB();
                LOG.info("{}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.", new Object[]{numFinishedPartitionMovements, numTotalPartitionMovements, String.format("%.2f", (double)numFinishedPartitionMovements * 100.0 / (double)numTotalPartitionMovements), finishedDataToMoveInMB, totalDataToMoveInMB, totalDataToMoveInMB == 0L ? Integer.valueOf(100) : String.format("%.2f", (double)finishedDataToMoveInMB * 100.0 / (double)totalDataToMoveInMB)});
            }
            Set<ExecutionTask> inExecutionTasks = Executor.this.inExecutionTasks();
            while (!inExecutionTasks.isEmpty()) {
                LOG.info("Waiting for {} tasks moving {} MB to finish", (Object)inExecutionTasks.size(), (Object)Executor.this._executionTaskManager.inExecutionIntraBrokerDataMovementInMB());
                this.waitForIntraBrokerReplicaTasksToFinish();
                inExecutionTasks = Executor.this.inExecutionTasks();
            }
            if (Executor.this.inExecutionTasks().isEmpty()) {
                LOG.info("Intra-broker partition movements finished.");
            } else if (Executor.this._stopSignal.get() != 0) {
                ExecutionTaskTracker.ExecutionTasksSummary executionTasksSummary = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
                Map<ExecutionTaskState, Integer> partitionMovementTasksByState = executionTasksSummary.taskStat().get((Object)ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION);
                LOG.info("Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for leadership movements {} task cancelled.", new Object[]{partitionMovementTasksByState.get((Object)ExecutionTaskState.PENDING), partitionMovementTasksByState.get((Object)ExecutionTaskState.IN_PROGRESS), partitionMovementTasksByState.get((Object)ExecutionTaskState.ABORTING), partitionMovementTasksByState.get((Object)ExecutionTaskState.ABORTED), partitionMovementTasksByState.get((Object)ExecutionTaskState.DEAD), partitionMovementTasksByState.get((Object)ExecutionTaskState.COMPLETED), executionTasksSummary.remainingIntraBrokerDataToMoveInMB(), executionTasksSummary.taskStat().get((Object)ExecutionTask.TaskType.LEADER_ACTION).get((Object)ExecutionTaskState.PENDING)});
            }
        }

        private boolean moveLeaderships() {
            int numTotalLeadershipMovements = Executor.this._executionTaskManager.numRemainingLeadershipMovements();
            LOG.info("Starting {} leadership movements.", (Object)numTotalLeadershipMovements);
            int numFinishedLeadershipMovements = 0;
            while (Executor.this._executionTaskManager.numRemainingLeadershipMovements() != 0 && Executor.this._stopSignal.get() == 0) {
                this.updateOngoingExecutionState();
                LOG.info("{}/{} ({}%) leadership movements completed.", new Object[]{numFinishedLeadershipMovements += this.moveLeadershipInBatch(), numTotalLeadershipMovements, numFinishedLeadershipMovements * 100 / numTotalLeadershipMovements});
            }
            if (Executor.this.inExecutionTasks().isEmpty()) {
                LOG.info("Leadership movements finished.");
            } else if (Executor.this._stopSignal.get() != 0) {
                Map<ExecutionTaskState, Integer> leadershipMovementTasksByState = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet()).taskStat().get((Object)ExecutionTask.TaskType.LEADER_ACTION);
                LOG.info("Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed.", new Object[]{leadershipMovementTasksByState.get((Object)ExecutionTaskState.PENDING), leadershipMovementTasksByState.get((Object)ExecutionTaskState.IN_PROGRESS), leadershipMovementTasksByState.get((Object)ExecutionTaskState.ABORTING), leadershipMovementTasksByState.get((Object)ExecutionTaskState.ABORTED), leadershipMovementTasksByState.get((Object)ExecutionTaskState.DEAD), leadershipMovementTasksByState.get((Object)ExecutionTaskState.COMPLETED)});
            }
            return Executor.this._stopSignal.get() == 2;
        }

        private int moveLeadershipInBatch() {
            List<ExecutionTask> leadershipMovementTasks = Executor.this._executionTaskManager.getLeadershipMovementTasks();
            int numLeadershipToMove = leadershipMovementTasks.size();
            LOG.debug("Executing {} leadership movements in a batch.", (Object)numLeadershipToMove);
            if (!leadershipMovementTasks.isEmpty() && Executor.this._stopSignal.get() == 0) {
                while (Executor.this.hasOngoingLeaderElection()) {
                    try {
                        LOG.error("Waiting for Kafka Controller to delete /admin/preferred_replica_election zNode. Are other admin tools triggering a PLE?");
                        Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for Kafka Controller to delete /admin/preferred_replica_election zNode.");
                    }
                }
                Executor.this._executionTaskManager.markTasksInProgress(leadershipMovementTasks);
                ExecutorUtils.executePreferredLeaderElection(Executor.this._kafkaZkClient, leadershipMovementTasks);
                LOG.trace("Waiting for leadership movement batch to finish.");
                while (!Executor.this.inExecutionTasks().isEmpty() && Executor.this._stopSignal.get() == 0) {
                    this.waitForLeadershipTasksToFinish();
                }
            }
            return numLeadershipToMove;
        }

        private Cluster getClusterForExecutionProgressCheck() {
            try {
                Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Tasks in execution: {}", Executor.this.inExecutionTasks());
            }
            return Executor.this._metadataClient.refreshMetadata().cluster();
        }

        private List<ExecutionTask> waitForInterBrokerReplicaTasksToFinish(AlterPartitionReassignmentsResult result) throws InterruptedException, ExecutionException, TimeoutException {
            boolean retry;
            ArrayList<ExecutionTask> finishedTasks = new ArrayList<ExecutionTask>();
            HashSet<Long> stoppedTaskIds = new HashSet<Long>();
            HashSet<Long> deletedTaskIds = new HashSet<Long>();
            HashSet<Long> deadTaskIds = new HashSet<Long>();
            HashSet<TopicPartition> deletedUponSubmission = new HashSet<TopicPartition>();
            HashSet<TopicPartition> deadUponSubmission = new HashSet<TopicPartition>();
            HashSet<TopicPartition> noReassignmentToCancel = new HashSet<TopicPartition>(0);
            ExecutionUtils.processAlterPartitionReassignmentsResult(result, deletedUponSubmission, deadUponSubmission, noReassignmentToCancel);
            if (!noReassignmentToCancel.isEmpty()) {
                throw new IllegalStateException(String.format("Attempt to cancel reassignment of partitions %s during regular execution.", noReassignmentToCancel));
            }
            do {
                Cluster cluster = this.getClusterForExecutionProgressCheck();
                ArrayList<ExecutionTask> deadInterBrokerReplicaTasks = new ArrayList<ExecutionTask>();
                ArrayList<ExecutionTask> stoppedInterBrokerReplicaTasks = new ArrayList<ExecutionTask>();
                ArrayList<ExecutionTask> slowTasksToReport = new ArrayList<ExecutionTask>();
                boolean shouldReportSlowTasks = Executor.this._time.milliseconds() - this._lastSlowTaskReportingTimeMs > Executor.this._slowTaskAlertingBackoffTimeMs;
                for (ExecutionTask task : Executor.this.inExecutionTasks()) {
                    TopicPartition tp = task.proposal().topicPartition();
                    if (Executor.this._stopSignal.get() != 0) {
                        LOG.debug("Task {} is marked as dead to stop the execution with a rollback.", (Object)task);
                        finishedTasks.add(task);
                        stoppedTaskIds.add(task.executionId());
                        Executor.this._executionTaskManager.markTaskDead(task);
                        stoppedInterBrokerReplicaTasks.add(task);
                        continue;
                    }
                    if (cluster.partition(tp) == null || deletedUponSubmission.contains(tp)) {
                        this.handleProgressWithTopicDeletion(task, finishedTasks, deletedTaskIds);
                        continue;
                    }
                    if (ExecutionUtils.isInterBrokerReplicaActionDone(cluster, task)) {
                        this.handleProgressWithCompletion(task, finishedTasks);
                        continue;
                    }
                    if (shouldReportSlowTasks) {
                        task.maybeReportExecutionTooSlow(Executor.this._time.milliseconds(), slowTasksToReport);
                    }
                    if (!this.maybeMarkTaskAsDead(cluster, null, task, deadUponSubmission)) continue;
                    deadTaskIds.add(task.executionId());
                    deadInterBrokerReplicaTasks.add(task);
                    finishedTasks.add(task);
                }
                this.sendSlowExecutionAlert(slowTasksToReport);
                this.handleDeadInterBrokerReplicaTasks(deadInterBrokerReplicaTasks, stoppedInterBrokerReplicaTasks);
                this.updateOngoingExecutionState();
                boolean bl = retry = !Executor.this.inExecutionTasks().isEmpty() && finishedTasks.isEmpty();
                if (!retry) continue;
                this.maybeReexecuteInterBrokerReplicaTasks(deletedUponSubmission, deadUponSubmission);
            } while (retry);
            LOG.info("Finished tasks: {}.{}{}{}", new Object[]{finishedTasks, stoppedTaskIds.isEmpty() ? "" : String.format(". [Stopped: %s]", stoppedTaskIds), deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds), deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds)});
            return finishedTasks;
        }

        private void handleProgressWithTopicDeletion(ExecutionTask task, List<ExecutionTask> finishedTasks, Set<Long> deletedTaskIds) {
            LOG.debug("Task {} is marked as finished because the topic has been deleted.", (Object)task);
            finishedTasks.add(task);
            deletedTaskIds.add(task.executionId());
            Executor.this._executionTaskManager.markTaskAborting(task);
            Executor.this._executionTaskManager.markTaskDone(task);
        }

        private void handleProgressWithCompletion(ExecutionTask task, List<ExecutionTask> finishedTasks) {
            finishedTasks.add(task);
            Executor.this._executionTaskManager.markTaskDone(task);
        }

        private void waitForLeadershipTasksToFinish() {
            ArrayList<ExecutionTask> finishedTasks = new ArrayList<ExecutionTask>();
            HashSet<Long> stoppedTaskIds = new HashSet<Long>();
            HashSet<Long> deletedTaskIds = new HashSet<Long>();
            HashSet<Long> deadTaskIds = new HashSet<Long>();
            do {
                this.maybeReexecuteLeadershipTasks();
                Cluster cluster = this.getClusterForExecutionProgressCheck();
                ArrayList<ExecutionTask> slowTasksToReport = new ArrayList<ExecutionTask>();
                boolean shouldReportSlowTasks = Executor.this._time.milliseconds() - this._lastSlowTaskReportingTimeMs > Executor.this._slowTaskAlertingBackoffTimeMs;
                for (ExecutionTask task : Executor.this.inExecutionTasks()) {
                    TopicPartition tp = task.proposal().topicPartition();
                    if (Executor.this._stopSignal.get() == 2) {
                        LOG.debug("Task {} is marked as dead to force-stop the execution with a controller bounce.", (Object)task);
                        finishedTasks.add(task);
                        stoppedTaskIds.add(task.executionId());
                        Executor.this._executionTaskManager.markTaskDead(task);
                        continue;
                    }
                    if (cluster.partition(tp) == null) {
                        this.handleProgressWithTopicDeletion(task, finishedTasks, deletedTaskIds);
                        continue;
                    }
                    if (ExecutionUtils.isLeadershipMovementDone(cluster, task)) {
                        this.handleProgressWithCompletion(task, finishedTasks);
                        continue;
                    }
                    if (shouldReportSlowTasks) {
                        task.maybeReportExecutionTooSlow(Executor.this._time.milliseconds(), slowTasksToReport);
                    }
                    if (!this.maybeMarkTaskAsDead(cluster, null, task, null)) continue;
                    deadTaskIds.add(task.executionId());
                    finishedTasks.add(task);
                }
                this.sendSlowExecutionAlert(slowTasksToReport);
                this.updateOngoingExecutionState();
            } while (!Executor.this.inExecutionTasks().isEmpty() && finishedTasks.isEmpty());
            LOG.info("Finished tasks: {}.{}{}{}", new Object[]{finishedTasks, stoppedTaskIds.isEmpty() ? "" : String.format(". [Force-Stopped: %s]", stoppedTaskIds), deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds), deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds)});
        }

        private void waitForIntraBrokerReplicaTasksToFinish() {
            ArrayList<ExecutionTask> finishedTasks = new ArrayList<ExecutionTask>();
            HashSet<Long> deletedTaskIds = new HashSet<Long>();
            HashSet<Long> deadTaskIds = new HashSet<Long>();
            do {
                this.maybeReexecuteIntraBrokerReplicaTasks();
                Cluster cluster = this.getClusterForExecutionProgressCheck();
                Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logDirInfoByTask = ExecutorAdminUtils.getLogdirInfoForExecutionTask(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this._adminClient, Executor.this._config);
                ArrayList<ExecutionTask> slowTasksToReport = new ArrayList<ExecutionTask>();
                boolean shouldReportSlowTasks = Executor.this._time.milliseconds() - this._lastSlowTaskReportingTimeMs > Executor.this._slowTaskAlertingBackoffTimeMs;
                for (ExecutionTask task : Executor.this.inExecutionTasks()) {
                    TopicPartition tp = task.proposal().topicPartition();
                    if (cluster.partition(tp) == null) {
                        this.handleProgressWithTopicDeletion(task, finishedTasks, deletedTaskIds);
                        continue;
                    }
                    if (ExecutionUtils.isIntraBrokerReplicaActionDone(logDirInfoByTask, task)) {
                        this.handleProgressWithCompletion(task, finishedTasks);
                        continue;
                    }
                    if (shouldReportSlowTasks) {
                        task.maybeReportExecutionTooSlow(Executor.this._time.milliseconds(), slowTasksToReport);
                    }
                    if (!this.maybeMarkTaskAsDead(cluster, logDirInfoByTask, task, null)) continue;
                    deadTaskIds.add(task.executionId());
                    finishedTasks.add(task);
                }
                this.sendSlowExecutionAlert(slowTasksToReport);
                this.updateOngoingExecutionState();
            } while (!Executor.this.inExecutionTasks().isEmpty() && finishedTasks.isEmpty());
            LOG.info("Finished tasks: {}.{}{}", new Object[]{finishedTasks, deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds), deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds)});
        }

        private void handleDeadInterBrokerReplicaTasks(List<ExecutionTask> deadInterBrokerReplicaTasks, List<ExecutionTask> stoppedInterBrokerReplicaTasks) throws InterruptedException, ExecutionException, TimeoutException {
            ArrayList<ExecutionTask> tasksToCancel = new ArrayList<ExecutionTask>(deadInterBrokerReplicaTasks);
            tasksToCancel.addAll(stoppedInterBrokerReplicaTasks);
            if (!tasksToCancel.isEmpty()) {
                tasksToCancel.stream().filter(task -> task.state() != ExecutionTaskState.DEAD).forEach(task -> {
                    throw new IllegalArgumentException(String.format("Unexpected state for task %s (expected: %s).", new Object[]{task, ExecutionTaskState.DEAD}));
                });
                AlterPartitionReassignmentsResult result = ExecutionUtils.submitReplicaReassignmentTasks(Executor.this._adminClient, tasksToCancel);
                HashSet<TopicPartition> deleted = new HashSet<TopicPartition>();
                HashSet<TopicPartition> dead = new HashSet<TopicPartition>();
                HashSet<TopicPartition> noReassignmentToCancel = new HashSet<TopicPartition>();
                ExecutionUtils.processAlterPartitionReassignmentsResult(result, deleted, dead, noReassignmentToCancel);
                LOG.debug("Handling dead inter-broker replica tasks {} (deleted: {} dead: {} noReassignmentToCancel: {})", new Object[]{tasksToCancel, deleted, dead, noReassignmentToCancel});
                if (Executor.this._stopSignal.get() == 0) {
                    LOG.info("Stop the execution due to {} dead tasks: {}.", (Object)tasksToCancel.size(), tasksToCancel);
                    Executor.this.stopExecution(false);
                }
                if (deadInterBrokerReplicaTasks.isEmpty()) {
                    Set beingCancelled = tasksToCancel.stream().map(task -> task.proposal().topicPartition()).collect(Collectors.toSet());
                    beingCancelled.removeAll(deleted);
                    beingCancelled.removeAll(dead);
                    beingCancelled.removeAll(noReassignmentToCancel);
                    while (true) {
                        HashSet<TopicPartition> intersection = new HashSet<TopicPartition>(ExecutionUtils.partitionsBeingReassigned(Executor.this._adminClient));
                        intersection.retainAll(beingCancelled);
                        if (intersection.isEmpty()) break;
                        try {
                            LOG.info("Waiting for the rollback of ongoing inter-broker replica reassignments for {}.", intersection);
                            Thread.sleep(Executor.this.executionProgressCheckIntervalMs());
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                }
            }
        }

        private void sendSlowExecutionAlert(List<ExecutionTask> slowTasksToReport) {
            if (!slowTasksToReport.isEmpty()) {
                StringBuilder sb = new StringBuilder();
                sb.append("Slow tasks are detected:\n");
                for (ExecutionTask task : slowTasksToReport) {
                    sb.append(String.format("\tID: %s\tstart_time:%s\tdetail:%s%n", task.executionId(), KafkaCruiseControlUtils.toDateString(task.startTimeMs(), "YYYY-MM-dd_HH:mm:ss z", "UTC"), task));
                }
                Executor.this._executorNotifier.sendAlert(sb.toString());
                this._lastSlowTaskReportingTimeMs = Executor.this._time.milliseconds();
            }
        }

        private boolean maybeMarkTaskAsDead(Cluster cluster, Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logdirInfoByTask, ExecutionTask task, Set<TopicPartition> deadInterBrokerReassignments) {
            if (task.state() == ExecutionTaskState.IN_PROGRESS || task.state() == ExecutionTaskState.ABORTING) {
                switch (task.type()) {
                    case LEADER_ACTION: {
                        if (cluster.nodeById(task.proposal().newLeader().brokerId().intValue()) == null) {
                            Executor.this._executionTaskManager.markTaskDead(task);
                            LOG.warn("Killing execution for task {} because the target leader is down.", (Object)task);
                            return true;
                        }
                        if (!((double)Executor.this._time.milliseconds() > (double)task.startTimeMs() + Executor.this._leaderMovementTimeoutMs)) break;
                        Executor.this._executionTaskManager.markTaskDead(task);
                        LOG.warn("Killing execution for task {} because it took longer than {} to finish.", (Object)task, (Object)Executor.this._leaderMovementTimeoutMs);
                        return true;
                    }
                    case INTER_BROKER_REPLICA_ACTION: {
                        for (ReplicaPlacementInfo broker : task.proposal().newReplicas()) {
                            if (cluster.nodeById(broker.brokerId().intValue()) != null && !deadInterBrokerReassignments.contains(task.proposal().topicPartition())) continue;
                            Executor.this._executionTaskManager.markTaskDead(task);
                            LOG.warn("Killing execution for task {} because the new replica {} is down.", (Object)task, (Object)broker);
                            return true;
                        }
                        break;
                    }
                    case INTRA_BROKER_REPLICA_ACTION: {
                        if (logdirInfoByTask.containsKey(task)) break;
                        Executor.this._executionTaskManager.markTaskDead(task);
                        LOG.warn("Killing execution for task {} because the destination disk is down.", (Object)task);
                        return true;
                    }
                    default: {
                        throw new IllegalStateException("Unknown task type " + task.type());
                    }
                }
            }
            return false;
        }

        private void maybeReexecuteInterBrokerReplicaTasks(Set<TopicPartition> deleted, Set<TopicPartition> dead) {
            ArrayList<ExecutionTask> interBrokerReplicaTasksToReexecute = new ArrayList<ExecutionTask>(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)));
            boolean shouldReexecute = false;
            try {
                shouldReexecute = !ExecutionUtils.isSubset(ExecutionUtils.partitionsBeingReassigned(Executor.this._adminClient), interBrokerReplicaTasksToReexecute);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                LOG.warn("Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.", (Throwable)e);
            }
            if (shouldReexecute) {
                LOG.info("Reexecuting tasks {}", interBrokerReplicaTasksToReexecute);
                AlterPartitionReassignmentsResult result = ExecutionUtils.submitReplicaReassignmentTasks(Executor.this._adminClient, interBrokerReplicaTasksToReexecute);
                HashSet<TopicPartition> noReassignmentToCancel = new HashSet<TopicPartition>();
                ExecutionUtils.processAlterPartitionReassignmentsResult(result, deleted, dead, noReassignmentToCancel);
                if (!noReassignmentToCancel.isEmpty()) {
                    throw new IllegalStateException(String.format("Attempt to cancel reassignment of partitions %s during re-execution.", noReassignmentToCancel));
                }
            }
        }

        private void maybeReexecuteIntraBrokerReplicaTasks() {
            ArrayList<ExecutionTask> intraBrokerReplicaTasksToReexecute = new ArrayList<ExecutionTask>(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)));
            ExecutorAdminUtils.getLogdirInfoForExecutionTask(intraBrokerReplicaTasksToReexecute, Executor.this._adminClient, Executor.this._config).forEach((k, v) -> {
                String targetLogdir = k.proposal().replicasToMoveBetweenDisksByBroker().get(k.brokerId()).logdir();
                if (targetLogdir.equals(v.getCurrentReplicaLogDir()) || targetLogdir.equals(v.getFutureReplicaLogDir())) {
                    intraBrokerReplicaTasksToReexecute.remove(k);
                }
            });
            if (!intraBrokerReplicaTasksToReexecute.isEmpty()) {
                LOG.info("Reexecuting tasks {}", intraBrokerReplicaTasksToReexecute);
                ExecutorAdminUtils.executeIntraBrokerReplicaMovements(intraBrokerReplicaTasksToReexecute, Executor.this._adminClient, Executor.this._executionTaskManager, Executor.this._config);
            }
        }

        private void maybeReexecuteLeadershipTasks() {
            ArrayList<ExecutionTask> leaderActionsToReexecute;
            if (!Executor.this.hasOngoingLeaderElection() && !(leaderActionsToReexecute = new ArrayList<ExecutionTask>(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)))).isEmpty()) {
                LOG.info("Reexecuting tasks {}", leaderActionsToReexecute);
                ExecutorUtils.executePreferredLeaderElection(Executor.this._kafkaZkClient, leaderActionsToReexecute);
            }
        }
    }

    private class ConcurrencyAdjuster
    implements Runnable {
        private LoadMonitor _loadMonitor = null;

        ConcurrencyAdjuster() {
        }

        public synchronized void initAdjustment(LoadMonitor loadMonitor, Integer requestedInterBrokerPartitionMovementConcurrency, Integer requestedLeadershipMovementConcurrency) {
            this._loadMonitor = loadMonitor;
            Executor.this.setRequestedInterBrokerPartitionMovementConcurrency(requestedInterBrokerPartitionMovementConcurrency);
            Executor.this.setRequestedLeadershipMovementConcurrency(requestedLeadershipMovementConcurrency);
        }

        private boolean canRefreshConcurrency(ConcurrencyType concurrencyType) {
            if (!((Boolean)Executor.this._concurrencyAdjusterEnabled.get((Object)concurrencyType)).booleanValue() || this._loadMonitor == null) {
                return false;
            }
            switch (concurrencyType) {
                case LEADERSHIP: {
                    return Executor.this._executorState.state() == ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS;
                }
                case INTER_BROKER_REPLICA: {
                    return Executor.this._executorState.state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS && !Executor.this._skipInterBrokerReplicaConcurrencyAdjustment;
                }
            }
            throw new IllegalArgumentException("Unsupported concurrency type " + concurrencyType + " is provided.");
        }

        private synchronized void refreshInterBrokerReplicaConcurrency() {
            Integer recommendedConcurrency;
            if (this.canRefreshConcurrency(ConcurrencyType.INTER_BROKER_REPLICA) && (recommendedConcurrency = ExecutionUtils.recommendedConcurrency(this._loadMonitor.currentBrokerMetricValues(), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), ConcurrencyType.INTER_BROKER_REPLICA)) != null) {
                Executor.this.setRequestedInterBrokerPartitionMovementConcurrency(recommendedConcurrency);
            }
        }

        private synchronized void refreshLeadershipConcurrency() {
            Integer recommendedConcurrency;
            if (this.canRefreshConcurrency(ConcurrencyType.LEADERSHIP) && (recommendedConcurrency = ExecutionUtils.recommendedConcurrency(this._loadMonitor.currentBrokerMetricValues(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), ConcurrencyType.LEADERSHIP)) != null) {
                Executor.this.setRequestedLeadershipMovementConcurrency(recommendedConcurrency);
            }
        }

        @Override
        public void run() {
            try {
                this.refreshInterBrokerReplicaConcurrency();
                this.refreshLeadershipConcurrency();
            }
            catch (Throwable t) {
                LOG.warn("Received exception when trying to adjust reassignment concurrency.", t);
            }
        }
    }

    private class ExecutionHistoryScanner
    implements Runnable {
        private ExecutionHistoryScanner() {
        }

        @Override
        public void run() {
            try {
                Executor.this.removeExpiredDemotionHistory();
                Executor.this.removeExpiredRemovalHistory();
            }
            catch (Throwable t) {
                LOG.warn("Received exception when trying to expire execution history.", t);
            }
        }
    }
}

