/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.kafka.core.ConsumerFactory;

public class KafkaBinderHealthIndicator
implements HealthIndicator {
    private static final int DEFAULT_TIMEOUT = 60;
    private final KafkaMessageChannelBinder binder;
    private final ConsumerFactory<?, ?> consumerFactory;
    private int timeout = 60;
    private Consumer<?, ?> metadataConsumer;

    public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, ConsumerFactory<?, ?> consumerFactory) {
        this.binder = binder;
        this.consumerFactory = consumerFactory;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Health health() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Health> future = exec.submit(this::buildHealthStatus);
        try {
            Health health = future.get(this.timeout, TimeUnit.SECONDS);
            return health;
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            Health health = Health.down().withDetail("Interrupted while waiting for partition information in", (Object)(this.timeout + " seconds")).build();
            return health;
        }
        catch (ExecutionException ex) {
            Health health = Health.down((Exception)ex).build();
            return health;
        }
        catch (TimeoutException ex) {
            Health health = Health.down().withDetail("Failed to retrieve partition information in", (Object)(this.timeout + " seconds")).build();
            return health;
        }
        finally {
            exec.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Health buildHealthStatus() {
        try {
            Consumer<?, ?> consumer;
            if (this.metadataConsumer == null) {
                consumer = this;
                synchronized (consumer) {
                    if (this.metadataConsumer == null) {
                        this.metadataConsumer = this.consumerFactory.createConsumer();
                    }
                }
            }
            consumer = this.metadataConsumer;
            synchronized (consumer) {
                HashSet<String> downMessages = new HashSet<String>();
                Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = this.binder.getTopicsInUse();
                if (topicsInUse.isEmpty()) {
                    return Health.down().withDetail("No topic information available", (Object)"Kafka broker is not reachable").build();
                }
                for (String topic : topicsInUse.keySet()) {
                    KafkaMessageChannelBinder.TopicInformation topicInformation = topicsInUse.get(topic);
                    if (topicInformation.isTopicPattern()) continue;
                    List partitionInfos = this.metadataConsumer.partitionsFor(topic);
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        if (!topicInformation.getPartitionInfos().contains(partitionInfo) || partitionInfo.leader().id() != -1) continue;
                        downMessages.add(partitionInfo.toString());
                    }
                }
                if (downMessages.isEmpty()) {
                    return Health.up().build();
                }
                return Health.down().withDetail("Following partitions in use have no leaders: ", (Object)((Object)downMessages).toString()).build();
            }
        }
        catch (Exception ex) {
            return Health.down((Exception)ex).build();
        }
    }
}

