/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.endpoint.PublicMetrics;
import org.springframework.boot.actuate.metrics.Metric;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;

public class KafkaBinderMetrics
implements PublicMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBinderMetrics.class);
    static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
    private final KafkaMessageChannelBinder binder;
    private final KafkaBinderConfigurationProperties binderConfigurationProperties;
    private ConsumerFactory<?, ?> defaultConsumerFactory;

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties, ConsumerFactory<?, ?> defaultConsumerFactory) {
        this.binder = binder;
        this.binderConfigurationProperties = binderConfigurationProperties;
        this.defaultConsumerFactory = defaultConsumerFactory;
    }

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties) {
        this(binder, binderConfigurationProperties, null);
    }

    public Collection<Metric<?>> metrics() {
        LinkedList metrics = new LinkedList();
        for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse().entrySet()) {
            if (!topicInfo.getValue().isConsumerTopic()) continue;
            String topic = topicInfo.getKey();
            String group = topicInfo.getValue().getConsumerGroup();
            try {
                Consumer metadataConsumer = this.createConsumerFactory(group).createConsumer();
                Throwable throwable = null;
                try {
                    List partitionInfos = metadataConsumer.partitionsFor(topic);
                    LinkedList<TopicPartition> topicPartitions = new LinkedList<TopicPartition>();
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                    }
                    Map endOffsets = metadataConsumer.endOffsets(topicPartitions);
                    long lag = 0L;
                    for (Map.Entry endOffset : endOffsets.entrySet()) {
                        OffsetAndMetadata current = metadataConsumer.committed((TopicPartition)endOffset.getKey());
                        if (current != null) {
                            lag += (Long)endOffset.getValue() - current.offset();
                            continue;
                        }
                        lag += ((Long)endOffset.getValue()).longValue();
                    }
                    metrics.add(new Metric(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), (Number)lag));
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (metadataConsumer == null) continue;
                    if (throwable != null) {
                        try {
                            metadataConsumer.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    metadataConsumer.close();
                }
            }
            catch (Exception e) {
                LOG.debug("Cannot generate metric for topic: " + topic, (Throwable)e);
            }
        }
        return metrics;
    }

    private ConsumerFactory<?, ?> createConsumerFactory(String group) {
        if (this.defaultConsumerFactory != null) {
            return this.defaultConsumerFactory;
        }
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        if (!ObjectUtils.isEmpty((Object)this.binderConfigurationProperties.getConsumerConfiguration())) {
            props.putAll(this.binderConfigurationProperties.getConsumerConfiguration());
        }
        if (!props.containsKey("bootstrap.servers")) {
            props.put("bootstrap.servers", this.binderConfigurationProperties.getKafkaConnectionString());
        }
        props.put("group.id", group);
        return new DefaultKafkaConsumerFactory(props);
    }
}

