package com.linkedin.kafka.cruisecontrol.analyzer;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.async.progress.OptimizationForGoal;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizer.class */
public class GoalOptimizer implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(GoalOptimizer.class);
    // Goals obtained from AnalyzerUtils.getGoalsByPriority(config); used as the default goal list for optimizations.
    private final List<Goal> _goalsByPriority;
    // Passed to ClusterModel.getClusterStats(...) when collecting per-goal and final cluster stats.
    private final BalancingConstraint _balancingConstraint;
    // Compiled from TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG; fallback pattern in excludedTopics(...).
    private final Pattern _defaultExcludedTopics;
    private final LoadMonitor _loadMonitor;
    private final Time _time;
    // Read from NUM_PROPOSAL_PRECOMPUTE_THREADS_CONFIG; the scheduler loop in run() only iterates when this is > 0.
    private final int _numPrecomputingThreads;
    // Default pause between two iterations of the scheduler loop in run().
    private final long _proposalExpirationMs;
    // Executes ProposalCandidateComputer tasks and explicitly requested precomputations.
    private final ExecutorService _proposalPrecomputingExecutor;
    // Compare-and-set flag: the computer that wins it reports into _proposalPrecomputingProgress; reset by clearCachedProposal.
    private final AtomicBoolean _progressUpdateLock;
    // Exception recorded by clearCachedProposal(Exception); set back to null by clearCachedProposal().
    private final AtomicReference<Exception> _proposalGenerationException;
    private final OperationProgress _proposalPrecomputingProgress;
    // Monitor guarding the proposal cache; request threads wait() on it and cache updates notifyAll().
    private final Object _cacheLock;
    // Most recent optimizer result, or null when the cache has been cleared. Written only while holding _cacheLock.
    private volatile OptimizerResult _cachedProposals;
    // Set by shutdown(); checked by the loops in run() and computeCachedProposal().
    private volatile boolean _shutdown = false;
    // Thread executing run(); interrupted by optimizations(OperationProgress, boolean) to request an early recomputation.
    private Thread _proposalPrecomputingSchedulerThread;
    private final boolean _allowCapacityEstimationOnProposalPrecompute;
    // Records how long each scheduled precomputation in run() takes.
    private final Timer _proposalComputationTimer;
    // Combined requirements of all goals in _goalsByPriority (MonitorUtils.combineLoadRequirementOptions).
    private final ModelCompletenessRequirements _defaultModelCompletenessRequirements;
    // Same partition-percentage / include-all-topics settings as the default requirements, but constructed with 1 as first argument.
    private final ModelCompletenessRequirements _requirementsWithAvailableValidWindows;
    // Consulted via hasOngoingExecution() before the proposal cache is used.
    private final Executor _executor;
    // True while a precomputation submitted from optimizations(OperationProgress, boolean) is pending.
    private volatile boolean _hasOngoingExplicitPrecomputation;
    // Weights forwarded to KafkaCruiseControlUtils.balancednessCostByGoal(...).
    private final double _priorityWeight;
    private final double _strictnessWeight;
    private final OptimizationOptionsGenerator _optimizationOptionsGenerator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizer$ProposalCandidateComputer.class */
    public class ProposalCandidateComputer implements Runnable {
        private final boolean _allowCapacityEstimation;

        ProposalCandidateComputer(boolean z) {
            this._allowCapacityEstimation = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            GoalOptimizer.LOG.debug("Starting proposal candidate computer.");
            if (GoalOptimizer.this._loadMonitor == null) {
                GoalOptimizer.LOG.warn("No load monitor available. Skip computing proposal candidate.");
                return;
            }
            OperationProgress operationProgress = GoalOptimizer.this._progressUpdateLock.compareAndSet(false, true) ? GoalOptimizer.this._proposalPrecomputingProgress : new OperationProgress();
            try {
                LoadMonitor.AutoCloseableSemaphore acquireForModelGeneration = GoalOptimizer.this._loadMonitor.acquireForModelGeneration(operationProgress);
                try {
                    long milliseconds = GoalOptimizer.this._time.milliseconds();
                    ClusterModel clusterModel = GoalOptimizer.this._loadMonitor.clusterModel(GoalOptimizer.this._time.milliseconds(), GoalOptimizer.this._loadMonitor.meetCompletenessRequirements(GoalOptimizer.this._defaultModelCompletenessRequirements) ? GoalOptimizer.this._defaultModelCompletenessRequirements : GoalOptimizer.this._requirementsWithAvailableValidWindows, this._allowCapacityEstimation, operationProgress);
                    if (clusterModel.topics().isEmpty()) {
                        GoalOptimizer.LOG.warn("The cluster model does not have valid topics, skipping proposal precomputation.");
                    } else {
                        OptimizerResult optimizations = GoalOptimizer.this.optimizations(clusterModel, GoalOptimizer.this._goalsByPriority, operationProgress);
                        GoalOptimizer.LOG.debug("Generated a proposal candidate in {} ms.", Long.valueOf(GoalOptimizer.this._time.milliseconds() - milliseconds));
                        GoalOptimizer.this.updateCachedProposals(optimizations);
                    }
                    if (acquireForModelGeneration != null) {
                        acquireForModelGeneration.close();
                    }
                } finally {
                }
            } catch (OptimizationFailureException e) {
                GoalOptimizer.LOG.warn("Detected unfixable proposal optimization", e);
                exceptionHandler(e);
            } catch (Exception e2) {
                GoalOptimizer.LOG.error("Proposal precomputation encountered error", e2);
                exceptionHandler(e2);
            }
        }

        private void exceptionHandler(Exception exc) {
            GoalOptimizer.this.clearCachedProposal(exc);
            synchronized (GoalOptimizer.this._cacheLock) {
                GoalOptimizer.this._hasOngoingExplicitPrecomputation = false;
                GoalOptimizer.this._cacheLock.notifyAll();
            }
        }
    }

    /**
     * Creates the goal optimizer and wires up its proposal-precomputing infrastructure.
     *
     * @param kafkaCruiseControlConfig source of goals, thread count, expiration, weights and the options generator class.
     * @param loadMonitor provides cluster models and load-monitor state.
     * @param time clock used for timing and scheduling.
     * @param metricRegistry registry in which the proposal computation timer is registered.
     * @param executor consulted for ongoing executions before the proposal cache is used.
     * @param adminClient handed to the configured {@link OptimizationOptionsGenerator}.
     */
    public GoalOptimizer(KafkaCruiseControlConfig kafkaCruiseControlConfig, LoadMonitor loadMonitor, Time time, MetricRegistry metricRegistry, Executor executor, AdminClient adminClient) {
        this._goalsByPriority = AnalyzerUtils.getGoalsByPriority(kafkaCruiseControlConfig);
        this._defaultModelCompletenessRequirements = MonitorUtils.combineLoadRequirementOptions(this._goalsByPriority);
        this._requirementsWithAvailableValidWindows = new ModelCompletenessRequirements(
            1,
            this._defaultModelCompletenessRequirements.minMonitoredPartitionsPercentage(),
            this._defaultModelCompletenessRequirements.includeAllTopics());
        this._numPrecomputingThreads = kafkaCruiseControlConfig.getInt(AnalyzerConfig.NUM_PROPOSAL_PRECOMPUTE_THREADS_CONFIG);
        LOG.info("Goals by priority for precomputing: {}", this._goalsByPriority);
        this._balancingConstraint = new BalancingConstraint(kafkaCruiseControlConfig);
        this._defaultExcludedTopics = Pattern.compile(kafkaCruiseControlConfig.getString(AnalyzerConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG));
        this._proposalExpirationMs = kafkaCruiseControlConfig.getLong(AnalyzerConfig.PROPOSAL_EXPIRATION_MS_CONFIG);
        // Must run after _numPrecomputingThreads is assigned: the pool size is derived from it.
        this._proposalPrecomputingExecutor = Executors.newScheduledThreadPool(
            numProposalComputingThreads(),
            new KafkaCruiseControlThreadFactory("ProposalPrecomputingExecutor", false, LOG));
        this._loadMonitor = loadMonitor;
        this._time = time;
        // The cache lock is only ever used as an intrinsic monitor (synchronized / wait / notifyAll),
        // so a plain Object is the right tool; a java.util.concurrent Lock instance here is misleading.
        this._cacheLock = new Object();
        this._cachedProposals = null;
        this._progressUpdateLock = new AtomicBoolean(false);
        this._proposalGenerationException = new AtomicReference<>();
        this._proposalPrecomputingProgress = new OperationProgress();
        this._proposalComputationTimer = metricRegistry.timer(MetricRegistry.name("GoalOptimizer", "proposal-computation-timer"));
        this._executor = executor;
        this._hasOngoingExplicitPrecomputation = false;
        this._priorityWeight = kafkaCruiseControlConfig.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG);
        this._strictnessWeight = kafkaCruiseControlConfig.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG);
        this._allowCapacityEstimationOnProposalPrecompute = kafkaCruiseControlConfig.getBoolean(AnalyzerConfig.ALLOW_CAPACITY_ESTIMATION_ON_PROPOSAL_PRECOMPUTE_CONFIG);
        // Extra objects made available to the configured options generator instance.
        Map<String, Object> generatorConfigOverrides = new HashMap<>(2);
        generatorConfigOverrides.put(KafkaCruiseControlServletUtils.KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, kafkaCruiseControlConfig);
        generatorConfigOverrides.put(KafkaCruiseControlUtils.ADMIN_CLIENT_CONFIG, adminClient);
        this._optimizationOptionsGenerator = kafkaCruiseControlConfig.getConfiguredInstance(
            AnalyzerConfig.OPTIMIZATION_OPTIONS_GENERATOR_CLASS_CONFIG,
            OptimizationOptionsGenerator.class,
            generatorConfigOverrides);
    }

    /**
     * Scheduler loop for proposal precomputation. While the optimizer is not shut down and at least one
     * precomputing thread is configured, it refreshes the proposal cache whenever the cached result is
     * no longer valid, then sleeps until the next round (or until it is interrupted to recompute early).
     */
    @Override // java.lang.Runnable
    public void run() {
        // Pause used whenever a round has to be skipped.
        final long skipBackoffMs = 30000L;
        this._proposalPrecomputingSchedulerThread = Thread.currentThread();
        LOG.info("Starting proposal candidate computation.");
        while (!this._shutdown && this._numPrecomputingThreads > 0) {
            LoadMonitorTaskRunner.LoadMonitorTaskRunnerState monitorState = this._loadMonitor.taskRunnerState();
            long sleepMs = this._proposalExpirationMs;
            if (monitorState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.LOADING
                || monitorState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.BOOTSTRAPPING) {
                LOG.info("Skipping proposal precomputing because load monitor is in {} state.", monitorState);
                sleepMs = skipBackoffMs;
            } else if (!this._loadMonitor.meetCompletenessRequirements(this._requirementsWithAvailableValidWindows)) {
                LOG.info("Skipping proposal precomputing because load monitor does not have enough snapshots.");
                sleepMs = skipBackoffMs;
            } else {
                try {
                    if (validCachedProposal()) {
                        // Snapshot the volatile field: another thread may clear the cache right after the validity
                        // check, and dereferencing the field again would then throw and terminate this thread.
                        OptimizerResult cached = this._cachedProposals;
                        LOG.debug("Skipping proposal precomputing because the cached proposal result is still valid. Cached generation: {}",
                                  cached == null ? null : cached.modelGeneration());
                    } else {
                        if (LOG.isDebugEnabled()) {
                            OptimizerResult cached = this._cachedProposals;
                            LOG.debug("Invalidated cache. Model generation (cached: {}, current: {}).{}",
                                      cached == null ? null : cached.modelGeneration(),
                                      this._loadMonitor.clusterModelGeneration(),
                                      cached == null ? "" : String.format(" Cached was excluding default topics: %s.", cached.excludedTopics()));
                        }
                        clearCachedProposal();
                        long startNs = System.nanoTime();
                        computeCachedProposal(this._allowCapacityEstimationOnProposalPrecompute);
                        this._proposalComputationTimer.update(System.nanoTime() - startNs, TimeUnit.NANOSECONDS);
                    }
                } catch (KafkaCruiseControlException e) {
                    sleepMs = skipBackoffMs;
                    LOG.debug("Skipping proposal precomputing because there is an ongoing execution.", e);
                }
            }
            // Compute the remaining time once so the value handed to sleep() can never be negative.
            long deadlineMs = this._time.milliseconds() + sleepMs;
            long remainingMs = deadlineMs - this._time.milliseconds();
            if (!this._shutdown && remainingMs > 0) {
                try {
                    Thread.sleep(remainingMs);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: request threads interrupt this thread to ask for an early
                    // recomputation, so waking up and re-evaluating the loop condition is the intended reaction.
                }
            }
        }
    }

    /**
     * @return the configured number of precomputing threads when it is positive, otherwise 2.
     */
    private int numProposalComputingThreads() {
        return _numPrecomputingThreads > 0 ? _numPrecomputingThreads : 2;
    }

    /**
     * Submits a {@link ProposalCandidateComputer} and blocks until it has finished, failed, or the
     * optimizer is shut down.
     *
     * @param allowCapacityEstimation passed on to the submitted computer.
     */
    private void computeCachedProposal(boolean allowCapacityEstimation) {
        long startMs = this._time.milliseconds();
        Future<?> computation = this._proposalPrecomputingExecutor.submit(new ProposalCandidateComputer(allowCapacityEstimation));
        boolean finished = false;
        while (!this._shutdown && !finished) {
            try {
                computation.get();
                finished = true;
            } catch (InterruptedException e) {
                // The calling thread may be interrupted merely to request a recomputation; keep waiting for the running one.
                LOG.debug("Goal optimizer received exception when precomputing the proposal candidates.", e);
            } catch (ExecutionException e) {
                LOG.error("Goal optimizer received exception when precomputing the proposal candidates.", e);
                // A failed future fails again on every get(); stop here instead of spinning until shutdown.
                finished = true;
            }
        }
        LOG.info("Finished the precomputation proposal candidates in {} ms", this._time.milliseconds() - startMs);
    }

    /**
     * Tells whether the cached proposals can still be served.
     *
     * @return {@code true} when a cached result exists and its model generation is not stale relative to
     *         the load monitor's current cluster model generation.
     * @throws KafkaCruiseControlException if the executor reports an ongoing execution.
     */
    private boolean validCachedProposal() throws KafkaCruiseControlException {
        if (_executor.hasOngoingExecution()) {
            throw new KafkaCruiseControlException("Attempt to use proposal cache during ongoing execution.");
        }
        synchronized (_cacheLock) {
            if (_cachedProposals == null) {
                return false;
            }
            return !_cachedProposals.modelGeneration().isStale(_loadMonitor.clusterModelGeneration());
        }
    }

    /**
     * Signals the scheduler loop to stop and shuts down the precomputing executor, waiting up to
     * 30 seconds for running tasks to finish.
     */
    public void shutdown() {
        LOG.info("Shutting down goal optimizer.");
        this._shutdown = true;
        this._proposalPrecomputingExecutor.shutdown();
        try {
            // awaitTermination returns false when the timeout elapsed before the executor terminated.
            if (!this._proposalPrecomputingExecutor.awaitTermination(30000L, TimeUnit.MILLISECONDS)) {
                LOG.warn("The goal optimizer failed to shutdown in 30000 ms.");
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for goal optimizer to shutdown.");
            // Restore the interrupt status so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        LOG.info("Goal optimizer shutdown completed.");
    }

    /**
     * @return the completeness requirements combined from all default goals.
     */
    public ModelCompletenessRequirements defaultModelCompletenessRequirements() {
        return _defaultModelCompletenessRequirements;
    }

    /**
     * @return the completeness requirements that must be met before proposals are precomputed or served from cache.
     */
    public ModelCompletenessRequirements modelCompletenessRequirementsForPrecomputing() {
        return _requirementsWithAvailableValidWindows;
    }

    /**
     * Builds a snapshot of the analyzer state.
     *
     * @param cluster cluster against which each goal's completeness requirements are checked.
     * @return a state holding whether cached proposals exist and, per default goal (in priority order),
     *         whether the load monitor meets that goal's completeness requirements.
     */
    public AnalyzerState state(Cluster cluster) {
        Map<Goal, Boolean> readinessByGoal = new LinkedHashMap<>(_goalsByPriority.size());
        for (Goal goal : _goalsByPriority) {
            boolean requirementsMet = _loadMonitor.meetCompletenessRequirements(cluster, goal.clusterModelCompletenessRequirements());
            readinessByGoal.put(goal, requirementsMet);
        }
        boolean hasCachedProposals = _cachedProposals != null;
        return new AnalyzerState(hasCachedProposals, readinessByGoal);
    }

    /**
     * Verifies that cached proposals may be requested right now.
     *
     * @throws IllegalStateException if the load monitor is loading or bootstrapping, or if the
     *         completeness requirements for precomputing are not met.
     */
    private void sanityCheckReadyForGettingCachedProposals() {
        LoadMonitorTaskRunner.LoadMonitorTaskRunnerState monitorState = _loadMonitor.taskRunnerState();
        boolean monitorBusy = monitorState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.LOADING
                              || monitorState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.BOOTSTRAPPING;
        if (monitorBusy) {
            throw new IllegalStateException("Cannot get proposal because load monitor is in " + monitorState + " state.");
        }
        boolean completenessMet = _loadMonitor.meetCompletenessRequirements(_requirementsWithAvailableValidWindows);
        if (!completenessMet) {
            throw new IllegalStateException("Cannot get proposal because model completeness is not met.");
        }
    }

    /**
     * Returns the cached optimizer result, blocking until a usable one is available.
     *
     * <p>The cache is considered unusable when {@link #validCachedProposal()} is false, or when the cached
     * result was computed with capacity estimation while {@code z} is false. In that case the cache is
     * cleared, a recomputation is requested, and the calling thread waits on the cache lock until the
     * cache becomes valid or a proposal generation exception has been recorded.
     *
     * @param operationProgress progress object of the caller; cleared and pointed at the shared
     *        precomputing progress while waiting.
     * @param z whether a result computed with capacity estimation is acceptable; also forwarded to an
     *        explicitly submitted precomputation.
     * @return the cached optimizer result.
     * @throws InterruptedException if the calling thread is interrupted while waiting on the cache lock.
     * @throws KafkaCruiseControlException if the cache is consulted during an ongoing execution, or if
     *         proposal generation recorded an exception.
     * @throws IllegalStateException if the load monitor is not ready (see the sanity check).
     */
    public OptimizerResult optimizations(OperationProgress operationProgress, boolean z) throws InterruptedException, KafkaCruiseControlException {
        OptimizerResult optimizerResult;
        sanityCheckReadyForGettingCachedProposals();
        synchronized (this._cacheLock) {
            if (!validCachedProposal() || (!z && this._cachedProposals.isCapacityEstimated())) {
                if (LOG.isDebugEnabled()) {
                    Logger logger = LOG;
                    Object[] objArr = new Object[5];
                    objArr[0] = this._cachedProposals == null ? null : this._cachedProposals.modelGeneration();
                    objArr[1] = this._loadMonitor.clusterModelGeneration();
                    objArr[2] = this._cachedProposals == null ? null : Boolean.valueOf(this._cachedProposals.isCapacityEstimated());
                    objArr[3] = Boolean.valueOf(z);
                    objArr[4] = this._cachedProposals == null ? "" : String.format(" Cached was excluding default topics: %s.", this._cachedProposals.excludedTopics());
                    logger.debug("Cached proposal result is not usable. Model generation (cached: {}, current: {}). Capacity estimation (cached: {}, allowed: {}).{} Wait for cache update.", objArr);
                }
                // Drops the stale result and resets the recorded generation exception to null.
                clearCachedProposal();
                while (!validCachedProposal()) {
                    try {
                        operationProgress.clear();
                        if (this._numPrecomputingThreads > 0 && (z || !this._allowCapacityEstimationOnProposalPrecompute)) {
                            // Wake the scheduler thread so that it recomputes the cache right away.
                            // NOTE(review): this field is only assigned inside run(); if run() has not started yet
                            // this dereference would throw a NullPointerException — confirm the startup order.
                            this._proposalPrecomputingSchedulerThread.interrupt();
                        } else if (!this._hasOngoingExplicitPrecomputation) {
                            // No suitable scheduled precomputation: submit a one-off computation with the caller's setting.
                            this._hasOngoingExplicitPrecomputation = true;
                            this._proposalPrecomputingExecutor.submit(() -> {
                                computeCachedProposal(z);
                            });
                        }
                        // Let the caller observe the progress of the background computation.
                        operationProgress.refer(this._proposalPrecomputingProgress);
                        // Releases the cache lock until a cache update or a failure handler calls notifyAll().
                        this._cacheLock.wait();
                        Exception exc = this._proposalGenerationException.get();
                        if (exc != null) {
                            LOG.error("Cannot create cached proposals due to exception.", exc);
                            throw new KafkaCruiseControlException(exc);
                        }
                    } catch (Throwable th) {
                        // Whatever was thrown above, a recorded generation exception takes precedence and is
                        // reported wrapped; otherwise the original throwable is rethrown unchanged.
                        Exception exc2 = this._proposalGenerationException.get();
                        if (exc2 == null) {
                            throw th;
                        }
                        LOG.error("Cannot create cached proposals due to exception.", exc2);
                        throw new KafkaCruiseControlException(exc2);
                    }
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Returning optimization result from cache. Model generation (cached: {}, current: {}).", this._cachedProposals.modelGeneration(), this._loadMonitor.clusterModelGeneration());
            }
            optimizerResult = this._cachedProposals;
        }
        return optimizerResult;
    }

    /**
     * Optimizes the given cluster model with the default goals.
     *
     * @param clusterModel model to optimize.
     * @param operationProgress progress object that receives one step per goal.
     * @return the optimization result.
     * @throws KafkaCruiseControlException if the optimization fails.
     */
    public OptimizerResult optimizations(ClusterModel clusterModel, OperationProgress operationProgress) throws KafkaCruiseControlException {
        List<Goal> defaultGoals = _goalsByPriority;
        return optimizations(clusterModel, defaultGoals, operationProgress);
    }

    /**
     * Optimizes the given cluster model with the given goals, using the default excluded-topics pattern
     * and the optimization options generated for cached proposal calculation.
     *
     * @param clusterModel model to optimize; must not be null and must have at least one alive broker.
     * @param list goals to apply, in order; must not be empty.
     * @param operationProgress progress object that receives one step per goal.
     * @return the optimization result.
     * @throws IllegalArgumentException if the model is null, no goal is given, or the cluster is not alive.
     * @throws KafkaCruiseControlException if the optimization fails.
     */
    public OptimizerResult optimizations(ClusterModel clusterModel, List<Goal> list, OperationProgress operationProgress) throws KafkaCruiseControlException {
        if (clusterModel == null) {
            throw new IllegalArgumentException("The cluster model cannot be null");
        }
        if (list.isEmpty()) {
            throw new IllegalArgumentException("At least one goal must be provided to get an optimization result.");
        }
        if (!clusterModel.isClusterAlive()) {
            throw new IllegalArgumentException("All brokers are dead in the cluster.");
        }
        // A null pattern selects the default excluded-topics pattern.
        Set<String> topicsExcludedFromMovement = excludedTopics(clusterModel, null);
        LOG.debug("Topics excluded from partition movement: {}", topicsExcludedFromMovement);
        OptimizationOptions options =
            _optimizationOptionsGenerator.optimizationOptionsForCachedProposalCalculation(clusterModel, topicsExcludedFromMovement);
        return optimizations(clusterModel, list, operationProgress, null, options);
    }

    /**
     * Runs the given goals, in order, against the cluster model and collects the outcome.
     *
     * @param clusterModel model to optimize; it is modified by the goals.
     * @param list goals to apply, in order.
     * @param operationProgress progress object that receives one step per goal.
     * @param map replica distribution to diff the final model against; when null, the distribution
     *        captured from the model before optimization is used.
     * @param optimizationOptions options handed to every goal.
     * @return the result holding per-goal stats, the names of goals that produced proposals or were not
     *         optimized, the final proposals, and broker stats before and after optimization.
     * @throws KafkaCruiseControlException if a goal fails to optimize.
     */
    public OptimizerResult optimizations(ClusterModel clusterModel, List<Goal> list, OperationProgress operationProgress, Map<TopicPartition, List<ReplicaPlacementInfo>> map, OptimizationOptions optimizationOptions) throws KafkaCruiseControlException {
        LOG.trace("Cluster before optimization is {}", clusterModel);
        BrokerStats brokerStatsBefore = clusterModel.brokerStats(null);
        Map<TopicPartition, List<ReplicaPlacementInfo>> initialReplicaDistribution = clusterModel.getReplicaDistribution();
        Map<TopicPartition, ReplicaPlacementInfo> initialLeaderDistribution = clusterModel.getLeaderDistribution();
        boolean isSelfHealing = !clusterModel.selfHealingEligibleReplicas().isEmpty();
        Set<Goal> optimizedGoals = new HashSet<>(list.size());
        // Names of goals that generated proposals or for which optimize(...) returned false.
        Set<String> goalNamesWithProposalsOrFailure = new HashSet<>();
        // Names of goals for which optimize(...) returned false.
        Set<String> goalNamesNotOptimized = new HashSet<>();
        // Kept raw: the stats value type is not imported in this file. Insertion order follows goal order.
        LinkedHashMap statsByGoal = new LinkedHashMap(list.size());
        Map<TopicPartition, List<ReplicaPlacementInfo>> replicasBeforeGoal = null;
        Map<TopicPartition, ReplicaPlacementInfo> leadersBeforeGoal = null;
        for (Goal goal : list) {
            // The first goal is diffed against the initial distribution; later goals against the model state left by their predecessor.
            replicasBeforeGoal = replicasBeforeGoal == null ? initialReplicaDistribution : clusterModel.getReplicaDistribution();
            leadersBeforeGoal = leadersBeforeGoal == null ? initialLeaderDistribution : clusterModel.getLeaderDistribution();
            OptimizationForGoal step = new OptimizationForGoal(goal.name());
            operationProgress.addStep(step);
            LOG.debug("Optimizing goal {}", goal.name());
            boolean optimized = goal.optimize(clusterModel, optimizedGoals, optimizationOptions);
            optimizedGoals.add(goal);
            statsByGoal.put(goal, clusterModel.getClusterStats(this._balancingConstraint));
            Set<ExecutionProposal> goalProposals = AnalyzerUtils.getDiff(replicasBeforeGoal, leadersBeforeGoal, clusterModel);
            if (!goalProposals.isEmpty() || !optimized) {
                goalNamesWithProposalsOrFailure.add(goal.name());
            }
            if (!optimized) {
                goalNamesNotOptimized.add(goal.name());
            }
            logProgress(isSelfHealing, goal.name(), optimizedGoals.size(), goalProposals);
            step.done();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Broker level stats after optimization: {}", clusterModel.brokerStats(null));
            }
        }
        if (LOG.isTraceEnabled()) {
            // SLF4J does not expand "%n", so the former trailing "%n" was printed literally; it is dropped.
            LOG.trace("Broker level stats after optimization: {}", clusterModel.brokerStats(null));
        }
        Set<ExecutionProposal> proposals = AnalyzerUtils.getDiff(map != null ? map : initialReplicaDistribution,
                                                                 initialLeaderDistribution,
                                                                 clusterModel,
                                                                 true);
        return new OptimizerResult(statsByGoal,
                                   goalNamesWithProposalsOrFailure,
                                   goalNamesNotOptimized,
                                   proposals,
                                   brokerStatsBefore,
                                   clusterModel.brokerStats(null),
                                   clusterModel.generation(),
                                   clusterModel.getClusterStats(this._balancingConstraint),
                                   clusterModel.capacityEstimationInfoByBrokerId(),
                                   optimizationOptions,
                                   KafkaCruiseControlUtils.balancednessCostByGoal(list, this._priorityWeight, this._strictnessWeight));
    }

    /**
     * Collects the topics of the cluster model whose names fully match the exclusion pattern.
     *
     * @param clusterModel model providing the topic names.
     * @param pattern pattern to match against; when null, the default excluded-topics pattern is used.
     * @return the set of matching topic names.
     */
    public Set<String> excludedTopics(ClusterModel clusterModel, Pattern pattern) {
        final Pattern effectivePattern;
        if (pattern == null) {
            effectivePattern = _defaultExcludedTopics;
        } else {
            effectivePattern = pattern;
        }
        return clusterModel.topics()
                           .stream()
                           .filter(topic -> effectivePattern.matcher(topic).matches())
                           .collect(Collectors.toSet());
    }

    /**
     * Logs the outcome of optimizing a single goal: a summary at debug level and the proposals themselves at trace level.
     *
     * @param isSelfHealing whether the "self-healing " prefix is put in front of the goal name.
     * @param goalName name of the goal that was just optimized.
     * @param numOptimizedGoals number of goals optimized so far; shown against the size of the default goal list.
     * @param proposals proposals generated for this goal.
     */
    private void logProgress(boolean isSelfHealing, String goalName, int numOptimizedGoals, Set<ExecutionProposal> proposals) {
        String selfHealingPrefix = isSelfHealing ? "self-healing " : "";
        LOG.debug("[{}/{}] Generated {} proposals for {}{}.",
                  numOptimizedGoals, this._goalsByPriority.size(), proposals.size(), selfHealingPrefix, goalName);
        // SLF4J does not expand "%n", so the former trailing "%n" was printed literally; it is dropped.
        LOG.trace("Proposals for {}{}.{}", selfHealingPrefix, goalName, proposals);
    }

    /**
     * Stores a freshly computed result in the cache, marks any explicit precomputation as finished and
     * wakes up all threads waiting on the cache lock.
     *
     * @param optimizerResult result to cache.
     * @return the cached result.
     */
    private OptimizerResult updateCachedProposals(OptimizerResult optimizerResult) {
        synchronized (_cacheLock) {
            _hasOngoingExplicitPrecomputation = false;
            _cachedProposals = optimizerResult;
            _cacheLock.notifyAll();
            return _cachedProposals;
        }
    }

    /**
     * Drops the cached proposals and resets the shared precomputing progress, recording the given
     * exception (possibly null) as the latest proposal generation exception.
     *
     * @param exc exception to record; null resets the recorded exception.
     */
    private void clearCachedProposal(Exception exc) {
        synchronized (_cacheLock) {
            _cachedProposals = null;
            // Release the progress flag first, then wipe the shared progress object.
            _progressUpdateLock.set(false);
            _proposalPrecomputingProgress.clear();
            _proposalGenerationException.set(exc);
        }
    }

    /**
     * Drops the cached proposals and resets the recorded proposal generation exception.
     */
    private void clearCachedProposal() {
        final Exception noException = null;
        clearCachedProposal(noException);
    }
}
