/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcher;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSamplerPartitionAssignor;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingFetcher;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.TrainingFetcher;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the metric sampler, the partition assignor and the executor used to run metric fetchers,
 * and exposes blocking entry points that run one fetcher at a time with a timeout.
 *
 * <p>Fetchers are submitted to a fixed-size pool of {@link #SUPPORTED_NUM_METRIC_FETCHER} thread(s);
 * the calling thread blocks until the fetcher finishes or the timeout elapses.
 */
public class MetricFetcherManager {
    static final String BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = "broker.capacity.config.resolver.object";
    private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherManager.class);
    static final int SUPPORTED_NUM_METRIC_FETCHER = 1;
    private static final String METRIC_NAME_PREFIX = "MetricFetcherManager";
    private final Time _time;
    private final KafkaPartitionMetricSampleAggregator _partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator _brokerMetricSampleAggregator;
    private final MetadataClient _metadataClient;
    private final MetricSampler _metricSampler;
    private final MetricSamplerPartitionAssignor _partitionAssignor;
    private final ExecutorService _samplingExecutor;
    private final boolean _useLinearRegressionModel;
    private final MetricDef _metricDef;
    private final Timer _samplingFetcherTimer;
    private final Meter _samplingFetcherFailureRate;
    private final Timer _trainingSamplesFetcherTimer;
    private final Meter _trainingSamplesFetcherFailureRate;

    /**
     * Creates a manager whose metric sampler is instantiated from the {@code metric.sampler.class} config.
     *
     * @param config the Cruise Control configuration.
     * @param partitionMetricSampleAggregator aggregator handed to sampling fetchers for partition samples.
     * @param brokerMetricSampleAggregator aggregator handed to sampling fetchers for broker samples.
     * @param metadataClient source of the cluster snapshot used for partition assignment and fetching.
     * @param metricDef metric definition handed to the fetchers.
     * @param time clock used to measure sampling duration and compute the deadline.
     * @param dropwizardMetricRegistry registry in which the fetcher timers and failure meters are registered.
     * @param brokerCapacityConfigResolver resolver passed to the configured sampler under
     *        {@link #BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG}.
     */
    public MetricFetcherManager(KafkaCruiseControlConfig config, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, MetadataClient metadataClient, MetricDef metricDef, Time time, MetricRegistry dropwizardMetricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver) {
        this(config, partitionMetricSampleAggregator, brokerMetricSampleAggregator, metadataClient, metricDef, time, dropwizardMetricRegistry, brokerCapacityConfigResolver, null);
    }

    /**
     * Creates a manager, optionally with an externally supplied metric sampler.
     *
     * @param config the Cruise Control configuration.
     * @param partitionMetricSampleAggregator aggregator handed to sampling fetchers for partition samples.
     * @param brokerMetricSampleAggregator aggregator handed to sampling fetchers for broker samples.
     * @param metadataClient source of the cluster snapshot used for partition assignment and fetching.
     * @param metricDef metric definition handed to the fetchers.
     * @param time clock used to measure sampling duration and compute the deadline.
     * @param dropwizardMetricRegistry registry in which the fetcher timers and failure meters are registered.
     * @param brokerCapacityConfigResolver resolver passed to the configured sampler under
     *        {@link #BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG}; only used when {@code sampler} is {@code null}.
     * @param sampler the metric sampler to use, or {@code null} to instantiate the one named by the
     *        {@code metric.sampler.class} config.
     */
    public MetricFetcherManager(KafkaCruiseControlConfig config, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, MetadataClient metadataClient, MetricDef metricDef, Time time, MetricRegistry dropwizardMetricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver, MetricSampler sampler) {
        _time = time;
        _partitionMetricSampleAggregator = partitionMetricSampleAggregator;
        _brokerMetricSampleAggregator = brokerMetricSampleAggregator;
        _metadataClient = metadataClient;
        _metricDef = metricDef;
        // Pool size comes from the declared constant rather than a duplicated literal.
        _samplingExecutor = Executors.newFixedThreadPool(SUPPORTED_NUM_METRIC_FETCHER, new KafkaCruiseControlThreadFactory("MetricFetcher", true, LOG));
        _partitionAssignor = config.getConfiguredInstance("metric.sampler.partition.assignor.class", MetricSamplerPartitionAssignor.class);
        _partitionAssignor.configure(config.mergedConfigValues());
        _useLinearRegressionModel = config.getBoolean("use.linear.regression.model");
        _samplingFetcherTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(METRIC_NAME_PREFIX, "partition-samples-fetcher-timer"));
        _samplingFetcherFailureRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_NAME_PREFIX, "partition-samples-fetcher-failure-rate"));
        _trainingSamplesFetcherTimer = dropwizardMetricRegistry.timer(MetricRegistry.name(METRIC_NAME_PREFIX, "training-samples-fetcher-timer"));
        _trainingSamplesFetcherFailureRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_NAME_PREFIX, "training-samples-fetcher-failure-rate"));
        if (sampler == null) {
            _metricSampler = config.getConfiguredInstance("metric.sampler.class", MetricSampler.class, Collections.singletonMap(BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG, brokerCapacityConfigResolver));
        } else {
            _metricSampler = sampler;
        }
    }

    /**
     * Closes the metric sampler and then initiates shutdown of the sampling executor.
     * A failure to close the sampler is logged and does not prevent the executor shutdown.
     */
    public void shutdown() {
        try {
            _metricSampler.close();
        } catch (Exception e) {
            LOG.warn("Received exception when closing metric samplers.", e);
        }
        LOG.info("Shutting down metric fetcher manager.");
        _samplingExecutor.shutdown();
        LOG.info("Metric fetcher manager shutdown completed.");
    }

    /**
     * Runs a {@link SamplingFetcher} for the given time range and waits for it to finish.
     *
     * @param startMs start of the sampling time range, in milliseconds.
     * @param endMs end of the sampling time range, in milliseconds.
     * @param timeoutMs maximum time to wait for the fetcher, in milliseconds.
     * @param sampleStore sample store handed to the fetcher.
     * @param samplingMode sampling mode handed to the fetcher.
     * @return {@code true} if the fetcher reported a sampling error or did not complete normally
     *         within the timeout, {@code false} otherwise.
     */
    public boolean fetchMetricSamples(long startMs, long endMs, long timeoutMs, SampleStore sampleStore, MetricSampler.SamplingMode samplingMode) {
        LOG.info("Kicking off metric sampling for time range [{}, {}], duration {} ms with timeout {} ms.", startMs, endMs, endMs - startMs, timeoutMs);
        // Read the cluster once so partition assignment and the fetcher share the same snapshot reference.
        Cluster cluster = _metadataClient.cluster();
        Set<TopicPartition> partitionAssignment = _partitionAssignor.assignPartitions(cluster);
        SamplingFetcher samplingFetcher = new SamplingFetcher(_metricSampler, cluster, _partitionMetricSampleAggregator, _brokerMetricSampleAggregator, sampleStore, partitionAssignment, startMs, endMs, true, _useLinearRegressionModel, _metricDef, _samplingFetcherTimer, _samplingFetcherFailureRate, samplingMode);
        return fetchSamples(samplingFetcher, timeoutMs);
    }

    /**
     * Runs a {@link TrainingFetcher} for the given time range and waits for it to finish.
     *
     * @param startMs start of the sampling time range, in milliseconds.
     * @param endMs end of the sampling time range, in milliseconds.
     * @param timeoutMs maximum time to wait for the fetcher, in milliseconds.
     * @param sampleStore sample store handed to the fetcher.
     * @return {@code true} if the fetcher reported a sampling error or did not complete normally
     *         within the timeout, {@code false} otherwise.
     */
    public boolean fetchBrokerMetricSamples(long startMs, long endMs, long timeoutMs, SampleStore sampleStore) {
        LOG.info("Kicking off broker metric sampling for time range [{}, {}], duration {} ms with timeout {} ms.", startMs, endMs, endMs - startMs, timeoutMs);
        // Read the cluster once so partition assignment and the fetcher share the same snapshot reference.
        Cluster cluster = _metadataClient.cluster();
        Set<TopicPartition> partitionAssignment = _partitionAssignor.assignPartitions(cluster);
        TrainingFetcher trainingFetcher = new TrainingFetcher(_metricSampler, cluster, sampleStore, partitionAssignment, startMs, endMs, _metricDef, _trainingSamplesFetcherTimer, _trainingSamplesFetcherFailureRate);
        return fetchSamples(trainingFetcher, timeoutMs);
    }

    /**
     * Submits the fetcher to the sampling executor and blocks until it completes or the timeout elapses.
     *
     * @param metricFetcher the fetcher to run.
     * @param timeoutMs maximum time to wait, in milliseconds, measured from the start of this call.
     * @return the fetcher's own error flag when it completes in time; {@code true} when waiting fails
     *         (interruption, execution failure, timeout, or any other exception).
     */
    private boolean fetchSamples(MetricFetcher metricFetcher, long timeoutMs) {
        boolean hasSamplingError;
        long samplingActionStartMs = _time.milliseconds();
        long deadlineMs = samplingActionStartMs + timeoutMs;
        Future<Boolean> errorFuture = _samplingExecutor.submit(metricFetcher);
        try {
            hasSamplingError = errorFuture.get(deadlineMs - _time.milliseconds(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Sampling scheduler thread is interrupted when waiting for sampling to finish.", e);
            // Restore the interrupt status so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            hasSamplingError = true;
        } catch (ExecutionException e) {
            LOG.error("Sampling scheduler received Execution exception when waiting for sampling to finish.", e);
            hasSamplingError = true;
        } catch (TimeoutException e) {
            LOG.error("Sampling scheduler received Timeout exception when waiting for sampling to finish.", e);
            hasSamplingError = true;
        } catch (Exception e) {
            // Covers unchecked failures from Future.get, e.g. cancellation.
            LOG.error("Sampling scheduler received Unknown exception when waiting for sampling to finish.", e);
            hasSamplingError = true;
        }
        long samplingTime = _time.milliseconds() - samplingActionStartMs;
        LOG.info("Finished sampling in {} ms.", samplingTime);
        return hasSamplingError;
    }
}

