/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.AbstractMetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSamplerOptions;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CruiseControlMetricsReporterSampler
extends AbstractMetricSampler {
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsReporterSampler.class);
    public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
    public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic";
    @Deprecated
    public static final String METRIC_REPORTER_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id";
    public static final Duration METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = Duration.ofMillis(5000L);
    public static final String CONSUMER_CLIENT_ID_PREFIX = "CruiseControlMetricsReporterSampler";
    public static final long ACCEPTABLE_NETWORK_DELAY_MS = 100L;
    protected Consumer<String, CruiseControlMetric> _metricConsumer;
    protected String _metricReporterTopic;
    protected Set<TopicPartition> _currentPartitionAssignment;
    protected long _acceptableMetricRecordProduceDelayMs;

    @Override
    protected int retrieveMetricsForProcessing(MetricSamplerOptions metricSamplerOptions) throws SamplingException {
        if (this.refreshPartitionAssignment()) {
            return 0;
        }
        HashMap<TopicPartition, Long> timestampToSeek = new HashMap<TopicPartition, Long>(this._currentPartitionAssignment.size());
        for (TopicPartition tp2 : this._currentPartitionAssignment) {
            timestampToSeek.put(tp2, metricSamplerOptions.startTimeMs());
        }
        HashSet<TopicPartition> assignment = new HashSet<TopicPartition>(this._currentPartitionAssignment);
        Map endOffsets = this._metricConsumer.endOffsets(assignment);
        Map offsetsForTimes = this._metricConsumer.offsetsForTimes(timestampToSeek);
        KafkaCruiseControlUtils.sanityCheckOffsetFetch(endOffsets, offsetsForTimes);
        assignment.removeAll(offsetsForTimes.keySet());
        assignment.forEach(tp -> this._metricConsumer.seek(tp, ((Long)endOffsets.get(tp)).longValue()));
        for (Map.Entry entry : offsetsForTimes.entrySet()) {
            TopicPartition tp3 = (TopicPartition)entry.getKey();
            OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp)entry.getValue();
            this._metricConsumer.seek(tp3, offsetAndTimestamp != null ? offsetAndTimestamp.offset() : ((Long)endOffsets.get(tp3)).longValue());
        }
        LOG.debug("Starting consuming from metrics reporter topic partitions {}.", this._currentPartitionAssignment);
        this._metricConsumer.resume((Collection)this._metricConsumer.paused());
        int totalMetricsAdded = 0;
        HashSet<TopicPartition> partitionsToPause = new HashSet<TopicPartition>();
        do {
            ConsumerRecords records = this._metricConsumer.poll(METRIC_REPORTER_CONSUMER_POLL_TIMEOUT);
            for (ConsumerRecord record : records) {
                if (record == null) {
                    LOG.warn("Cannot parse record, please update your Cruise Control version.");
                    continue;
                }
                long recordTime = ((CruiseControlMetric)record.value()).time();
                if (recordTime + this._acceptableMetricRecordProduceDelayMs < metricSamplerOptions.startTimeMs()) {
                    LOG.debug("Discarding metric {} because its timestamp is more than {} ms earlier than the start time of sampling period {}.", new Object[]{record.value(), this._acceptableMetricRecordProduceDelayMs, metricSamplerOptions.startTimeMs()});
                    continue;
                }
                if (recordTime >= metricSamplerOptions.endTimeMs()) {
                    TopicPartition tp4 = new TopicPartition(record.topic(), record.partition());
                    LOG.debug("Saw metric {} whose timestamp is larger than the end time of sampling period {}. Pausing partition {} at offset {}.", new Object[]{record.value(), metricSamplerOptions.endTimeMs(), tp4, record.offset()});
                    partitionsToPause.add(tp4);
                    continue;
                }
                this.addMetricForProcessing((CruiseControlMetric)record.value());
                ++totalMetricsAdded;
            }
            if (partitionsToPause.isEmpty()) continue;
            this._metricConsumer.pause(partitionsToPause);
            partitionsToPause.clear();
        } while (!KafkaCruiseControlUtils.consumptionDone(this._metricConsumer, endOffsets) && System.currentTimeMillis() < metricSamplerOptions.timeoutMs());
        LOG.info("Finished sampling for topic partitions {} in time range [{},{}]. Collected {} metrics.", new Object[]{this._currentPartitionAssignment, metricSamplerOptions.startTimeMs(), metricSamplerOptions.endTimeMs(), totalMetricsAdded});
        return totalMetricsAdded;
    }

    protected boolean refreshPartitionAssignment() {
        List remotePartitionInfo = this._metricConsumer.partitionsFor(this._metricReporterTopic);
        if (remotePartitionInfo == null) {
            LOG.error("_metricConsumer returned null for _metricReporterTopic {}", (Object)this._metricReporterTopic);
            return true;
        }
        if (remotePartitionInfo.isEmpty()) {
            this._currentPartitionAssignment = Collections.emptySet();
            LOG.error("The set of partitions currently assigned to the metric consumer is empty.");
            return true;
        }
        if (remotePartitionInfo.size() == this._currentPartitionAssignment.size()) {
            return false;
        }
        this._currentPartitionAssignment = new HashSet<TopicPartition>(remotePartitionInfo.size());
        for (PartitionInfo partitionInfo : remotePartitionInfo) {
            this._currentPartitionAssignment.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        this._metricConsumer.assign(this._currentPartitionAssignment);
        return false;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        super.configure(configs);
        int numSamplers = (Integer)configs.get("num.metric.fetchers");
        if (numSamplers != 1) {
            throw new ConfigException("CruiseControlMetricsReporterSampler is not thread safe. Please change num.metric.fetchers to 1");
        }
        this._metricReporterTopic = (String)configs.get(METRIC_REPORTER_TOPIC);
        if (this._metricReporterTopic == null) {
            this._metricReporterTopic = "__CruiseControlMetrics";
        }
        CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);
        this._acceptableMetricRecordProduceDelayMs = 100L + Math.max(reporterConfig.getLong("cruise.control.metrics.reporter.max.block.ms"), reporterConfig.getLong("cruise.control.metrics.reporter.linger.ms"));
        this._metricConsumer = SamplingUtils.createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
        this._currentPartitionAssignment = Collections.emptySet();
        if (this.refreshPartitionAssignment()) {
            throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches " + this._metricReporterTopic + " in the target cluster.");
        }
    }

    @Override
    public void close() {
        this._metricConsumer.close();
    }
}

