/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.model.ModelParameters;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcher;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionEntity;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MetricFetcher} that hands the fetched partition and broker metric samples to the
 * partition and broker metric sample aggregators.
 *
 * <p>Samples rejected by an aggregator are removed from the set that was passed in, so after each
 * {@code use*} method returns, the set no longer contains the rejected samples.
 */
class SamplingFetcher
extends MetricFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(SamplingFetcher.class);
    private final KafkaPartitionMetricSampleAggregator _partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator _brokerMetricSampleAggregator;
    private final boolean _leaderValidation;
    private final boolean _useLinearRegressionModel;

    /**
     * Creates a fetcher; every argument that is not listed below is passed straight through to the
     * {@link MetricFetcher} constructor.
     *
     * @param partitionMetricSampleAggregator aggregator that receives the partition metric samples.
     * @param brokerMetricSampleAggregator aggregator that receives the broker metric samples.
     * @param leaderValidation flag forwarded to the partition aggregator with every sample added.
     * @param useLinearRegressionModel whether to record a CPU usage estimate from the linear
     *                                 regression model on each partition sample once
     *                                 {@link ModelParameters#trainingCompleted()} returns true.
     */
    SamplingFetcher(MetricSampler metricSampler, Cluster cluster, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, SampleStore sampleStore, Set<TopicPartition> assignedPartitions, long startTimeMs, long endTimeMs, boolean leaderValidation, boolean useLinearRegressionModel, MetricDef metricDef, Timer fetchTimer, Meter fetchFailureRate, MetricSampler.SamplingMode samplingMode) {
        super(metricSampler, cluster, sampleStore, assignedPartitions, startTimeMs, endTimeMs, metricDef, fetchTimer, fetchFailureRate, samplingMode);
        _partitionMetricSampleAggregator = partitionMetricSampleAggregator;
        _brokerMetricSampleAggregator = brokerMetricSampleAggregator;
        _leaderValidation = leaderValidation;
        _useLinearRegressionModel = useLinearRegressionModel;
    }

    /**
     * Adds the samples of assigned partitions to the partition metric sample aggregator.
     *
     * <p>A sample whose partition is not in the assigned set is only logged; it is neither added to
     * the aggregator nor removed from {@code partitionMetricSamples}. A sample the aggregator
     * rejects is removed from {@code partitionMetricSamples}.
     *
     * @param partitionMetricSamples the fetched samples, modified in place; {@code null} is
     *                               tolerated and only produces a warning.
     */
    @Override
    protected void usePartitionMetricSamples(Set<PartitionMetricSample> partitionMetricSamples) {
        if (partitionMetricSamples == null) {
            LOG.warn("Failed to collect partition metric samples for {} assigned partitions", _assignedPartitions.size());
            return;
        }

        // Sized up front from the number of assigned partitions.
        Set<TopicPartition> seenPartitions = new HashSet<>(_assignedPartitions.size());
        int rejectedCount = 0;
        for (Iterator<PartitionMetricSample> it = partitionMetricSamples.iterator(); it.hasNext(); ) {
            PartitionMetricSample sample = it.next();
            TopicPartition partition = ((PartitionEntity) sample.entity()).tp();
            if (!_assignedPartitions.contains(partition)) {
                LOG.warn("Collected partition metric sample for partition {} which is not an assigned partition. The metric sample will be ignored.", partition);
                continue;
            }

            // Record the model-based CPU estimate on the sample before it is closed.
            if (_useLinearRegressionModel && ModelParameters.trainingCompleted()) {
                sample.record(KafkaMetricDef.commonMetricDef().metricInfo(KafkaMetricDef.CPU_USAGE.name()),
                              SamplingUtils.estimateLeaderCpuUtilUsingLinearRegressionModel(sample));
            }
            sample.close(_endTimeMs);

            boolean accepted = _partitionMetricSampleAggregator.addSample(sample, _leaderValidation);
            if (!accepted) {
                // The iterator must do the removal because the set is being traversed.
                it.remove();
                rejectedCount++;
                LOG.trace("Failed to add partition metric sample {}", sample);
            } else {
                LOG.trace("Enqueued partition metric sample {}", sample);
            }
            // Rejected samples still count towards the partitions that returned a sample.
            seenPartitions.add(partition);
        }

        LOG.info("Collected {}{} partition metric samples for {} partitions. Total partition assigned: {}.",
                 partitionMetricSamples.size(), describeDiscarded(rejectedCount), seenPartitions.size(), _assignedPartitions.size());
    }

    /**
     * Adds every broker metric sample to the broker metric sample aggregator and then passes the
     * remaining samples to {@link ModelParameters#addMetricObservation}.
     *
     * <p>A sample the aggregator rejects is removed from {@code brokerMetricSamples} before the
     * set is handed to {@link ModelParameters}.
     *
     * @param brokerMetricSamples the fetched samples, modified in place; {@code null} is tolerated
     *                            and only produces a warning.
     */
    @Override
    protected void useBrokerMetricSamples(Set<BrokerMetricSample> brokerMetricSamples) {
        if (brokerMetricSamples == null) {
            LOG.warn("Failed to collect broker metrics samples.");
            return;
        }

        Set<Integer> seenBrokerIds = new HashSet<>();
        int rejectedCount = 0;
        for (Iterator<BrokerMetricSample> it = brokerMetricSamples.iterator(); it.hasNext(); ) {
            BrokerMetricSample sample = it.next();
            sample.close(_endTimeMs);

            boolean accepted = _brokerMetricSampleAggregator.addSample(sample);
            if (!accepted) {
                // The iterator must do the removal because the set is being traversed.
                it.remove();
                rejectedCount++;
                LOG.trace("Failed to add broker metric sample {}", sample);
            } else {
                LOG.trace("Enqueued broker metric sample {}", sample);
            }
            // Rejected samples still count towards the brokers that returned a sample.
            seenBrokerIds.add(sample.brokerId());
        }

        LOG.info("Collected {}{} broker metric samples for {} brokers.",
                 brokerMetricSamples.size(), describeDiscarded(rejectedCount), seenBrokerIds.size());
        ModelParameters.addMetricObservation(brokerMetricSamples);
    }

    /**
     * Builds the optional "(N discarded)" fragment used in the collection summary log lines.
     *
     * @param discarded the number of samples that were rejected by an aggregator.
     * @return the formatted fragment when {@code discarded} is positive, otherwise an empty string.
     */
    private static String describeDiscarded(int discarded) {
        if (discarded > 0) {
            return String.format("(%d discarded)", discarded);
        }
        return "";
    }
}

