/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public class KafkaBinderHealthIndicator
implements HealthIndicator,
DisposableBean {
    private static final int DEFAULT_TIMEOUT = 60;
    private final ExecutorService executor = Executors.newSingleThreadExecutor((ThreadFactory)new CustomizableThreadFactory("kafka-binder-health-"));
    private final KafkaMessageChannelBinder binder;
    private final ConsumerFactory<?, ?> consumerFactory;
    private int timeout = 60;
    private Consumer<?, ?> metadataConsumer;

    public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, ConsumerFactory<?, ?> consumerFactory) {
        this.binder = binder;
        this.consumerFactory = consumerFactory;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public Health health() {
        Future<Health> future = this.executor.submit(this::buildHealthStatus);
        try {
            return future.get(this.timeout, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return Health.down().withDetail("Interrupted while waiting for partition information in", (Object)(this.timeout + " seconds")).build();
        }
        catch (ExecutionException ex) {
            return Health.down((Exception)ex).build();
        }
        catch (TimeoutException ex) {
            return Health.down().withDetail("Failed to retrieve partition information in", (Object)(this.timeout + " seconds")).build();
        }
    }

    private synchronized Consumer<?, ?> initMetadataConsumer() {
        if (this.metadataConsumer == null) {
            this.metadataConsumer = this.consumerFactory.createConsumer();
        }
        return this.metadataConsumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Health buildHealthStatus() {
        try {
            this.initMetadataConsumer();
            Consumer<?, ?> consumer = this.metadataConsumer;
            synchronized (consumer) {
                HashSet<String> downMessages = new HashSet<String>();
                Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = this.binder.getTopicsInUse();
                if (topicsInUse.isEmpty()) {
                    try {
                        this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
                    }
                    catch (Exception e) {
                        return Health.down().withDetail("No topic information available", (Object)"Kafka broker is not reachable").build();
                    }
                    return Health.unknown().withDetail("No bindings found", (Object)"Kafka binder may not be bound to destinations on the broker").build();
                }
                for (String topic : topicsInUse.keySet()) {
                    KafkaMessageChannelBinder.TopicInformation topicInformation = topicsInUse.get(topic);
                    if (topicInformation.isTopicPattern()) continue;
                    List partitionInfos = this.metadataConsumer.partitionsFor(topic);
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        if (!topicInformation.getPartitionInfos().contains(partitionInfo) || partitionInfo.leader().id() != -1) continue;
                        downMessages.add(partitionInfo.toString());
                    }
                }
                if (downMessages.isEmpty()) {
                    return Health.up().build();
                }
                return Health.down().withDetail("Following partitions in use have no leaders: ", (Object)((Object)downMessages).toString()).build();
            }
        }
        catch (Exception ex) {
            return Health.down((Exception)ex).build();
        }
    }

    public void destroy() throws Exception {
        this.executor.shutdown();
    }
}

