/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.metrics2;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.metrics2.KafkaOffsetTopicMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MetricSet} that exposes offset gauges for one Kafka {@link TopicPartition}.
 *
 * <p>The set registers six gauges under {@code <topic>/partition_<n>/...}:
 * {@code spoutLag}, {@code earliestTimeOffset}, {@code latestTimeOffset},
 * {@code latestEmittedOffset}, {@code latestCompletedOffset} and {@code recordsInPartition}.
 *
 * <p>Every successful gauge read also adds the value it returns to the matching
 * {@code total*} accumulator on the supplied {@link KafkaOffsetTopicMetrics}. Reading a gauge
 * therefore has a side effect; this class never resets those accumulators.
 *
 * <p>When a value cannot be determined (no admin client, a failed offset lookup, or no
 * {@link OffsetManager} registered for the partition) the gauge logs the problem and reports
 * {@code 0} without touching the topic-level accumulator.
 *
 * @param <K> unused by this class; kept for source compatibility with existing callers
 * @param <V> unused by this class; kept for source compatibility with existing callers
 */
public class KafkaOffsetPartitionMetrics<K, V>
implements MetricSet {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPartitionMetrics.class);
    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
    private final Supplier<Admin> adminSupplier;
    private final TopicPartition topicPartition;
    private final KafkaOffsetTopicMetrics topicMetrics;

    /**
     * Creates the metric set for a single partition.
     *
     * @param offsetManagerSupplier supplies the current partition-to-offset-manager map; it is
     *                              queried on every gauge read that needs spout-side offsets
     * @param adminSupplier         supplies the Kafka admin client used for offset lookups; it is
     *                              queried on every lookup and may yield {@code null}
     * @param topicPartition        the partition these gauges describe
     * @param topicMetrics          topic-level accumulators that each gauge read adds to
     */
    public KafkaOffsetPartitionMetrics(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<Admin> adminSupplier, TopicPartition topicPartition, KafkaOffsetTopicMetrics topicMetrics) {
        this.offsetManagerSupplier = offsetManagerSupplier;
        this.adminSupplier = adminSupplier;
        this.topicPartition = topicPartition;
        this.topicMetrics = topicMetrics;
        LOG.info("Running KafkaOffsetMetricSet");
    }

    /**
     * Builds the gauges for this partition.
     *
     * @return a new mutable map from metric name ({@code <topic>/partition_<n>/<gauge>}) to gauge
     */
    @Override
    public Map<String, Metric> getMetrics() {
        Map<String, Metric> metrics = new HashMap<String, Metric>();
        String metricPath = this.topicPartition.topic() + "/partition_" + this.topicPartition.partition();

        Gauge<Long> spoutLagGauge = this::readSpoutLag;
        Gauge<Long> earliestTimeOffsetGauge = this::readEarliestTimeOffset;
        Gauge<Long> latestTimeOffsetGauge = this::readLatestTimeOffset;
        Gauge<Long> latestEmittedOffsetGauge = this::readLatestEmittedOffset;
        Gauge<Long> latestCompletedOffsetGauge = this::readLatestCompletedOffset;
        Gauge<Long> recordsInPartitionGauge = this::readRecordsInPartition;

        metrics.put(metricPath + "/spoutLag", spoutLagGauge);
        metrics.put(metricPath + "/earliestTimeOffset", earliestTimeOffsetGauge);
        metrics.put(metricPath + "/latestTimeOffset", latestTimeOffsetGauge);
        metrics.put(metricPath + "/latestEmittedOffset", latestEmittedOffsetGauge);
        metrics.put(metricPath + "/latestCompletedOffset", latestCompletedOffsetGauge);
        metrics.put(metricPath + "/recordsInPartition", recordsInPartitionGauge);
        return metrics;
    }

    /** Gauge body: partition end offset minus the spout's committed offset. */
    private Long readSpoutLag() {
        Long endOffset = this.offsetOfPartition(this.getEndOffsets(Collections.singleton(this.topicPartition)), "endOffsets");
        if (endOffset == null) {
            return 0L;
        }
        OffsetManager offsetManager = this.currentOffsetManager();
        if (offsetManager == null) {
            return 0L;
        }
        long lag = endOffset - offsetManager.getCommittedOffset();
        this.topicMetrics.totalSpoutLag += lag;
        return lag;
    }

    /** Gauge body: earliest offset Kafka reports for the partition. */
    private Long readEarliestTimeOffset() {
        Long beginningOffset = this.offsetOfPartition(this.getBeginningOffsets(Collections.singleton(this.topicPartition)), "beginningOffsets");
        if (beginningOffset == null) {
            return 0L;
        }
        this.topicMetrics.totalEarliestTimeOffset += beginningOffset.longValue();
        return beginningOffset;
    }

    /** Gauge body: latest (end) offset Kafka reports for the partition. */
    private Long readLatestTimeOffset() {
        Long endOffset = this.offsetOfPartition(this.getEndOffsets(Collections.singleton(this.topicPartition)), "endOffsets");
        if (endOffset == null) {
            return 0L;
        }
        this.topicMetrics.totalLatestTimeOffset += endOffset.longValue();
        return endOffset;
    }

    /** Gauge body: latest emitted offset according to the partition's offset manager. */
    private Long readLatestEmittedOffset() {
        OffsetManager offsetManager = this.currentOffsetManager();
        if (offsetManager == null) {
            return 0L;
        }
        long emitted = offsetManager.getLatestEmittedOffset();
        this.topicMetrics.totalLatestEmittedOffset += emitted;
        return emitted;
    }

    /** Gauge body: committed offset according to the partition's offset manager. */
    private Long readLatestCompletedOffset() {
        OffsetManager offsetManager = this.currentOffsetManager();
        if (offsetManager == null) {
            return 0L;
        }
        long committed = offsetManager.getCommittedOffset();
        this.topicMetrics.totalLatestCompletedOffset += committed;
        return committed;
    }

    /** Gauge body: end offset minus beginning offset of the partition. */
    private Long readRecordsInPartition() {
        // The end offsets are looked up before the beginning offsets; a failure of the first
        // lookup skips the second one.
        Long endOffset = this.offsetOfPartition(this.getEndOffsets(Collections.singleton(this.topicPartition)), "endOffsets");
        if (endOffset == null) {
            return 0L;
        }
        Long beginningOffset = this.offsetOfPartition(this.getBeginningOffsets(Collections.singleton(this.topicPartition)), "beginningOffsets");
        if (beginningOffset == null) {
            return 0L;
        }
        long records = endOffset - beginningOffset;
        this.topicMetrics.totalRecordsInPartitions += records;
        return records;
    }

    /**
     * Extracts this partition's offset from a lookup result.
     *
     * @param offsets     lookup result; may be {@code null} or empty
     * @param offsetsName label used in the error message ({@code "endOffsets"} or
     *                    {@code "beginningOffsets"})
     * @return the offset, or {@code null} (after logging an error) when the map is
     *         {@code null}, empty, or has no entry for this partition
     */
    private Long offsetOfPartition(Map<TopicPartition, Long> offsets, String offsetsName) {
        Long offset = offsets == null ? null : offsets.get(this.topicPartition);
        if (offset == null) {
            LOG.error("Failed to get {} from Kafka for topic partitions: {}.", offsetsName, this.topicPartition);
        }
        return offset;
    }

    /**
     * Looks up the offset manager for this partition.
     *
     * @return the manager, or {@code null} (after logging an error) when the supplier yields no
     *         map or the map has no entry for this partition
     */
    private OffsetManager currentOffsetManager() {
        Map<TopicPartition, OffsetManager> offsetManagers = this.offsetManagerSupplier.get();
        OffsetManager offsetManager = offsetManagers == null ? null : offsetManagers.get(this.topicPartition);
        if (offsetManager == null) {
            LOG.error("No offset manager available for topic partition: {}.", this.topicPartition);
        }
        return offsetManager;
    }

    /**
     * Fetches the earliest offsets of the given partitions.
     *
     * @param topicPartitions partitions to query
     * @return partition-to-offset map, or an empty map when the lookup could not be performed
     */
    private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
        return this.fetchOffsets(topicPartitions, OffsetSpec.earliest());
    }

    /**
     * Fetches the latest offsets of the given partitions.
     *
     * @param topicPartitions partitions to query
     * @return partition-to-offset map, or an empty map when the lookup could not be performed
     */
    private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
        return this.fetchOffsets(topicPartitions, OffsetSpec.latest());
    }

    /**
     * Shared lookup used by {@link #getBeginningOffsets} and {@link #getEndOffsets}.
     *
     * @param topicPartitions partitions to query
     * @param offsetSpec      which offset to request for every partition
     * @return partition-to-offset map; an empty map when the admin client is {@code null} or the
     *         lookup fails with an interruption, an execution failure or a retriable error
     */
    private Map<TopicPartition, Long> fetchOffsets(Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec) {
        Admin admin = this.adminSupplier.get();
        if (admin == null) {
            LOG.error("Kafka admin object is null, returning 0.");
            return Collections.emptyMap();
        }
        try {
            return KafkaOffsetPartitionMetrics.getOffsets(admin, topicPartitions, offsetSpec);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", this.topicPartition, e);
            return Collections.emptyMap();
        }
        catch (ExecutionException | RetriableException e) {
            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", this.topicPartition, e);
            return Collections.emptyMap();
        }
    }

    /**
     * Requests one offset per partition through the admin client and waits for the result.
     *
     * @param admin           admin client to query
     * @param topicPartitions partitions to query
     * @param offsetSpec      which offset to request for every partition
     * @return a new map from each partition in the response to its offset
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the lookup future completes exceptionally
     */
    private static Map<TopicPartition, Long> getOffsets(Admin admin, Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec) throws InterruptedException, ExecutionException {
        Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<TopicPartition, OffsetSpec>();
        for (TopicPartition topicPartition : topicPartitions) {
            offsetSpecMap.put(topicPartition, offsetSpec);
        }
        ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap);
        KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> all = listOffsetsResult.all();
        // Blocks until every requested partition has been answered (or the request fails).
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> resultInfoByPartition = all.get();
        Map<TopicPartition, Long> ret = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry : resultInfoByPartition.entrySet()) {
            ret.put(entry.getKey(), entry.getValue().offset());
        }
        return ret;
    }
}

