/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerState;
import com.linkedin.kafka.cruisecontrol.analyzer.GoalOptimizer;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizerResult;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.TopicConfigProvider;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorManager;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorState;
import com.linkedin.kafka.cruisecontrol.exception.BrokerCapacityResolutionException;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OngoingExecutionException;
import com.linkedin.kafka.cruisecontrol.executor.ConcurrencyType;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ModelParameters;
import com.linkedin.kafka.cruisecontrol.model.ModelUtils;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitorState;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaCruiseControl {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCruiseControl.class);
    protected final KafkaCruiseControlConfig _config;
    private final LoadMonitor _loadMonitor;
    private final GoalOptimizer _goalOptimizer;
    private final ExecutorService _goalOptimizerExecutor;
    private final Executor _executor;
    private final AnomalyDetectorManager _anomalyDetectorManager;
    private final Time _time;
    private final AdminClient _adminClient;
    private static final String VERSION;
    private static final String COMMIT_ID;
    private static final boolean FORCE_PAUSE_SAMPLING = false;

    public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwizardMetricRegistry) {
        this._config = config;
        this._time = Time.SYSTEM;
        ModelUtils.init(config);
        ModelParameters.init(config);
        this._adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
        this._anomalyDetectorManager = new AnomalyDetectorManager(this, this._time, dropwizardMetricRegistry);
        this._executor = new Executor(config, this._time, dropwizardMetricRegistry, this._anomalyDetectorManager);
        this._loadMonitor = new LoadMonitor(config, this._time, dropwizardMetricRegistry, KafkaMetricDef.commonMetricDef());
        this._goalOptimizerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("GoalOptimizerExecutor", true, null));
        this._goalOptimizer = new GoalOptimizer(config, this._loadMonitor, this._time, dropwizardMetricRegistry, this._executor, this._adminClient);
    }

    KafkaCruiseControl(KafkaCruiseControlConfig config, Time time, AnomalyDetectorManager anomalyDetectorManager, Executor executor, LoadMonitor loadMonitor, ExecutorService goalOptimizerExecutor, GoalOptimizer goalOptimizer) {
        this._config = config;
        this._time = time;
        this._adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
        this._anomalyDetectorManager = anomalyDetectorManager;
        this._executor = executor;
        this._loadMonitor = loadMonitor;
        this._goalOptimizerExecutor = goalOptimizerExecutor;
        this._goalOptimizer = goalOptimizer;
    }

    public LoadMonitor loadMonitor() {
        return this._loadMonitor;
    }

    public MetadataClient.ClusterAndGeneration refreshClusterAndGeneration() {
        return this._loadMonitor.refreshClusterAndGeneration();
    }

    public LoadMonitorTaskRunner.LoadMonitorTaskRunnerState getLoadMonitorTaskRunnerState() {
        return this._loadMonitor.taskRunnerState();
    }

    public AdminClient adminClient() {
        return this._adminClient;
    }

    public LoadMonitor.AutoCloseableSemaphore acquireForModelGeneration(OperationProgress operationProgress) throws InterruptedException {
        return this._loadMonitor.acquireForModelGeneration(operationProgress);
    }

    public long timeMs() {
        return this._time.milliseconds();
    }

    public void sleep(long ms) {
        this._time.sleep(ms);
    }

    public void startUp() {
        LOG.info("Starting Kafka Cruise Control...");
        this._loadMonitor.startUp();
        this._anomalyDetectorManager.startDetection();
        this._goalOptimizerExecutor.submit(this._goalOptimizer);
        LOG.info("Kafka Cruise Control started.");
    }

    public void shutdown() {
        Thread t = new Thread(){

            @Override
            public void run() {
                LOG.info("Shutting down Kafka Cruise Control...");
                KafkaCruiseControl.this._loadMonitor.shutdown();
                KafkaCruiseControl.this._executor.shutdown();
                KafkaCruiseControl.this._anomalyDetectorManager.shutdown();
                KafkaCruiseControl.this._goalOptimizer.shutdown();
                KafkaCruiseControlUtils.closeAdminClientWithTimeout(KafkaCruiseControl.this._adminClient);
                LOG.info("Kafka Cruise Control shutdown completed.");
            }
        };
        t.setDaemon(true);
        t.start();
        try {
            t.join(30000L);
        }
        catch (InterruptedException e) {
            LOG.warn("Cruise Control failed to shutdown in 30 seconds. Exit.");
        }
    }

    public void setUserTaskManagerInExecutor(UserTaskManager userTaskManager) {
        this._executor.setUserTaskManager(userTaskManager);
    }

    public void sanityCheckDryRun(boolean dryRun, boolean stopOngoingExecution) {
        if (dryRun) {
            return;
        }
        if (this.hasOngoingExecution()) {
            if (!stopOngoingExecution) {
                throw new IllegalStateException(String.format("Cannot start a new execution while there is an ongoing execution. Please use %s=true to stop ongoing execution and start a new one.", "stop_ongoing_execution"));
            }
        } else {
            boolean hasOngoingPartitionReassignments;
            try {
                hasOngoingPartitionReassignments = this._executor.hasOngoingPartitionReassignments();
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Cannot execute new proposals due to failure to retrieve whether the Kafka cluster has an already ongoing partition reassignment.", e);
            }
            if (hasOngoingPartitionReassignments) {
                throw new IllegalStateException("Cannot execute new proposals while there are ongoing partition reassignments initiated by external agent.");
            }
            if (this._executor.hasOngoingLeaderElection()) {
                throw new IllegalStateException("Cannot execute new proposals while there are ongoing leadership reassignments initiated by external agent.");
            }
        }
    }

    public boolean hasOngoingExecution() {
        return this._executor.hasOngoingExecution();
    }

    public boolean modifyOngoingExecution(boolean modify) {
        return this._executor.modifyOngoingExecution(modify);
    }

    public BrokerStats cachedBrokerLoadStats(boolean allowCapacityEstimation) {
        return this._loadMonitor.cachedBrokerLoadStats(allowCapacityEstimation);
    }

    public ClusterModel clusterModel(ModelCompletenessRequirements requirements, boolean allowCapacityEstimation, OperationProgress operationProgress) throws NotEnoughValidWindowsException, TimeoutException, BrokerCapacityResolutionException {
        return this._loadMonitor.clusterModel(this.timeMs(), requirements, allowCapacityEstimation, operationProgress);
    }

    public ClusterModel clusterModel(long from, long to, ModelCompletenessRequirements requirements, boolean populateReplicaPlacementInfo, boolean allowCapacityEstimation, OperationProgress operationProgress) throws NotEnoughValidWindowsException, TimeoutException, BrokerCapacityResolutionException {
        return this._loadMonitor.clusterModel(from, to, requirements, populateReplicaPlacementInfo, allowCapacityEstimation, operationProgress);
    }

    public ClusterModel clusterCapacity() throws TimeoutException, BrokerCapacityResolutionException {
        return this._loadMonitor.clusterCapacity();
    }

    public void bootstrap(Long startMs, Long endMs, boolean clearMetrics) {
        if (startMs != null && endMs != null) {
            this._loadMonitor.bootstrap(startMs, endMs, clearMetrics);
        } else if (startMs != null) {
            this._loadMonitor.bootstrap(startMs, clearMetrics);
        } else {
            this._loadMonitor.bootstrap(clearMetrics);
        }
    }

    public void pauseMetricSampling(String reason) {
        this._loadMonitor.pauseMetricSampling(reason, false);
    }

    public void train(Long startMs, Long endMs) {
        this._loadMonitor.train(startMs, endMs);
    }

    public boolean setSelfHealingFor(AnomalyType anomalyType, boolean isSelfHealingEnabled) {
        return this._anomalyDetectorManager.setSelfHealingFor(anomalyType, isSelfHealingEnabled);
    }

    public boolean setConcurrencyAdjusterFor(ConcurrencyType concurrencyType, boolean isConcurrencyAdjusterEnabled) {
        return this._executor.setConcurrencyAdjusterFor(concurrencyType, isConcurrencyAdjusterEnabled);
    }

    public boolean dropRecentBrokers(Set<Integer> brokersToDrop, boolean isRemoved) {
        return isRemoved ? this._executor.dropRecentlyRemovedBrokers(brokersToDrop) : this._executor.dropRecentlyDemotedBrokers(brokersToDrop);
    }

    public void addRecentBrokersPermanently(Set<Integer> brokersToAdd, boolean isRemoved) {
        if (isRemoved) {
            this._executor.addRecentlyRemovedBrokers(brokersToAdd);
        } else {
            this._executor.addRecentlyDemotedBrokers(brokersToAdd);
        }
    }

    public Set<Integer> recentBrokers(boolean isRemoved) {
        return isRemoved ? this._executor.recentlyRemovedBrokers() : this._executor.recentlyDemotedBrokers();
    }

    public void setRequestedExecutionProgressCheckIntervalMs(Long requestedExecutionProgressCheckIntervalMs) {
        this._executor.setRequestedExecutionProgressCheckIntervalMs(requestedExecutionProgressCheckIntervalMs);
    }

    public void setRequestedInterBrokerPartitionMovementConcurrency(Integer requestedInterBrokerPartitionMovementConcurrency) {
        this._executor.setRequestedInterBrokerPartitionMovementConcurrency(requestedInterBrokerPartitionMovementConcurrency);
    }

    public void setRequestedIntraBrokerPartitionMovementConcurrency(Integer requestedIntraBrokerPartitionMovementConcurrency) {
        this._executor.setRequestedIntraBrokerPartitionMovementConcurrency(requestedIntraBrokerPartitionMovementConcurrency);
    }

    public void setRequestedLeadershipMovementConcurrency(Integer requestedLeadershipMovementConcurrency) {
        this._executor.setRequestedLeadershipMovementConcurrency(requestedLeadershipMovementConcurrency);
    }

    public void resumeMetricSampling(String reason) {
        this._loadMonitor.resumeMetricSampling(reason);
    }

    public OptimizerResult getProposals(OperationProgress operationProgress, boolean allowCapacityEstimation) throws KafkaCruiseControlException {
        try {
            return this._goalOptimizer.optimizations(operationProgress, allowCapacityEstimation);
        }
        catch (InterruptedException ie) {
            throw new KafkaCruiseControlException("Interrupted when getting the optimization proposals", ie);
        }
    }

    public boolean ignoreProposalCache(List<String> goals, ModelCompletenessRequirements requirements, Pattern excludedTopics, boolean excludeBrokers, boolean ignoreProposalCache, boolean isTriggeredByGoalViolation, Set<Integer> requestedDestinationBrokerIds, boolean isRebalanceDiskMode) {
        ModelCompletenessRequirements requirementsForCache = this._goalOptimizer.modelCompletenessRequirementsForPrecomputing();
        boolean hasWeakerRequirement = requirementsForCache.minMonitoredPartitionsPercentage() > requirements.minMonitoredPartitionsPercentage() || requirementsForCache.minRequiredNumWindows() > requirements.minRequiredNumWindows() || requirementsForCache.includeAllTopics() && !requirements.includeAllTopics();
        return this.hasOngoingExecution() || ignoreProposalCache || goals != null && !goals.isEmpty() || hasWeakerRequirement || excludedTopics != null || excludeBrokers || isTriggeredByGoalViolation || !requestedDestinationBrokerIds.isEmpty() || isRebalanceDiskMode || RunnableUtils.partitionWithOfflineReplicas(this.kafkaCluster()) != null;
    }

    public synchronized OptimizerResult optimizations(ClusterModel clusterModel, List<Goal> goalsByPriority, OperationProgress operationProgress, Map<TopicPartition, List<ReplicaPlacementInfo>> initReplicaDistribution, OptimizationOptions optimizationOptions) throws KafkaCruiseControlException {
        return this._goalOptimizer.optimizations(clusterModel, goalsByPriority, operationProgress, initReplicaDistribution, optimizationOptions);
    }

    public Set<String> excludedTopics(ClusterModel clusterModel, Pattern requestedExcludedTopics) {
        return this._goalOptimizer.excludedTopics(clusterModel, requestedExcludedTopics);
    }

    public KafkaCruiseControlConfig config() {
        return this._config;
    }

    private static boolean hasProposalsToExecute(Collection<ExecutionProposal> proposals, String uuid) {
        if (proposals.isEmpty()) {
            LOG.info("Goals used in proposal generation for UUID {} are already satisfied.", (Object)uuid);
            return false;
        }
        return true;
    }

    public void executeProposals(Set<ExecutionProposal> proposals, Set<Integer> unthrottledBrokers, boolean isKafkaAssignerMode, Integer concurrentInterBrokerPartitionMovements, Integer concurrentIntraBrokerPartitionMovements, Integer concurrentLeaderMovements, Long executionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, boolean isTriggeredByUserRequest, String uuid, boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException {
        if (KafkaCruiseControl.hasProposalsToExecute(proposals, uuid)) {
            this._executor.executeProposals(proposals, unthrottledBrokers, null, this._loadMonitor, concurrentInterBrokerPartitionMovements, concurrentIntraBrokerPartitionMovements, concurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment);
        } else {
            this.failGeneratingProposalsForExecution(uuid);
        }
    }

    public void executeRemoval(Set<ExecutionProposal> proposals, boolean throttleDecommissionedBroker, Set<Integer> removedBrokers, boolean isKafkaAssignerMode, Integer concurrentInterBrokerPartitionMovements, Integer concurrentLeaderMovements, Long executionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, boolean isTriggeredByUserRequest, String uuid) throws OngoingExecutionException {
        if (KafkaCruiseControl.hasProposalsToExecute(proposals, uuid)) {
            this._executor.executeProposals(proposals, throttleDecommissionedBroker ? Collections.emptySet() : removedBrokers, removedBrokers, this._loadMonitor, concurrentInterBrokerPartitionMovements, 0, concurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, false);
        } else {
            this.failGeneratingProposalsForExecution(uuid);
        }
    }

    public void executeDemotion(Set<ExecutionProposal> proposals, Set<Integer> demotedBrokers, Integer concurrentLeaderMovements, int brokerCount, Long executionProgressCheckIntervalMs, ReplicaMovementStrategy replicaMovementStrategy, Long replicationThrottle, boolean isTriggeredByUserRequest, String uuid) throws OngoingExecutionException {
        if (KafkaCruiseControl.hasProposalsToExecute(proposals, uuid)) {
            int concurrentSwaps = concurrentLeaderMovements != null ? concurrentLeaderMovements : this._config.getInt("num.concurrent.leader.movements");
            concurrentSwaps = Math.min(this._config.getInt("max.num.cluster.movements") / brokerCount, concurrentSwaps);
            this._executor.executeDemoteProposals(proposals, demotedBrokers, this._loadMonitor, concurrentSwaps, concurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, isTriggeredByUserRequest, uuid);
        } else {
            this.failGeneratingProposalsForExecution(uuid);
        }
    }

    public void userTriggeredStopExecution(boolean forceExecutionStop) {
        this._executor.userTriggeredStopExecution(forceExecutionStop);
    }

    public void setGeneratingProposalsForExecution(String uuid, Supplier<String> reasonSupplier, boolean isTriggeredByUserRequest) throws OngoingExecutionException {
        this._executor.setGeneratingProposalsForExecution(uuid, reasonSupplier, isTriggeredByUserRequest);
    }

    public synchronized void failGeneratingProposalsForExecution(String uuid) {
        this._executor.failGeneratingProposalsForExecution(uuid);
    }

    public long executionProgressCheckIntervalMs() {
        return this._executor.executionProgressCheckIntervalMs();
    }

    public ExecutorState.State executionState() {
        return this.executorState().state();
    }

    public ExecutorState executorState() {
        return this._executor.state();
    }

    public LoadMonitorState monitorState(Cluster cluster) {
        return this._loadMonitor.state(cluster);
    }

    public AnalyzerState analyzerState(Cluster cluster) {
        return this._goalOptimizer.state(cluster);
    }

    public AnomalyDetectorState anomalyDetectorState() {
        return this._anomalyDetectorManager.anomalyDetectorState();
    }

    public Cluster kafkaCluster() {
        return this._loadMonitor.kafkaCluster();
    }

    public TopicConfigProvider topicConfigProvider() {
        return this._loadMonitor.topicConfigProvider();
    }

    public static String cruiseControlVersion() {
        return VERSION;
    }

    public static String cruiseControlCommitId() {
        return COMMIT_ID;
    }

    public ModelCompletenessRequirements modelCompletenessRequirements(Collection<Goal> goals) {
        return goals == null || goals.isEmpty() ? this._goalOptimizer.defaultModelCompletenessRequirements() : MonitorUtils.combineLoadRequirementOptions(goals);
    }

    public boolean meetCompletenessRequirements(List<String> goals) {
        MetadataClient.ClusterAndGeneration clusterAndGeneration = this._loadMonitor.refreshClusterAndGeneration();
        return KafkaCruiseControlUtils.goalsByPriority(goals, this._config).stream().allMatch(g -> this._loadMonitor.meetCompletenessRequirements(clusterAndGeneration.cluster(), g.clusterModelCompletenessRequirements()));
    }

    public void sanityCheckBrokerPresence(Set<Integer> brokerIds) {
        Cluster cluster = this._loadMonitor.refreshClusterAndGeneration().cluster();
        Set invalidBrokerIds = brokerIds.stream().filter(id -> cluster.nodeById(id.intValue()) == null).collect(Collectors.toSet());
        if (!invalidBrokerIds.isEmpty()) {
            throw new IllegalArgumentException(String.format("Broker %s does not exist.", invalidBrokerIds));
        }
    }

    static {
        Properties props = new Properties();
        try (InputStream resourceStream = KafkaCruiseControl.class.getResourceAsStream("/cruise-control/cruise-control-version.properties");){
            props.load(resourceStream);
        }
        catch (Exception e) {
            LOG.warn("Error while loading cruise-control-version.properties :" + e.getMessage());
        }
        VERSION = props.getProperty("version", "unknown").trim();
        COMMIT_ID = props.getProperty("commitId", "unknown").trim();
        LOG.info("COMMIT INFO: " + VERSION + "---" + COMMIT_ID);
    }
}

