/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;

public class KafkaBinderMetrics
implements MeterBinder,
ApplicationListener<BindingCreatedEvent> {
    private static final int DEFAULT_TIMEOUT = 60;
    private static final Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
    static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
    private final KafkaMessageChannelBinder binder;
    private final KafkaBinderConfigurationProperties binderConfigurationProperties;
    private ConsumerFactory<?, ?> defaultConsumerFactory;
    private final MeterRegistry meterRegistry;
    private Consumer<?, ?> metadataConsumer;
    private int timeout = 60;

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties, ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
        this.binder = binder;
        this.binderConfigurationProperties = binderConfigurationProperties;
        this.defaultConsumerFactory = defaultConsumerFactory;
        this.meterRegistry = meterRegistry;
    }

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties) {
        this(binder, binderConfigurationProperties, null, null);
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public void bindTo(MeterRegistry registry) {
        for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse().entrySet()) {
            if (!topicInfo.getValue().isConsumerTopic()) continue;
            String topic = topicInfo.getKey();
            String group = topicInfo.getValue().getConsumerGroup();
            TimeGauge.builder((String)METRIC_NAME, (Object)this, (TimeUnit)TimeUnit.MILLISECONDS, o -> this.calculateConsumerLagOnTopic(topic, group)).tag("group", group).tag("topic", topic).description("Consumer lag for a particular group and topic").register(registry);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private double calculateConsumerLagOnTopic(String topic, String group) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Long> future = exec.submit(() -> {
            long lag = 0L;
            try {
                Consumer<?, ?> consumer;
                if (this.metadataConsumer == null) {
                    consumer = this;
                    synchronized (consumer) {
                        if (this.metadataConsumer == null) {
                            this.metadataConsumer = super.createConsumerFactory(group).createConsumer();
                        }
                    }
                }
                consumer = this.metadataConsumer;
                synchronized (consumer) {
                    List partitionInfos = this.metadataConsumer.partitionsFor(topic);
                    LinkedList<TopicPartition> topicPartitions = new LinkedList<TopicPartition>();
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                    }
                    Map endOffsets = this.metadataConsumer.endOffsets(topicPartitions);
                    for (Map.Entry endOffset : endOffsets.entrySet()) {
                        OffsetAndMetadata current = this.metadataConsumer.committed((TopicPartition)endOffset.getKey());
                        if (current != null) {
                            lag += (Long)endOffset.getValue() - current.offset();
                            continue;
                        }
                        lag += ((Long)endOffset.getValue()).longValue();
                    }
                }
            }
            catch (Exception e) {
                LOG.debug((Object)("Cannot generate metric for topic: " + topic), (Throwable)e);
            }
            return lag;
        });
        try {
            double d = future.get(this.timeout, TimeUnit.SECONDS).longValue();
            return d;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            double d = 0.0;
            return d;
        }
        catch (ExecutionException | TimeoutException e) {
            double d = 0.0;
            return d;
        }
        finally {
            exec.shutdownNow();
        }
    }

    private ConsumerFactory<?, ?> createConsumerFactory(String group) {
        if (this.defaultConsumerFactory == null) {
            HashMap<String, Object> props = new HashMap<String, Object>();
            props.put("key.deserializer", ByteArrayDeserializer.class);
            props.put("value.deserializer", ByteArrayDeserializer.class);
            if (!ObjectUtils.isEmpty((Object)this.binderConfigurationProperties.getConsumerConfiguration())) {
                props.putAll(this.binderConfigurationProperties.getConsumerConfiguration());
            }
            if (!props.containsKey("bootstrap.servers")) {
                props.put("bootstrap.servers", this.binderConfigurationProperties.getKafkaConnectionString());
            }
            props.put("group.id", group);
            this.defaultConsumerFactory = new DefaultKafkaConsumerFactory(props);
        }
        return this.defaultConsumerFactory;
    }

    public void onApplicationEvent(BindingCreatedEvent event) {
        if (this.meterRegistry != null) {
            this.bindTo(this.meterRegistry);
        }
    }
}

