package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.MonitorConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/MetricFetcherManager.class */
public class MetricFetcherManager {
    static final String BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = "broker.capacity.config.resolver.object";
    private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherManager.class);
    static final int SUPPORTED_NUM_METRIC_FETCHER = 1;
    private final Time _time;
    private final KafkaPartitionMetricSampleAggregator _partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator _brokerMetricSampleAggregator;
    private final MetadataClient _metadataClient;
    private final MetricSampler _metricSampler;
    private final MetricSamplerPartitionAssignor _partitionAssignor;
    private final ExecutorService _samplingExecutor;
    private final boolean _useLinearRegressionModel;
    private final MetricDef _metricDef;
    private final Timer _samplingFetcherTimer;
    private final Meter _samplingFetcherFailureRate;
    private final Timer _trainingSamplesFetcherTimer;
    private final Meter _trainingSamplesFetcherFailureRate;

    public MetricFetcherManager(KafkaCruiseControlConfig kafkaCruiseControlConfig, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, MetadataClient metadataClient, MetricDef metricDef, Time time, MetricRegistry metricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver) {
        this(kafkaCruiseControlConfig, kafkaPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, metricDef, time, metricRegistry, brokerCapacityConfigResolver, null);
    }

    public MetricFetcherManager(KafkaCruiseControlConfig kafkaCruiseControlConfig, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, MetadataClient metadataClient, MetricDef metricDef, Time time, MetricRegistry metricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver, MetricSampler metricSampler) {
        this._time = time;
        this._partitionMetricSampleAggregator = kafkaPartitionMetricSampleAggregator;
        this._brokerMetricSampleAggregator = kafkaBrokerMetricSampleAggregator;
        this._metadataClient = metadataClient;
        this._metricDef = metricDef;
        this._samplingExecutor = Executors.newFixedThreadPool(1, new KafkaCruiseControlThreadFactory("MetricFetcher", true, LOG));
        this._partitionAssignor = (MetricSamplerPartitionAssignor) kafkaCruiseControlConfig.getConfiguredInstance(MonitorConfig.METRIC_SAMPLER_PARTITION_ASSIGNOR_CLASS_CONFIG, MetricSamplerPartitionAssignor.class);
        this._partitionAssignor.configure(kafkaCruiseControlConfig.mergedConfigValues());
        this._useLinearRegressionModel = kafkaCruiseControlConfig.getBoolean(MonitorConfig.USE_LINEAR_REGRESSION_MODEL_CONFIG).booleanValue();
        this._samplingFetcherTimer = metricRegistry.timer(MetricRegistry.name("MetricFetcherManager", new String[]{"partition-samples-fetcher-timer"}));
        this._samplingFetcherFailureRate = metricRegistry.meter(MetricRegistry.name("MetricFetcherManager", new String[]{"partition-samples-fetcher-failure-rate"}));
        this._trainingSamplesFetcherTimer = metricRegistry.timer(MetricRegistry.name("MetricFetcherManager", new String[]{"training-samples-fetcher-timer"}));
        this._trainingSamplesFetcherFailureRate = metricRegistry.meter(MetricRegistry.name("MetricFetcherManager", new String[]{"training-samples-fetcher-failure-rate"}));
        this._metricSampler = metricSampler == null ? (MetricSampler) kafkaCruiseControlConfig.getConfiguredInstance(MonitorConfig.METRIC_SAMPLER_CLASS_CONFIG, MetricSampler.class, Collections.singletonMap(BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG, brokerCapacityConfigResolver)) : metricSampler;
    }

    public void shutdown() {
        try {
            this._metricSampler.close();
        } catch (Exception e) {
            LOG.warn("Received exception when closing metric samplers.", e);
        }
        LOG.info("Shutting down metric fetcher manager.");
        this._samplingExecutor.shutdown();
        LOG.info("Metric fetcher manager shutdown completed.");
    }

    public boolean fetchMetricSamples(long j, long j2, long j3, SampleStore sampleStore, MetricSampler.SamplingMode samplingMode) {
        LOG.info("Kicking off metric sampling for time range [{}, {}], duration {} ms with timeout {} ms.", new Object[]{Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j2 - j), Long.valueOf(j3)});
        return fetchSamples(new SamplingFetcher(this._metricSampler, this._metadataClient.cluster(), this._partitionMetricSampleAggregator, this._brokerMetricSampleAggregator, sampleStore, this._partitionAssignor.assignPartitions(this._metadataClient.cluster()), j, j2, true, this._useLinearRegressionModel, this._metricDef, this._samplingFetcherTimer, this._samplingFetcherFailureRate, samplingMode), j3);
    }

    public boolean fetchBrokerMetricSamples(long j, long j2, long j3, SampleStore sampleStore) {
        LOG.info("Kicking off broker metric sampling for time range [{}, {}], duration {} ms with timeout {} ms.", new Object[]{Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j2 - j), Long.valueOf(j3)});
        return fetchSamples(new TrainingFetcher(this._metricSampler, this._metadataClient.cluster(), sampleStore, this._partitionAssignor.assignPartitions(this._metadataClient.cluster()), j, j2, this._metricDef, this._trainingSamplesFetcherTimer, this._trainingSamplesFetcherFailureRate), j3);
    }

    private boolean fetchSamples(MetricFetcher metricFetcher, long j) {
        boolean z = false;
        long milliseconds = this._time.milliseconds();
        try {
            z = ((Boolean) this._samplingExecutor.submit(metricFetcher).get((milliseconds + j) - this._time.milliseconds(), TimeUnit.MILLISECONDS)).booleanValue();
        } catch (InterruptedException e) {
            LOG.warn("Sampling scheduler thread is interrupted when waiting for sampling to finish.", e);
        } catch (ExecutionException e2) {
            LOG.error("Sampling scheduler received Execution exception when waiting for sampling to finish.", e2);
        } catch (TimeoutException e3) {
            LOG.error("Sampling scheduler received Timeout exception when waiting for sampling to finish.", e3);
        } catch (Exception e4) {
            LOG.error("Sampling scheduler received Unknown exception when waiting for sampling to finish.", e4);
        }
        LOG.info("Finished sampling in {} ms.", Long.valueOf(this._time.milliseconds() - milliseconds));
        return z;
    }
}
