package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.MonitorConfig;
import com.linkedin.kafka.cruisecontrol.exception.MetricSamplingException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.class */
public class CruiseControlMetricsReporterSampler implements MetricSampler {
    public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
    public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic";
    public static final String METRIC_REPORTER_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id";
    public static final String DEFAULT_METRIC_REPORTER_SAMPLER_GROUP_ID = "CruiseControlMetricsReporterSampler";
    public static final long ACCEPTABLE_NETWORK_DELAY_MS = 100;
    protected CruiseControlMetricsProcessor _metricsProcessor;
    protected Consumer<String, CruiseControlMetric> _metricConsumer;
    protected String _metricReporterTopic;
    protected Set<TopicPartition> _currentPartitionAssignment;
    protected long _acceptableMetricRecordProduceDelayMs;
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsReporterSampler.class);
    public static final Duration METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = Duration.ofMillis(ExecutorConfig.DEFAULT_MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS);
    protected static final Random RANDOM = new Random();

    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler
    public MetricSampler.Samples getSamples(Cluster cluster, Set<TopicPartition> set, long j, long j2, MetricSampler.SamplingMode samplingMode, MetricDef metricDef, long j3) throws MetricSamplingException {
        if (refreshPartitionAssignment()) {
            return MetricSampler.EMPTY_SAMPLES;
        }
        HashMap hashMap = new HashMap(this._currentPartitionAssignment.size());
        Iterator<TopicPartition> it = this._currentPartitionAssignment.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Long.valueOf(j));
        }
        HashSet hashSet = new HashSet(this._currentPartitionAssignment);
        Map<TopicPartition, Long> endOffsets = this._metricConsumer.endOffsets(hashSet);
        Map offsetsForTimes = this._metricConsumer.offsetsForTimes(hashMap);
        SamplingUtils.sanityCheckOffsetFetch(endOffsets, offsetsForTimes);
        hashSet.removeAll(offsetsForTimes.keySet());
        hashSet.forEach(topicPartition -> {
            this._metricConsumer.seek(topicPartition, ((Long) endOffsets.get(topicPartition)).longValue());
        });
        for (Map.Entry entry : offsetsForTimes.entrySet()) {
            TopicPartition topicPartition2 = (TopicPartition) entry.getKey();
            OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) entry.getValue();
            this._metricConsumer.seek(topicPartition2, offsetAndTimestamp != null ? offsetAndTimestamp.offset() : endOffsets.get(topicPartition2).longValue());
        }
        LOG.debug("Starting consuming from metrics reporter topic partitions {}.", this._currentPartitionAssignment);
        this._metricConsumer.resume(this._metricConsumer.paused());
        int i = 0;
        HashSet hashSet2 = new HashSet();
        do {
            Iterator it2 = this._metricConsumer.poll(METRIC_REPORTER_CONSUMER_POLL_TIMEOUT).iterator();
            while (it2.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it2.next();
                if (consumerRecord == null) {
                    LOG.warn("Cannot parse record, please update your Cruise Control version.");
                } else {
                    long time = ((CruiseControlMetric) consumerRecord.value()).time();
                    if (time + this._acceptableMetricRecordProduceDelayMs < j) {
                        LOG.debug("Discarding metric {} because its timestamp is more than {} ms earlier than the start time of sampling period {}.", new Object[]{consumerRecord.value(), Long.valueOf(this._acceptableMetricRecordProduceDelayMs), Long.valueOf(j)});
                    } else if (time >= j2) {
                        TopicPartition topicPartition3 = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                        LOG.debug("Saw metric {} whose timestamp is larger than the end time of sampling period {}. Pausing partition {} at offset {}.", new Object[]{consumerRecord.value(), Long.valueOf(j2), topicPartition3, Long.valueOf(consumerRecord.offset())});
                        hashSet2.add(topicPartition3);
                    } else {
                        this._metricsProcessor.addMetric((CruiseControlMetric) consumerRecord.value());
                        i++;
                    }
                }
            }
            if (!hashSet2.isEmpty()) {
                this._metricConsumer.pause(hashSet2);
                hashSet2.clear();
            }
            if (consumptionDone(endOffsets)) {
                break;
            }
        } while (System.currentTimeMillis() < j3);
        LOG.info("Finished sampling for topic partitions {} in time range [{},{}]. Collected {} metrics.", new Object[]{this._currentPartitionAssignment, Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i)});
        try {
            if (i > 0) {
                MetricSampler.Samples process = this._metricsProcessor.process(cluster, set, samplingMode);
                this._metricsProcessor.clear();
                return process;
            }
            MetricSampler.Samples samples = new MetricSampler.Samples(Collections.emptySet(), Collections.emptySet());
            this._metricsProcessor.clear();
            return samples;
        } catch (Throwable th) {
            this._metricsProcessor.clear();
            throw th;
        }
    }

    protected boolean consumptionDone(Map<TopicPartition, Long> map) {
        HashSet<TopicPartition> hashSet = new HashSet(this._metricConsumer.assignment());
        hashSet.removeAll(this._metricConsumer.paused());
        for (TopicPartition topicPartition : hashSet) {
            if (this._metricConsumer.position(topicPartition) < map.get(topicPartition).longValue()) {
                return false;
            }
        }
        return true;
    }

    protected boolean refreshPartitionAssignment() {
        List<PartitionInfo> partitionsFor = this._metricConsumer.partitionsFor(this._metricReporterTopic);
        if (partitionsFor == null) {
            LOG.error("_metricConsumer returned null for _metricReporterTopic {}", this._metricReporterTopic);
            return true;
        }
        if (partitionsFor.isEmpty()) {
            this._currentPartitionAssignment = Collections.emptySet();
            LOG.error("The set of partitions currently assigned to the metric consumer is empty.");
            return true;
        }
        if (partitionsFor.size() == this._currentPartitionAssignment.size()) {
            return false;
        }
        this._currentPartitionAssignment = new HashSet(partitionsFor.size());
        for (PartitionInfo partitionInfo : partitionsFor) {
            this._currentPartitionAssignment.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        this._metricConsumer.assign(this._currentPartitionAssignment);
        return false;
    }

    public void configure(Map<String, ?> map) {
        if (((Integer) map.get(MonitorConfig.NUM_METRIC_FETCHERS_CONFIG)).intValue() != 1) {
            throw new ConfigException("CruiseControlMetricsReporterSampler is not thread safe. Please change num.metric.fetchers to 1");
        }
        BrokerCapacityConfigResolver brokerCapacityConfigResolver = (BrokerCapacityConfigResolver) map.get("broker.capacity.config.resolver.object");
        if (brokerCapacityConfigResolver == null) {
            throw new IllegalArgumentException("Metrics reporter sampler configuration is missing broker capacity config resolver object.");
        }
        this._metricsProcessor = new CruiseControlMetricsProcessor(brokerCapacityConfigResolver, ((Boolean) map.get(MonitorConfig.SAMPLING_ALLOW_CPU_CAPACITY_ESTIMATION_CONFIG)).booleanValue());
        String str = (String) map.get(METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS);
        if (str == null) {
            str = map.get(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG).toString();
            if (str.length() > 2) {
                str = str.substring(1, str.length() - 1);
            }
        }
        this._metricReporterTopic = (String) map.get(METRIC_REPORTER_TOPIC);
        if (this._metricReporterTopic == null) {
            this._metricReporterTopic = "__CruiseControlMetrics";
        }
        String str2 = (String) map.get(METRIC_REPORTER_SAMPLER_GROUP_ID);
        if (str2 == null) {
            str2 = "CruiseControlMetricsReporterSampler-" + RANDOM.nextLong();
        }
        String obj = map.get(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG).toString();
        CruiseControlMetricsReporterConfig cruiseControlMetricsReporterConfig = new CruiseControlMetricsReporterConfig(map, false);
        this._acceptableMetricRecordProduceDelayMs = 100 + Math.max(cruiseControlMetricsReporterConfig.getLong("cruise.control.metrics.reporter.max.block.ms").longValue(), cruiseControlMetricsReporterConfig.getLong("cruise.control.metrics.reporter.linger.ms").longValue());
        Properties properties = new Properties();
        properties.putAll(map);
        Random random = new Random();
        properties.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, str);
        properties.setProperty("group.id", str2);
        properties.setProperty(MonitorConfig.CLIENT_ID_CONFIG, str2 + "-consumer-" + random.nextInt());
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("max.poll.records", Integer.toString(Integer.MAX_VALUE));
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", MetricSerde.class.getName());
        properties.setProperty(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG, obj);
        this._metricConsumer = new KafkaConsumer(properties);
        this._currentPartitionAssignment = Collections.emptySet();
        if (refreshPartitionAssignment()) {
            throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches " + this._metricReporterTopic + " in the target cluster.");
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this._metricConsumer.close();
    }
}
