/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.task;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.task.BootstrapTask;
import com.linkedin.kafka.cruisecontrol.monitor.task.SampleLoadingTask;
import com.linkedin.kafka.cruisecontrol.monitor.task.SamplingTask;
import com.linkedin.kafka.cruisecontrol.monitor.task.TrainingTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadMonitorTaskRunner {
    private static final Logger LOG = LoggerFactory.getLogger(LoadMonitorTaskRunner.class);
    public static final MetricSampler.SamplingMode DEFAULT_SAMPLING_MODE = MetricSampler.SamplingMode.ALL;
    private final Time _time;
    private final MetricFetcherManager _metricFetcherManager;
    private final KafkaPartitionMetricSampleAggregator _partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator _brokerMetricSampleAggregator;
    private final MetadataClient _metadataClient;
    private final SampleStore _sampleStore;
    private final ScheduledExecutorService _samplingScheduler;
    private final long _samplingIntervalMs;
    private final int _configuredNumWindows;
    private final long _configuredWindowMs;
    private final AtomicReference<LoadMonitorTaskRunnerState> _state;
    private volatile double _bootstrapProgress;
    private volatile boolean _awaitingPauseSampling;
    private volatile String _reasonOfLatestPauseOrResume;
    private volatile MetricSampler.SamplingMode _samplingMode;

    public LoadMonitorTaskRunner(KafkaCruiseControlConfig config, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, MetadataClient metadataClient, MetricDef metricDef, Time time, MetricRegistry dropwizardMetricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver) {
        this(config, new MetricFetcherManager(config, partitionMetricSampleAggregator, brokerMetricSampleAggregator, metadataClient, metricDef, time, dropwizardMetricRegistry, brokerCapacityConfigResolver), partitionMetricSampleAggregator, brokerMetricSampleAggregator, metadataClient, time);
    }

    LoadMonitorTaskRunner(KafkaCruiseControlConfig config, MetricFetcherManager metricFetcherManager, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, MetadataClient metadataClient, Time time) {
        this._time = time;
        this._metricFetcherManager = metricFetcherManager;
        this._partitionMetricSampleAggregator = partitionMetricSampleAggregator;
        this._brokerMetricSampleAggregator = brokerMetricSampleAggregator;
        this._metadataClient = metadataClient;
        this._sampleStore = config.getConfiguredInstance("sample.store.class", SampleStore.class);
        long samplingIntervalMs = config.getLong("metric.sampling.interval.ms");
        this._samplingScheduler = Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("SamplingScheduler", true, LOG));
        this._samplingIntervalMs = samplingIntervalMs;
        this._configuredNumWindows = config.getInt("num.partition.metrics.windows");
        this._configuredWindowMs = config.getLong("partition.metrics.window.ms");
        this._state = new AtomicReference<LoadMonitorTaskRunnerState>(LoadMonitorTaskRunnerState.NOT_STARTED);
        this._bootstrapProgress = -1.0;
        this._awaitingPauseSampling = false;
        this._reasonOfLatestPauseOrResume = null;
        this._samplingMode = DEFAULT_SAMPLING_MODE;
    }

    public void bootstrap(long startMs, long endMs, boolean clearMetrics) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.BOOTSTRAPPING)) {
            throw new IllegalStateException("Cannot bootstrap because the load monitor in " + (Object)((Object)this._state.get()) + " state.");
        }
        this._samplingScheduler.submit(new BootstrapTask(startMs, endMs, clearMetrics, this._metadataClient, this._partitionMetricSampleAggregator, this, this._metricFetcherManager, this._sampleStore, this._configuredNumWindows, this._configuredWindowMs, this._samplingIntervalMs, this._time));
    }

    public void bootstrap(long startMs, boolean clearMetrics) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.BOOTSTRAPPING)) {
            throw new IllegalStateException("Cannot bootstrap because the load monitor in " + (Object)((Object)this._state.get()) + " state.");
        }
        this._samplingScheduler.submit(new BootstrapTask(startMs, clearMetrics, this._metadataClient, this._partitionMetricSampleAggregator, this, this._metricFetcherManager, this._sampleStore, this._configuredNumWindows, this._configuredWindowMs, this._samplingIntervalMs, this._time));
    }

    public void bootstrap(boolean clearMetrics) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.BOOTSTRAPPING)) {
            throw new IllegalStateException("Cannot bootstrap because the load monitor is in " + (Object)((Object)this._state.get()) + " state.");
        }
        this._samplingScheduler.submit(new BootstrapTask(clearMetrics, this._metadataClient, this._partitionMetricSampleAggregator, this, this._metricFetcherManager, this._sampleStore, this._configuredNumWindows, this._configuredWindowMs, this._samplingIntervalMs, this._time));
    }

    public double bootStrapProgress() {
        return this._bootstrapProgress;
    }

    private void loadSamples() {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.LOADING)) {
            throw new IllegalStateException("Cannot load samples because the load monitor is in " + (Object)((Object)this._state.get()) + " state.");
        }
        this._samplingScheduler.submit(new SampleLoadingTask(this._sampleStore, this._partitionMetricSampleAggregator, this._brokerMetricSampleAggregator, this));
    }

    public double sampleLoadingProgress() {
        return this._sampleStore.sampleLoadingProgress();
    }

    public void train(long startMs, long endMs) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.TRAINING)) {
            throw new IllegalStateException("Cannot start model training because the load monitor is in " + (Object)((Object)this._state.get()) + " state.");
        }
        this._samplingScheduler.submit(new TrainingTask(this._time, this, this._metricFetcherManager, this._sampleStore, this._configuredWindowMs, this._samplingIntervalMs, startMs, endMs));
    }

    public LoadMonitorTaskRunnerState state() {
        return this._state.get();
    }

    public void start(boolean skipLoadingSamples) {
        if (!this._state.compareAndSet(LoadMonitorTaskRunnerState.NOT_STARTED, LoadMonitorTaskRunnerState.RUNNING)) {
            throw new IllegalStateException("Cannot start the task runner because the load monitor is in " + (Object)((Object)this._state.get()) + " state.");
        }
        if (!skipLoadingSamples) {
            this.loadSamples();
        }
        this._samplingScheduler.scheduleAtFixedRate(new SamplingTask(this._samplingIntervalMs, this._metadataClient, this, this._metricFetcherManager, this._sampleStore, this._time), 0L, this._samplingIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        LOG.info("Shutting down load monitor task runner.");
        this._samplingScheduler.shutdown();
        try {
            this._samplingScheduler.awaitTermination(1000L, TimeUnit.MILLISECONDS);
            if (!this._samplingScheduler.isTerminated()) {
                LOG.warn("The sampling scheduler failed to shutdown in " + this._samplingIntervalMs + " ms.");
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for metric fetcher manager to shutdown.");
        }
        this._metricFetcherManager.shutdown();
        try {
            this._sampleStore.close();
        }
        catch (Exception e) {
            LOG.warn("Received exception when closing sample store.", (Throwable)e);
        }
        LOG.info("Load monitor task runner shutdown completed.");
    }

    public synchronized void pauseSampling(String reason, boolean forcePauseSampling) {
        if (this._state.get() == LoadMonitorTaskRunnerState.LOADING) {
            LOG.info("Skip pause sampling since load monitor is in loading state");
            return;
        }
        if (this._state.get() != LoadMonitorTaskRunnerState.PAUSED && !this._state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.PAUSED)) {
            this._awaitingPauseSampling = forcePauseSampling;
            throw new IllegalStateException("Cannot pause the load monitor because it is in " + (Object)((Object)this._state.get()) + " state.");
        }
        this._awaitingPauseSampling = false;
        this.setReasonOfLatestPauseOrResume(reason);
    }

    public synchronized void resumeSampling(String reason) {
        if (this._state.get() == LoadMonitorTaskRunnerState.LOADING) {
            LOG.info("Skip resume sampling since load monitor is in loading state");
            return;
        }
        if (this._state.get() != LoadMonitorTaskRunnerState.RUNNING && !this._state.compareAndSet(LoadMonitorTaskRunnerState.PAUSED, LoadMonitorTaskRunnerState.RUNNING)) {
            throw new IllegalStateException("Cannot resume the load monitor because it is in " + (Object)((Object)this._state.get()) + " state");
        }
        this.setReasonOfLatestPauseOrResume(reason);
    }

    void setReasonOfLatestPauseOrResume(String reason) {
        this._reasonOfLatestPauseOrResume = reason;
    }

    public String reasonOfLatestPauseOrResume() {
        return this._reasonOfLatestPauseOrResume;
    }

    public boolean awaitingPauseSampling() {
        return this._awaitingPauseSampling;
    }

    public void setSamplingMode(MetricSampler.SamplingMode samplingMode) {
        this._samplingMode = samplingMode;
    }

    public MetricSampler.SamplingMode samplingMode() {
        return this._samplingMode;
    }

    boolean compareAndSetState(LoadMonitorTaskRunnerState expectedState, LoadMonitorTaskRunnerState newState) {
        return this._state.compareAndSet(expectedState, newState);
    }

    void setBootstrapProgress(double progress) {
        this._bootstrapProgress = progress;
    }

    public static enum LoadMonitorTaskRunnerState {
        NOT_STARTED,
        RUNNING,
        PAUSED,
        SAMPLING,
        BOOTSTRAPPING,
        TRAINING,
        LOADING;

    }
}

