/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.cruisecontrol.monitor.sampling.MetricSample;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionEntity;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSampleStore
implements SampleStore {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSampleStore.class);
    protected static final Duration PRODUCER_CLOSE_TIMEOUT = Duration.ofMinutes(3L);
    protected static final Duration CONSUMER_CLOSE_TIMEOUT = Duration.ofSeconds(10L);
    protected static final int ADDITIONAL_WINDOW_TO_RETAIN_FACTOR = 2;
    protected static final ConsumerRecords<byte[], byte[]> SHUTDOWN_RECORDS = new ConsumerRecords(Collections.emptyMap());
    protected static final Duration SAMPLE_POLL_TIMEOUT = Duration.ofMillis(1000L);
    protected static final int DEFAULT_NUM_SAMPLE_LOADING_THREADS = 8;
    protected static final short DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR = 2;
    protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;
    protected static final int DEFAULT_BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;
    protected static final long DEFAULT_MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS = 3600000L;
    protected static final long DEFAULT_MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS = 3600000L;
    protected static final String PRODUCER_CLIENT_ID = "KafkaCruiseControlSampleStoreProducer";
    protected static final String CONSUMER_CLIENT_ID_PREFIX = "KafkaCruiseControlSampleStore";
    protected List<KafkaConsumer<byte[], byte[]>> _consumers;
    protected ExecutorService _metricProcessorExecutor;
    protected String _partitionMetricSampleStoreTopic;
    protected String _brokerMetricSampleStoreTopic;
    protected Short _sampleStoreTopicReplicationFactor;
    protected int _partitionSampleStoreTopicPartitionCount;
    protected int _brokerSampleStoreTopicPartitionCount;
    protected long _minPartitionSampleStoreTopicRetentionTimeMs;
    protected long _minBrokerSampleStoreTopicRetentionTimeMs;
    protected volatile double _loadingProgress;
    protected Producer<byte[], byte[]> _producer;
    protected volatile boolean _shutdown = false;
    protected boolean _skipSampleStoreTopicRackAwarenessCheck;
    public static final String PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "partition.metric.sample.store.topic";
    public static final String BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "broker.metric.sample.store.topic";
    public static final String NUM_SAMPLE_LOADING_THREADS_CONFIG = "num.sample.loading.threads";
    public static final String SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG = "sample.store.topic.replication.factor";
    public static final String PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "partition.sample.store.topic.partition.count";
    public static final String BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "broker.sample.store.topic.partition.count";
    public static final String MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG = "min.partition.sample.store.topic.retention.time.ms";
    public static final String MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG = "min.broker.sample.store.topic.retention.time.ms";
    public static final String SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK_CONFIG = "skip.sample.store.topic.rack.awareness.check";

    public void configure(Map<String, ?> config) {
        this._partitionMetricSampleStoreTopic = KafkaCruiseControlUtils.getRequiredConfig(config, PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
        this._brokerMetricSampleStoreTopic = KafkaCruiseControlUtils.getRequiredConfig(config, BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
        String metricSampleStoreTopicReplicationFactorString = (String)config.get(SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG);
        this._sampleStoreTopicReplicationFactor = metricSampleStoreTopicReplicationFactorString == null || metricSampleStoreTopicReplicationFactorString.isEmpty() ? null : Short.valueOf(Short.parseShort(metricSampleStoreTopicReplicationFactorString));
        String partitionSampleStoreTopicPartitionCountString = (String)config.get(PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG);
        this._partitionSampleStoreTopicPartitionCount = partitionSampleStoreTopicPartitionCountString == null || partitionSampleStoreTopicPartitionCountString.isEmpty() ? 32 : Integer.parseInt(partitionSampleStoreTopicPartitionCountString);
        String brokerSampleStoreTopicPartitionCountString = (String)config.get(BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG);
        this._brokerSampleStoreTopicPartitionCount = brokerSampleStoreTopicPartitionCountString == null || brokerSampleStoreTopicPartitionCountString.isEmpty() ? 32 : Integer.parseInt(brokerSampleStoreTopicPartitionCountString);
        String minPartitionSampleStoreTopicRetentionTimeMsString = (String)config.get(MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG);
        this._minPartitionSampleStoreTopicRetentionTimeMs = minPartitionSampleStoreTopicRetentionTimeMsString == null || minPartitionSampleStoreTopicRetentionTimeMsString.isEmpty() ? 3600000L : Long.parseLong(minPartitionSampleStoreTopicRetentionTimeMsString);
        String minBrokerSampleStoreTopicRetentionTimeMsString = (String)config.get(MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG);
        this._minBrokerSampleStoreTopicRetentionTimeMs = minBrokerSampleStoreTopicRetentionTimeMsString == null || minBrokerSampleStoreTopicRetentionTimeMsString.isEmpty() ? 3600000L : Long.parseLong(minBrokerSampleStoreTopicRetentionTimeMsString);
        String numProcessingThreadsString = (String)config.get(NUM_SAMPLE_LOADING_THREADS_CONFIG);
        int numProcessingThreads = numProcessingThreadsString == null || numProcessingThreadsString.isEmpty() ? 8 : Integer.parseInt(numProcessingThreadsString);
        String skipSampleStoreTopicRackAwarenessCheckString = (String)config.get(SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK_CONFIG);
        this._skipSampleStoreTopicRackAwarenessCheck = Boolean.parseBoolean(skipSampleStoreTopicRackAwarenessCheckString);
        this._metricProcessorExecutor = Executors.newFixedThreadPool(numProcessingThreads);
        this._consumers = new ArrayList<KafkaConsumer<byte[], byte[]>>(numProcessingThreads);
        for (int i = 0; i < numProcessingThreads; ++i) {
            this._consumers.add(SamplingUtils.createSampleStoreConsumer(config, CONSUMER_CLIENT_ID_PREFIX));
        }
        this._producer = this.createProducer(config);
        this._loadingProgress = -1.0;
        this.ensureTopicsCreated(config);
    }

    protected short sampleStoreTopicReplicationFactor(Map<String, ?> config, AdminClient adminClient) {
        if (this._sampleStoreTopicReplicationFactor == null) {
            short numberOfBrokersInCluster;
            try {
                numberOfBrokersInCluster = (short)((Collection)adminClient.describeCluster().nodes().get(30000L, TimeUnit.MILLISECONDS)).size();
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Auto creation of sample store topics failed due to failure to describe cluster.", e);
            }
            if (numberOfBrokersInCluster <= 1) {
                throw new IllegalStateException(String.format("Kafka cluster has less than 2 brokers (brokers in cluster=%d, zookeeper.connect=%s)", numberOfBrokersInCluster, config.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG)));
            }
            return (short)Math.min(2, numberOfBrokersInCluster);
        }
        return this._sampleStoreTopicReplicationFactor;
    }

    protected KafkaProducer<byte[], byte[]> createProducer(Map<String, ?> config) {
        Properties producerProps = new Properties();
        producerProps.putAll(config);
        producerProps.setProperty("bootstrap.servers", SamplingUtils.bootstrapServers(config));
        producerProps.setProperty("client.id", PRODUCER_CLIENT_ID);
        producerProps.setProperty("linger.ms", "30000");
        producerProps.setProperty("batch.size", "500000");
        producerProps.setProperty("buffer.memory", "67108864");
        producerProps.setProperty("retries", "5");
        producerProps.setProperty("compression.type", "gzip");
        producerProps.setProperty("key.serializer", ByteArraySerializer.class.getName());
        producerProps.setProperty("value.serializer", ByteArraySerializer.class.getName());
        producerProps.setProperty("reconnect.backoff.ms", config.get("reconnect.backoff.ms").toString());
        return new KafkaProducer(producerProps);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void ensureTopicsCreated(Map<String, ?> config) {
        AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient(config);
        try {
            short replicationFactor = this.sampleStoreTopicReplicationFactor(config, adminClient);
            long partitionSampleWindowMs = (Long)config.get("partition.metrics.window.ms");
            long brokerSampleWindowMs = (Long)config.get("broker.metrics.window.ms");
            int numPartitionSampleWindows = (Integer)config.get("num.partition.metrics.windows");
            long partitionSampleRetentionMs = (long)(numPartitionSampleWindows * 2) * partitionSampleWindowMs;
            partitionSampleRetentionMs = Math.max(this._minPartitionSampleStoreTopicRetentionTimeMs, partitionSampleRetentionMs);
            int numBrokerSampleWindows = (Integer)config.get("num.broker.metrics.windows");
            long brokerSampleRetentionMs = (long)(numBrokerSampleWindows * 2) * brokerSampleWindowMs;
            brokerSampleRetentionMs = Math.max(this._minBrokerSampleStoreTopicRetentionTimeMs, brokerSampleRetentionMs);
            NewTopic partitionSampleStoreNewTopic = KafkaCruiseControlUtils.wrapTopic(this._partitionMetricSampleStoreTopic, this._partitionSampleStoreTopicPartitionCount, replicationFactor, partitionSampleRetentionMs);
            NewTopic brokerSampleStoreNewTopic = KafkaCruiseControlUtils.wrapTopic(this._brokerMetricSampleStoreTopic, this._brokerSampleStoreTopicPartitionCount, replicationFactor, brokerSampleRetentionMs);
            this.ensureTopicCreated(adminClient, partitionSampleStoreNewTopic);
            this.ensureTopicCreated(adminClient, brokerSampleStoreNewTopic);
        }
        finally {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
        }
    }

    protected void ensureTopicCreated(AdminClient adminClient, NewTopic sampleStoreTopic) {
        if (!KafkaCruiseControlUtils.createTopic(adminClient, sampleStoreTopic)) {
            KafkaCruiseControlUtils.maybeUpdateTopicConfig(adminClient, sampleStoreTopic);
            KafkaCruiseControlUtils.maybeIncreasePartitionCount(adminClient, sampleStoreTopic);
        }
    }

    @Override
    public void storeSamples(MetricSampler.Samples samples) {
        final AtomicInteger metricSampleCount = new AtomicInteger(0);
        for (final PartitionMetricSample sample : samples.partitionMetricSamples()) {
            this._producer.send(new ProducerRecord(this._partitionMetricSampleStoreTopic, null, Long.valueOf(sample.sampleTime()), null, (Object)sample.toBytes()), new Callback(){

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        metricSampleCount.incrementAndGet();
                    } else {
                        LOG.error("Failed to produce partition metric sample for {} of timestamp {} due to exception", new Object[]{((PartitionEntity)sample.entity()).tp(), sample.sampleTime(), e});
                    }
                }
            });
        }
        final AtomicInteger brokerMetricSampleCount = new AtomicInteger(0);
        for (BrokerMetricSample sample : samples.brokerMetricSamples()) {
            this._producer.send(new ProducerRecord(this._brokerMetricSampleStoreTopic, (Object)sample.toBytes()), new Callback(){

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        brokerMetricSampleCount.incrementAndGet();
                    } else {
                        LOG.error("Failed to produce model training sample due to exception", (Throwable)e);
                    }
                }
            });
        }
        this._producer.flush();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka", (Object)metricSampleCount.get(), (Object)brokerMetricSampleCount.get());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void loadSamples(SampleStore.SampleLoader sampleLoader) {
        LOG.info("Starting loading samples.");
        long startMs = System.currentTimeMillis();
        AtomicLong numPartitionMetricSamples = new AtomicLong(0L);
        AtomicLong numBrokerMetricSamples = new AtomicLong(0L);
        AtomicLong totalSamples = new AtomicLong(0L);
        AtomicLong numLoadedSamples = new AtomicLong(0L);
        try {
            this.prepareConsumers();
            for (KafkaConsumer<byte[], byte[]> consumer : this._consumers) {
                this._metricProcessorExecutor.submit(new MetricLoader(consumer, sampleLoader, numLoadedSamples, numPartitionMetricSamples, numBrokerMetricSamples, totalSamples));
            }
            this._metricProcessorExecutor.shutdown();
            this._metricProcessorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            LOG.error("Received exception when loading samples", (Throwable)e);
        }
        finally {
            this._consumers.forEach(Consumer::close);
            try {
                this._metricProcessorExecutor.awaitTermination(30000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted during waiting for metrics processor to shutdown.");
            }
        }
        long endMs = System.currentTimeMillis();
        long addedPartitionSampleCount = sampleLoader.partitionSampleCount();
        long addedBrokerSampleCount = sampleLoader.brokerSampleCount();
        long discardedPartitionMetricSamples = numPartitionMetricSamples.get() - addedPartitionSampleCount;
        long discardedBrokerMetricSamples = numBrokerMetricSamples.get() - addedBrokerSampleCount;
        LOG.info("Sample loading finished. Loaded {}{} partition metrics samples and {}{} broker metric samples in {} ms.", new Object[]{addedPartitionSampleCount, discardedPartitionMetricSamples > 0L ? String.format("(%d discarded)", discardedPartitionMetricSamples) : "", sampleLoader.brokerSampleCount(), discardedBrokerMetricSamples > 0L ? String.format("(%d discarded)", discardedBrokerMetricSamples) : "", endMs - startMs});
    }

    @Override
    public double sampleLoadingProgress() {
        return this._loadingProgress;
    }

    @Override
    public void evictSamplesBefore(long timestamp) {
    }

    @Override
    public void close() {
        this._shutdown = true;
        this._producer.close(PRODUCER_CLOSE_TIMEOUT);
        for (KafkaConsumer<byte[], byte[]> consumer : this._consumers) {
            consumer.close(CONSUMER_CLOSE_TIMEOUT);
        }
    }

    protected void prepareConsumers() {
        int numConsumers = this._consumers.size();
        ArrayList assignments = new ArrayList();
        for (int i = 0; i < numConsumers; ++i) {
            assignments.add(new ArrayList());
        }
        int j = 0;
        for (String topic : Arrays.asList(this._partitionMetricSampleStoreTopic, this._brokerMetricSampleStoreTopic)) {
            for (PartitionInfo partInfo : this._consumers.get(0).partitionsFor(topic)) {
                ((List)assignments.get(j++ % numConsumers)).add(new TopicPartition(partInfo.topic(), partInfo.partition()));
            }
        }
        for (int i = 0; i < numConsumers; ++i) {
            this._consumers.get(i).assign((Collection)assignments.get(i));
        }
    }

    protected class MetricLoader
    implements Runnable {
        protected final SampleStore.SampleLoader _sampleLoader;
        protected final AtomicLong _numLoadedSamples;
        protected final AtomicLong _numPartitionMetricSamples;
        protected final AtomicLong _numBrokerMetricSamples;
        protected final AtomicLong _totalSamples;
        protected final KafkaConsumer<byte[], byte[]> _consumer;

        MetricLoader(KafkaConsumer<byte[], byte[]> consumer, SampleStore.SampleLoader sampleLoader, AtomicLong numLoadedSamples, AtomicLong numPartitionMetricSamples, AtomicLong numBrokerMetricSamples, AtomicLong totalSamples) {
            this._consumer = consumer;
            this._sampleLoader = sampleLoader;
            this._numLoadedSamples = numLoadedSamples;
            this._numPartitionMetricSamples = numPartitionMetricSamples;
            this._numBrokerMetricSamples = numBrokerMetricSamples;
            this._totalSamples = totalSamples;
        }

        @Override
        public void run() {
            try {
                this.prepareConsumerOffset();
                Map beginningOffsets = this._consumer.beginningOffsets((Collection)this._consumer.assignment());
                Map endOffsets = this._consumer.endOffsets((Collection)this._consumer.assignment());
                LOG.debug("Loading beginning offsets: {}, loading end offsets: {}", (Object)beginningOffsets, (Object)endOffsets);
                for (Map.Entry entry : beginningOffsets.entrySet()) {
                    this._totalSamples.addAndGet((Long)endOffsets.get(entry.getKey()) - (Long)entry.getValue());
                    KafkaSampleStore.this._loadingProgress = (double)this._numLoadedSamples.get() / (double)this._totalSamples.get();
                }
                while (!this.sampleLoadingFinished(endOffsets)) {
                    try {
                        ConsumerRecords consumerRecords = this._consumer.poll(SAMPLE_POLL_TIMEOUT);
                        if (consumerRecords == SHUTDOWN_RECORDS) {
                            LOG.trace("Metric loader received empty records");
                            return;
                        }
                        HashSet<PartitionMetricSample> partitionMetricSamples = new HashSet<PartitionMetricSample>();
                        HashSet<BrokerMetricSample> brokerMetricSamples = new HashSet<BrokerMetricSample>();
                        for (ConsumerRecord record : consumerRecords) {
                            try {
                                MetricSample sample;
                                if (record.topic().equals(KafkaSampleStore.this._partitionMetricSampleStoreTopic)) {
                                    sample = PartitionMetricSample.fromBytes((byte[])record.value());
                                    partitionMetricSamples.add((PartitionMetricSample)sample);
                                    LOG.trace("Loaded partition metric sample {}", (Object)sample);
                                    continue;
                                }
                                if (!record.topic().equals(KafkaSampleStore.this._brokerMetricSampleStoreTopic)) continue;
                                sample = BrokerMetricSample.fromBytes((byte[])record.value());
                                sample.close(record.timestamp());
                                brokerMetricSamples.add((BrokerMetricSample)sample);
                                LOG.trace("Loaded broker metric sample {}", (Object)sample);
                            }
                            catch (UnknownVersionException e) {
                                LOG.warn("Ignoring sample due to", (Throwable)e);
                            }
                        }
                        if (partitionMetricSamples.isEmpty() && brokerMetricSamples.isEmpty()) continue;
                        this._sampleLoader.loadSamples(new MetricSampler.Samples(partitionMetricSamples, brokerMetricSamples));
                        this._numPartitionMetricSamples.getAndAdd(partitionMetricSamples.size());
                        this._numBrokerMetricSamples.getAndAdd(brokerMetricSamples.size());
                        KafkaSampleStore.this._loadingProgress = (double)this._numLoadedSamples.addAndGet(consumerRecords.count()) / (double)this._totalSamples.get();
                    }
                    catch (KafkaException ke) {
                        if (ke.getMessage().toLowerCase().contains("record is corrupt")) {
                            for (TopicPartition tp : this._consumer.assignment()) {
                                long position = this._consumer.position(tp);
                                if (position >= (Long)endOffsets.get(tp)) continue;
                                this._consumer.seek(tp, position + 1L);
                            }
                            continue;
                        }
                        LOG.error("Metric loader received exception:", (Throwable)ke);
                    }
                    catch (Exception e) {
                        if (KafkaSampleStore.this._shutdown) {
                            return;
                        }
                        LOG.error("Metric loader received exception:", (Throwable)e);
                    }
                }
                LOG.info("Metric loader finished loading samples.");
            }
            catch (Throwable t) {
                LOG.warn("Encountered error when loading sample from Kafka.", t);
            }
        }

        protected boolean sampleLoadingFinished(Map<TopicPartition, Long> endOffsets) {
            for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                long position = this._consumer.position(entry.getKey());
                if (position >= entry.getValue()) continue;
                LOG.debug("Partition {} is still lagging. Current position: {}, LEO: {}", new Object[]{entry.getKey(), position, entry.getValue()});
                return false;
            }
            return true;
        }

        protected void prepareConsumerOffset() {
            HashMap<TopicPartition, Long> beginningTimestamp = new HashMap<TopicPartition, Long>(this._consumer.assignment().size());
            long currentTimeMs = System.currentTimeMillis();
            for (TopicPartition tp : this._consumer.assignment()) {
                if (tp.topic().equals(KafkaSampleStore.this._brokerMetricSampleStoreTopic)) {
                    beginningTimestamp.put(tp, currentTimeMs - this._sampleLoader.brokerMonitoringPeriodMs());
                    continue;
                }
                beginningTimestamp.put(tp, currentTimeMs - this._sampleLoader.partitionMonitoringPeriodMs());
            }
            HashSet<TopicPartition> partitionWithNoRecentMessage = new HashSet<TopicPartition>();
            Map beginningOffsetAndTimestamp = this._consumer.offsetsForTimes(beginningTimestamp);
            for (Map.Entry entry : beginningOffsetAndTimestamp.entrySet()) {
                if (entry.getValue() == null) {
                    partitionWithNoRecentMessage.add((TopicPartition)entry.getKey());
                    continue;
                }
                this._consumer.seek((TopicPartition)entry.getKey(), ((OffsetAndTimestamp)entry.getValue()).offset());
            }
            if (partitionWithNoRecentMessage.size() > 0) {
                this._consumer.seekToBeginning(partitionWithNoRecentMessage);
            }
        }
    }
}

