package com.linkedin.kafka.cruisecontrol.monitor.task;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.MonitorConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/task/LoadMonitorTaskRunner.class */
public class LoadMonitorTaskRunner {
    private static final Logger LOG = LoggerFactory.getLogger(LoadMonitorTaskRunner.class);
    public static final MetricSampler.SamplingMode DEFAULT_SAMPLING_MODE = MetricSampler.SamplingMode.ALL;
    private final Time _time;
    private final MetricFetcherManager _metricFetcherManager;
    private final KafkaPartitionMetricSampleAggregator _partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator _brokerMetricSampleAggregator;
    private final MetadataClient _metadataClient;
    private final SampleStore _sampleStore;
    private final ScheduledExecutorService _samplingScheduler;
    private final long _samplingIntervalMs;
    private final int _configuredNumWindows;
    private final long _configuredWindowMs;
    private final AtomicReference<LoadMonitorTaskRunnerState> _state;
    private volatile double _bootstrapProgress;
    private volatile boolean _awaitingPauseSampling;
    private volatile String _reasonOfLatestPauseOrResume;
    private volatile MetricSampler.SamplingMode _samplingMode;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/task/LoadMonitorTaskRunner$LoadMonitorTaskRunnerState.class */
    public enum LoadMonitorTaskRunnerState {
        NOT_STARTED,
        RUNNING,
        PAUSED,
        SAMPLING,
        BOOTSTRAPPING,
        TRAINING,
        LOADING
    }

    public LoadMonitorTaskRunner(KafkaCruiseControlConfig kafkaCruiseControlConfig, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, MetadataClient metadataClient, MetricDef metricDef, Time time, MetricRegistry metricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver) {
        this(kafkaCruiseControlConfig, new MetricFetcherManager(kafkaCruiseControlConfig, kafkaPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, metricDef, time, metricRegistry, brokerCapacityConfigResolver), kafkaPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, time);
    }

    LoadMonitorTaskRunner(KafkaCruiseControlConfig kafkaCruiseControlConfig, MetricFetcherManager metricFetcherManager, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, MetadataClient metadataClient, Time time) {
        this._time = time;
        this._metricFetcherManager = metricFetcherManager;
        this._partitionMetricSampleAggregator = kafkaPartitionMetricSampleAggregator;
        this._brokerMetricSampleAggregator = kafkaBrokerMetricSampleAggregator;
        this._metadataClient = metadataClient;
        this._sampleStore = (SampleStore) kafkaCruiseControlConfig.getConfiguredInstance(MonitorConfig.SAMPLE_STORE_CLASS_CONFIG, SampleStore.class);
        long longValue = kafkaCruiseControlConfig.getLong(MonitorConfig.METRIC_SAMPLING_INTERVAL_MS_CONFIG).longValue();
        this._samplingScheduler = Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("SamplingScheduler", true, LOG));
        this._samplingIntervalMs = longValue;
        this._configuredNumWindows = kafkaCruiseControlConfig.getInt(MonitorConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG).intValue();
        this._configuredWindowMs = kafkaCruiseControlConfig.getLong(MonitorConfig.PARTITION_METRICS_WINDOW_MS_CONFIG).longValue();
        this._state = new AtomicReference<>(LoadMonitorTaskRunnerState.NOT_STARTED);
        this._bootstrapProgress = -1.0d;
        this._awaitingPauseSampling = false;
        this._reasonOfLatestPauseOrResume = null;
        this._samplingMode = DEFAULT_SAMPLING_MODE;
    }

    public void bootstrap(long j, long j2, boolean z) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.BOOTSTRAPPING)) {
            throw new IllegalStateException("Cannot bootstrap because the load monitor in " + this._state.get() + " state.");
        }
        this._samplingScheduler.submit(new BootstrapTask(j, j2, z, this._metadataClient, this._partitionMetricSampleAggregator, this, this._metricFetcherManager, this._sampleStore, this._configuredNumWindows, this._configuredWindowMs, this._samplingIntervalMs, this._time));
    }

    public void bootstrap(long j, boolean z) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.BOOTSTRAPPING)) {
            throw new IllegalStateException("Cannot bootstrap because the load monitor in " + this._state.get() + " state.");
        }
        this._samplingScheduler.submit(new BootstrapTask(j, z, this._metadataClient, this._partitionMetricSampleAggregator, this, this._metricFetcherManager, this._sampleStore, this._configuredNumWindows, this._configuredWindowMs, this._samplingIntervalMs, this._time));
    }

    public void bootstrap(boolean z) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.BOOTSTRAPPING)) {
            throw new IllegalStateException("Cannot bootstrap because the load monitor is in " + this._state.get() + " state.");
        }
        this._samplingScheduler.submit(new BootstrapTask(z, this._metadataClient, this._partitionMetricSampleAggregator, this, this._metricFetcherManager, this._sampleStore, this._configuredNumWindows, this._configuredWindowMs, this._samplingIntervalMs, this._time));
    }

    public double bootStrapProgress() {
        return this._bootstrapProgress;
    }

    private void loadSamples() {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.LOADING)) {
            throw new IllegalStateException("Cannot load samples because the load monitor is in " + this._state.get() + " state.");
        }
        this._samplingScheduler.submit(new SampleLoadingTask(this._sampleStore, this._partitionMetricSampleAggregator, this._brokerMetricSampleAggregator, this));
    }

    public double sampleLoadingProgress() {
        return this._sampleStore.sampleLoadingProgress();
    }

    public void train(long j, long j2) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.TRAINING)) {
            throw new IllegalStateException("Cannot start model training because the load monitor is in " + this._state.get() + " state.");
        }
        this._samplingScheduler.submit(new TrainingTask(this._time, this, this._metricFetcherManager, this._sampleStore, this._configuredWindowMs, this._samplingIntervalMs, j, j2));
    }

    public LoadMonitorTaskRunnerState state() {
        return this._state.get();
    }

    public void start(boolean z) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.NOT_STARTED, LoadMonitorTaskRunnerState.RUNNING)) {
            throw new IllegalStateException("Cannot start the task runner because the load monitor is in " + this._state.get() + " state.");
        }
        if (!z) {
            loadSamples();
        }
        this._samplingScheduler.scheduleAtFixedRate(new SamplingTask(this._samplingIntervalMs, this._metadataClient, this, this._metricFetcherManager, this._sampleStore, this._time), 0L, this._samplingIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        LOG.info("Shutting down load monitor task runner.");
        this._samplingScheduler.shutdown();
        try {
            this._samplingScheduler.awaitTermination(1000L, TimeUnit.MILLISECONDS);
            if (!this._samplingScheduler.isTerminated()) {
                LOG.warn("The sampling scheduler failed to shutdown in " + this._samplingIntervalMs + " ms.");
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for metric fetcher manager to shutdown.");
        }
        this._metricFetcherManager.shutdown();
        this._sampleStore.close();
        LOG.info("Load monitor task runner shutdown completed.");
    }

    public synchronized void pauseSampling(String str, boolean z) {
        if (this._state.get() == LoadMonitorTaskRunnerState.LOADING) {
            LOG.info("Skip pause sampling since load monitor is in loading state");
        } else {
            if (this._state.get() != LoadMonitorTaskRunnerState.PAUSED && !this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.PAUSED)) {
                this._awaitingPauseSampling = z;
                throw new IllegalStateException("Cannot pause the load monitor because it is in " + this._state.get() + " state.");
            }
            this._awaitingPauseSampling = false;
            setReasonOfLatestPauseOrResume(str);
        }
    }

    public synchronized void resumeSampling(String str) {
        if (this._state.get() == LoadMonitorTaskRunnerState.LOADING) {
            LOG.info("Skip resume sampling since load monitor is in loading state");
        } else {
            if (this._state.get() != LoadMonitorTaskRunnerState.RUNNING && !this._state.compareAndSet(LoadMonitorTaskRunnerState.PAUSED, LoadMonitorTaskRunnerState.RUNNING)) {
                throw new IllegalStateException("Cannot resume the load monitor because it is in " + this._state.get() + " state");
            }
            setReasonOfLatestPauseOrResume(str);
        }
    }

    void setReasonOfLatestPauseOrResume(String str) {
        this._reasonOfLatestPauseOrResume = str;
    }

    public String reasonOfLatestPauseOrResume() {
        return this._reasonOfLatestPauseOrResume;
    }

    public boolean awaitingPauseSampling() {
        return this._awaitingPauseSampling;
    }

    public void setSamplingMode(MetricSampler.SamplingMode samplingMode) {
        this._samplingMode = samplingMode;
    }

    public MetricSampler.SamplingMode samplingMode() {
        return this._samplingMode;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean compareAndSetState(LoadMonitorTaskRunnerState loadMonitorTaskRunnerState, LoadMonitorTaskRunnerState loadMonitorTaskRunnerState2) {
        return this._state.compareAndSet(loadMonitorTaskRunnerState, loadMonitorTaskRunnerState2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setBootstrapProgress(double d) {
        this._bootstrapProgress = d;
    }
}
