/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.kafka.core.ConsumerFactory;

public class KafkaBinderHealthIndicator
implements HealthIndicator {
    private static final int DEFAULT_TIMEOUT = 60;
    private final KafkaMessageChannelBinder binder;
    private final ConsumerFactory<?, ?> consumerFactory;
    private int timeout = 60;
    private Consumer<?, ?> metadataConsumer;

    public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, ConsumerFactory<?, ?> consumerFactory) {
        this.binder = binder;
        this.consumerFactory = consumerFactory;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Health health() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Health> future = exec.submit(new Callable<Health>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Health call() {
                try {
                    KafkaBinderHealthIndicator kafkaBinderHealthIndicator;
                    if (KafkaBinderHealthIndicator.this.metadataConsumer == null) {
                        kafkaBinderHealthIndicator = KafkaBinderHealthIndicator.this;
                        synchronized (kafkaBinderHealthIndicator) {
                            if (KafkaBinderHealthIndicator.this.metadataConsumer == null) {
                                KafkaBinderHealthIndicator.this.metadataConsumer = KafkaBinderHealthIndicator.this.consumerFactory.createConsumer();
                            }
                        }
                    }
                    kafkaBinderHealthIndicator = KafkaBinderHealthIndicator.this.metadataConsumer;
                    synchronized (kafkaBinderHealthIndicator) {
                        HashSet<String> downMessages = new HashSet<String>();
                        for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
                            List partitionInfos = KafkaBinderHealthIndicator.this.metadataConsumer.partitionsFor(topic);
                            for (PartitionInfo partitionInfo : partitionInfos) {
                                if (!KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos().contains(partitionInfo) || partitionInfo.leader().id() != -1) continue;
                                downMessages.add(partitionInfo.toString());
                            }
                        }
                        if (downMessages.isEmpty()) {
                            return Health.up().build();
                        }
                        return Health.down().withDetail("Following partitions in use have no leaders: ", (Object)((Object)downMessages).toString()).build();
                    }
                }
                catch (Exception e) {
                    return Health.down((Exception)e).build();
                }
            }
        });
        try {
            Health health = future.get(this.timeout, TimeUnit.SECONDS);
            return health;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Health health = Health.down().withDetail("Interrupted while waiting for partition information in", (Object)(this.timeout + " seconds")).build();
            return health;
        }
        catch (ExecutionException e) {
            Health health = Health.down((Exception)e).build();
            return health;
        }
        catch (TimeoutException e) {
            Health health = Health.down().withDetail("Failed to retrieve partition information in", (Object)(this.timeout + " seconds")).build();
            return health;
        }
        finally {
            exec.shutdownNow();
        }
    }
}

