/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerState;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptionsGenerator;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizerResult;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.async.progress.OptimizationForGoal;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GoalOptimizer
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(GoalOptimizer.class);
    private final List<Goal> _goalsByPriority;
    private final BalancingConstraint _balancingConstraint;
    private final Pattern _defaultExcludedTopics;
    private final LoadMonitor _loadMonitor;
    private final Time _time;
    private final int _numPrecomputingThreads;
    private final long _proposalExpirationMs;
    private final ExecutorService _proposalPrecomputingExecutor;
    private final AtomicBoolean _progressUpdateLock;
    private final AtomicReference<Exception> _proposalGenerationException;
    private final OperationProgress _proposalPrecomputingProgress;
    private final Object _cacheLock;
    private volatile OptimizerResult _cachedProposals;
    private volatile boolean _shutdown = false;
    private Thread _proposalPrecomputingSchedulerThread;
    private final boolean _allowCapacityEstimationOnProposalPrecompute;
    private final Timer _proposalComputationTimer;
    private final ModelCompletenessRequirements _defaultModelCompletenessRequirements;
    private final ModelCompletenessRequirements _requirementsWithAvailableValidWindows;
    private final Executor _executor;
    private volatile boolean _hasOngoingExplicitPrecomputation;
    private final double _priorityWeight;
    private final double _strictnessWeight;
    private final OptimizationOptionsGenerator _optimizationOptionsGenerator;

    public GoalOptimizer(KafkaCruiseControlConfig config, LoadMonitor loadMonitor, Time time, MetricRegistry dropwizardMetricRegistry, Executor executor, AdminClient adminClient) {
        this._goalsByPriority = AnalyzerUtils.getGoalsByPriority(config);
        this._defaultModelCompletenessRequirements = MonitorUtils.combineLoadRequirementOptions(this._goalsByPriority);
        this._requirementsWithAvailableValidWindows = new ModelCompletenessRequirements(1, this._defaultModelCompletenessRequirements.minMonitoredPartitionsPercentage(), this._defaultModelCompletenessRequirements.includeAllTopics());
        this._numPrecomputingThreads = config.getInt("num.proposal.precompute.threads");
        LOG.info("Goals by priority for precomputing: {}", this._goalsByPriority);
        this._balancingConstraint = new BalancingConstraint(config);
        this._defaultExcludedTopics = Pattern.compile(config.getString("topics.excluded.from.partition.movement"));
        this._proposalExpirationMs = config.getLong("proposal.expiration.ms");
        this._proposalPrecomputingExecutor = Executors.newScheduledThreadPool(this.numProposalComputingThreads(), new KafkaCruiseControlThreadFactory("ProposalPrecomputingExecutor", false, LOG));
        this._loadMonitor = loadMonitor;
        this._time = time;
        this._cacheLock = new ReentrantLock();
        this._cachedProposals = null;
        this._progressUpdateLock = new AtomicBoolean(false);
        this._proposalGenerationException = new AtomicReference();
        this._proposalPrecomputingProgress = new OperationProgress();
        this._proposalComputationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name((String)"GoalOptimizer", (String[])new String[]{"proposal-computation-timer"}));
        this._executor = executor;
        this._hasOngoingExplicitPrecomputation = false;
        this._priorityWeight = config.getDouble("goal.balancedness.priority.weight");
        this._strictnessWeight = config.getDouble("goal.balancedness.strictness.weight");
        this._allowCapacityEstimationOnProposalPrecompute = config.getBoolean("allow.capacity.estimation.on.proposal.precompute");
        HashMap<String, Object> overrideConfigs = new HashMap<String, Object>(2);
        overrideConfigs.put("kafka.cruise.control.config.object", (Object)config);
        overrideConfigs.put("admin.client.object", adminClient);
        this._optimizationOptionsGenerator = config.getConfiguredInstance("optimization.options.generator.class", OptimizationOptionsGenerator.class, overrideConfigs);
    }

    @Override
    public void run() {
        this._proposalPrecomputingSchedulerThread = Thread.currentThread();
        LOG.info("Starting proposal candidate computation.");
        while (!this._shutdown && this._numPrecomputingThreads > 0) {
            LoadMonitorTaskRunner.LoadMonitorTaskRunnerState loadMonitorTaskRunnerState = this._loadMonitor.taskRunnerState();
            long sleepTime = this._proposalExpirationMs;
            if (loadMonitorTaskRunnerState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.LOADING || loadMonitorTaskRunnerState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.BOOTSTRAPPING) {
                LOG.info("Skipping proposal precomputing because load monitor is in " + loadMonitorTaskRunnerState + " state.");
                sleepTime = 30000L;
            } else if (!this._loadMonitor.meetCompletenessRequirements(this._requirementsWithAvailableValidWindows)) {
                LOG.info("Skipping proposal precomputing because load monitor does not have enough snapshots.");
                sleepTime = 30000L;
            } else {
                try {
                    if (!this.validCachedProposal()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Invalidated cache. Model generation (cached: {}, current: {}).{}", new Object[]{this._cachedProposals == null ? null : this._cachedProposals.modelGeneration(), this._loadMonitor.clusterModelGeneration(), this._cachedProposals == null ? "" : String.format(" Cached was excluding default topics: %s.", this._cachedProposals.excludedTopics())});
                        }
                        this.clearCachedProposal();
                        long start = System.nanoTime();
                        this.computeCachedProposal(this._allowCapacityEstimationOnProposalPrecompute);
                        this._proposalComputationTimer.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
                    } else {
                        LOG.debug("Skipping proposal precomputing because the cached proposal result is still valid. Cached generation: {}", (Object)this._cachedProposals.modelGeneration());
                    }
                }
                catch (KafkaCruiseControlException e) {
                    sleepTime = 30000L;
                    LOG.debug("Skipping proposal precomputing because there is an ongoing execution.", (Throwable)e);
                }
            }
            long deadline = this._time.milliseconds() + sleepTime;
            if (this._shutdown || this._time.milliseconds() >= deadline) continue;
            try {
                Thread.sleep(deadline - this._time.milliseconds());
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    private int numProposalComputingThreads() {
        return this._numPrecomputingThreads > 0 ? this._numPrecomputingThreads : 2;
    }

    private void computeCachedProposal(boolean allowCapacityEstimation) {
        long start = this._time.milliseconds();
        Future<?> future = this._proposalPrecomputingExecutor.submit(new ProposalCandidateComputer(allowCapacityEstimation));
        try {
            boolean done = false;
            while (!this._shutdown && !done) {
                try {
                    future.get();
                    done = true;
                }
                catch (InterruptedException ie) {
                    LOG.debug("Goal optimizer received exception when precomputing the proposal candidates.", (Throwable)ie);
                }
            }
        }
        catch (ExecutionException ee) {
            LOG.error("Goal optimizer received exception when precomputing the proposal candidates.", (Throwable)ee);
        }
        LOG.info("Finished the precomputation proposal candidates in {} ms", (Object)(this._time.milliseconds() - start));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean validCachedProposal() throws KafkaCruiseControlException {
        if (this._executor.hasOngoingExecution()) {
            throw new KafkaCruiseControlException("Attempt to use proposal cache during ongoing execution.");
        }
        Object object = this._cacheLock;
        synchronized (object) {
            return this._cachedProposals != null && !this._cachedProposals.modelGeneration().isStale(this._loadMonitor.clusterModelGeneration());
        }
    }

    public void shutdown() {
        LOG.info("Shutting down goal optimizer.");
        this._shutdown = true;
        this._proposalPrecomputingExecutor.shutdown();
        try {
            this._proposalPrecomputingExecutor.awaitTermination(30000L, TimeUnit.MILLISECONDS);
            if (!this._proposalPrecomputingExecutor.isTerminated()) {
                LOG.warn("The goal optimizer failed to shutdown in 30000 ms.");
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for goal optimizer to shutdown.");
        }
        LOG.info("Goal optimizer shutdown completed.");
    }

    public ModelCompletenessRequirements defaultModelCompletenessRequirements() {
        return this._defaultModelCompletenessRequirements;
    }

    public ModelCompletenessRequirements modelCompletenessRequirementsForPrecomputing() {
        return this._requirementsWithAvailableValidWindows;
    }

    public AnalyzerState state(Cluster cluster) {
        LinkedHashMap<Goal, Boolean> goalReadiness = new LinkedHashMap<Goal, Boolean>(this._goalsByPriority.size());
        for (Goal goal : this._goalsByPriority) {
            goalReadiness.put(goal, this._loadMonitor.meetCompletenessRequirements(cluster, goal.clusterModelCompletenessRequirements()));
        }
        return new AnalyzerState(this._cachedProposals != null, goalReadiness);
    }

    private void sanityCheckReadyForGettingCachedProposals() {
        LoadMonitorTaskRunner.LoadMonitorTaskRunnerState loadMonitorTaskRunnerState = this._loadMonitor.taskRunnerState();
        if (loadMonitorTaskRunnerState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.LOADING || loadMonitorTaskRunnerState == LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.BOOTSTRAPPING) {
            throw new IllegalStateException("Cannot get proposal because load monitor is in " + loadMonitorTaskRunnerState + " state.");
        }
        if (!this._loadMonitor.meetCompletenessRequirements(this._requirementsWithAvailableValidWindows)) {
            throw new IllegalStateException("Cannot get proposal because model completeness is not met.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public OptimizerResult optimizations(OperationProgress operationProgress, boolean allowCapacityEstimation) throws InterruptedException, KafkaCruiseControlException {
        this.sanityCheckReadyForGettingCachedProposals();
        Object object = this._cacheLock;
        synchronized (object) {
            if (!this.validCachedProposal() || !allowCapacityEstimation && this._cachedProposals.isCapacityEstimated()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cached proposal result is not usable. Model generation (cached: {}, current: {}). Capacity estimation (cached: {}, allowed: {}).{} Wait for cache update.", new Object[]{this._cachedProposals == null ? null : this._cachedProposals.modelGeneration(), this._loadMonitor.clusterModelGeneration(), this._cachedProposals == null ? null : Boolean.valueOf(this._cachedProposals.isCapacityEstimated()), allowCapacityEstimation, this._cachedProposals == null ? "" : String.format(" Cached was excluding default topics: %s.", this._cachedProposals.excludedTopics())});
                }
                this.clearCachedProposal();
                while (!this.validCachedProposal()) {
                    try {
                        operationProgress.clear();
                        if (this._numPrecomputingThreads > 0 && (allowCapacityEstimation || !this._allowCapacityEstimationOnProposalPrecompute)) {
                            this._proposalPrecomputingSchedulerThread.interrupt();
                        } else if (!this._hasOngoingExplicitPrecomputation) {
                            this._hasOngoingExplicitPrecomputation = true;
                            this._proposalPrecomputingExecutor.submit(() -> this.computeCachedProposal(allowCapacityEstimation));
                        }
                        operationProgress.refer(this._proposalPrecomputingProgress);
                        this._cacheLock.wait();
                    }
                    finally {
                        Exception proposalGenerationException = this._proposalGenerationException.get();
                        if (proposalGenerationException == null) continue;
                        LOG.error("Cannot create cached proposals due to exception.", (Throwable)proposalGenerationException);
                        throw new KafkaCruiseControlException(proposalGenerationException);
                    }
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Returning optimization result from cache. Model generation (cached: {}, current: {}).", (Object)this._cachedProposals.modelGeneration(), (Object)this._loadMonitor.clusterModelGeneration());
            }
            return this._cachedProposals;
        }
    }

    public OptimizerResult optimizations(ClusterModel clusterModel, OperationProgress operationProgress) throws KafkaCruiseControlException {
        return this.optimizations(clusterModel, this._goalsByPriority, operationProgress);
    }

    public OptimizerResult optimizations(ClusterModel clusterModel, List<Goal> goalsByPriority, OperationProgress operationProgress) throws KafkaCruiseControlException {
        if (clusterModel == null) {
            throw new IllegalArgumentException("The cluster model cannot be null");
        }
        if (goalsByPriority.isEmpty()) {
            throw new IllegalArgumentException("At least one goal must be provided to get an optimization result.");
        }
        if (!clusterModel.isClusterAlive()) {
            throw new IllegalArgumentException("All brokers are dead in the cluster.");
        }
        Set<String> excludedTopics = this.excludedTopics(clusterModel, null);
        LOG.debug("Topics excluded from partition movement: {}", excludedTopics);
        OptimizationOptions optimizationOptions = this._optimizationOptionsGenerator.optimizationOptionsForCachedProposalCalculation(clusterModel, excludedTopics);
        return this.optimizations(clusterModel, goalsByPriority, operationProgress, null, optimizationOptions);
    }

    public OptimizerResult optimizations(ClusterModel clusterModel, List<Goal> goalsByPriority, OperationProgress operationProgress, Map<TopicPartition, List<ReplicaPlacementInfo>> initReplicaDistributionForProposalGeneration, OptimizationOptions optimizationOptions) throws KafkaCruiseControlException {
        LOG.trace("Cluster before optimization is {}", (Object)clusterModel);
        BrokerStats brokerStatsBeforeOptimization = clusterModel.brokerStats(null);
        Map<TopicPartition, List<ReplicaPlacementInfo>> initReplicaDistribution = clusterModel.getReplicaDistribution();
        Map<TopicPartition, ReplicaPlacementInfo> initLeaderDistribution = clusterModel.getLeaderDistribution();
        boolean isSelfHealing = !clusterModel.selfHealingEligibleReplicas().isEmpty();
        HashSet<Goal> optimizedGoals = new HashSet<Goal>(goalsByPriority.size());
        HashSet<String> violatedGoalNamesBeforeOptimization = new HashSet<String>();
        HashSet<String> violatedGoalNamesAfterOptimization = new HashSet<String>();
        LinkedHashMap<Goal, ClusterModelStats> statsByGoalPriority = new LinkedHashMap<Goal, ClusterModelStats>(goalsByPriority.size());
        Map<TopicPartition, List<ReplicaPlacementInfo>> preOptimizedReplicaDistribution = null;
        Map<TopicPartition, ReplicaPlacementInfo> preOptimizedLeaderDistribution = null;
        for (Goal goal : goalsByPriority) {
            preOptimizedReplicaDistribution = preOptimizedReplicaDistribution == null ? initReplicaDistribution : clusterModel.getReplicaDistribution();
            preOptimizedLeaderDistribution = preOptimizedLeaderDistribution == null ? initLeaderDistribution : clusterModel.getLeaderDistribution();
            OptimizationForGoal step = new OptimizationForGoal(goal.name());
            operationProgress.addStep(step);
            LOG.debug("Optimizing goal {}", (Object)goal.name());
            boolean succeeded = goal.optimize(clusterModel, optimizedGoals, optimizationOptions);
            optimizedGoals.add(goal);
            statsByGoalPriority.put(goal, clusterModel.getClusterStats(this._balancingConstraint));
            Set<ExecutionProposal> goalProposals = AnalyzerUtils.getDiff(preOptimizedReplicaDistribution, preOptimizedLeaderDistribution, clusterModel);
            if (!goalProposals.isEmpty() || !succeeded) {
                violatedGoalNamesBeforeOptimization.add(goal.name());
            }
            if (!succeeded) {
                violatedGoalNamesAfterOptimization.add(goal.name());
            }
            this.logProgress(isSelfHealing, goal.name(), optimizedGoals.size(), goalProposals);
            step.done();
            if (!LOG.isDebugEnabled()) continue;
            LOG.debug("Broker level stats after optimization: {}", (Object)clusterModel.brokerStats(null));
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Broker level stats after optimization: {}%n", (Object)clusterModel.brokerStats(null));
        }
        Set<ExecutionProposal> proposals = AnalyzerUtils.getDiff(initReplicaDistributionForProposalGeneration != null ? initReplicaDistributionForProposalGeneration : initReplicaDistribution, initLeaderDistribution, clusterModel, true);
        return new OptimizerResult(statsByGoalPriority, violatedGoalNamesBeforeOptimization, violatedGoalNamesAfterOptimization, proposals, brokerStatsBeforeOptimization, clusterModel.brokerStats(null), clusterModel.generation(), clusterModel.getClusterStats(this._balancingConstraint), clusterModel.capacityEstimationInfoByBrokerId(), optimizationOptions, KafkaCruiseControlUtils.balancednessCostByGoal(goalsByPriority, this._priorityWeight, this._strictnessWeight));
    }

    public Set<String> excludedTopics(ClusterModel clusterModel, Pattern requestedExcludedTopics) {
        Pattern topicsToExclude = requestedExcludedTopics != null ? requestedExcludedTopics : this._defaultExcludedTopics;
        return clusterModel.topics().stream().filter(topic -> topicsToExclude.matcher((CharSequence)topic).matches()).collect(Collectors.toSet());
    }

    private void logProgress(boolean isSelfHeal, String goalName, int numOptimizedGoals, Set<ExecutionProposal> proposals) {
        LOG.debug("[{}/{}] Generated {} proposals for {}{}.", new Object[]{numOptimizedGoals, this._goalsByPriority.size(), proposals.size(), isSelfHeal ? "self-healing " : "", goalName});
        LOG.trace("Proposals for {}{}.{}%n", new Object[]{isSelfHeal ? "self-healing " : "", goalName, proposals});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private OptimizerResult updateCachedProposals(OptimizerResult result) {
        Object object = this._cacheLock;
        synchronized (object) {
            this._hasOngoingExplicitPrecomputation = false;
            this._cachedProposals = result;
            this._cacheLock.notifyAll();
            return this._cachedProposals;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearCachedProposal(Exception e) {
        Object object = this._cacheLock;
        synchronized (object) {
            this._cachedProposals = null;
            this._progressUpdateLock.set(false);
            this._proposalPrecomputingProgress.clear();
            this._proposalGenerationException.set(e);
        }
    }

    private void clearCachedProposal() {
        this.clearCachedProposal(null);
    }

    private class ProposalCandidateComputer
    implements Runnable {
        private final boolean _allowCapacityEstimation;

        ProposalCandidateComputer(boolean allowCapacityEstimation) {
            this._allowCapacityEstimation = allowCapacityEstimation;
        }

        @Override
        public void run() {
            LOG.debug("Starting proposal candidate computer.");
            if (GoalOptimizer.this._loadMonitor == null) {
                LOG.warn("No load monitor available. Skip computing proposal candidate.");
                return;
            }
            OperationProgress operationProgress = GoalOptimizer.this._progressUpdateLock.compareAndSet(false, true) ? GoalOptimizer.this._proposalPrecomputingProgress : new OperationProgress();
            try (LoadMonitor.AutoCloseableSemaphore ignored = GoalOptimizer.this._loadMonitor.acquireForModelGeneration(operationProgress);){
                long startMs = GoalOptimizer.this._time.milliseconds();
                ModelCompletenessRequirements requirements = GoalOptimizer.this._loadMonitor.meetCompletenessRequirements(GoalOptimizer.this._defaultModelCompletenessRequirements) ? GoalOptimizer.this._defaultModelCompletenessRequirements : GoalOptimizer.this._requirementsWithAvailableValidWindows;
                ClusterModel clusterModel = GoalOptimizer.this._loadMonitor.clusterModel(GoalOptimizer.this._time.milliseconds(), requirements, this._allowCapacityEstimation, operationProgress);
                if (!clusterModel.topics().isEmpty()) {
                    OptimizerResult result = GoalOptimizer.this.optimizations(clusterModel, GoalOptimizer.this._goalsByPriority, operationProgress);
                    LOG.debug("Generated a proposal candidate in {} ms.", (Object)(GoalOptimizer.this._time.milliseconds() - startMs));
                    GoalOptimizer.this.updateCachedProposals(result);
                } else {
                    LOG.warn("The cluster model does not have valid topics, skipping proposal precomputation.");
                }
            }
            catch (OptimizationFailureException ofe) {
                LOG.warn("Detected unfixable proposal optimization", (Throwable)ofe);
                this.exceptionHandler(ofe);
            }
            catch (Exception e) {
                LOG.error("Proposal precomputation encountered error", (Throwable)e);
                this.exceptionHandler(e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void exceptionHandler(Exception e) {
            GoalOptimizer.this.clearCachedProposal(e);
            Object object = GoalOptimizer.this._cacheLock;
            synchronized (object) {
                GoalOptimizer.this._hasOngoingExplicitPrecomputation = false;
                GoalOptimizer.this._cacheLock.notifyAll();
            }
        }
    }
}

