/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.kafka;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.kafka.admin.KafkaAdminClient;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaAdminClientConfigProperties;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.commandrouter.AdapterInstanceStatusService;
import org.eclipse.hono.util.LifecycleStatus;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InternalKafkaTopicCleanupService
extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(InternalKafkaTopicCleanupService.class);
    private static final long CHECK_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(10L);
    private static final String CLIENT_NAME = "internal-topic-cleanup";
    private static final Pattern INTERNAL_COMMAND_TOPIC_PATTERN = Pattern.compile(Pattern.quote(HonoTopic.Type.COMMAND_INTERNAL.prefix) + "(.+)");
    private final AdapterInstanceStatusService adapterInstanceStatusService;
    private final Supplier<Future<KafkaAdminClient>> kafkaAdminClientCreator;
    private final Set<String> topicsToDelete = new HashSet<String>();
    private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private KafkaAdminClient adminClient;
    private long timerId;

    public InternalKafkaTopicCleanupService(AdapterInstanceStatusService adapterInstanceStatusService, KafkaAdminClientConfigProperties adminClientConfigProperties) {
        this.adapterInstanceStatusService = Objects.requireNonNull(adapterInstanceStatusService);
        Objects.requireNonNull(adminClientConfigProperties);
        Map adminClientConfig = adminClientConfigProperties.getAdminClientConfig(CLIENT_NAME);
        this.kafkaAdminClientCreator = () -> {
            KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(this.vertx);
            return kafkaClientFactory.createKafkaAdminClientWithRetries(adminClientConfig, () -> ((LifecycleStatus)this.lifecycleStatus).isStarting(), KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        };
    }

    InternalKafkaTopicCleanupService(AdapterInstanceStatusService adapterInstanceStatusService, KafkaAdminClient kafkaAdminClient) {
        this.adapterInstanceStatusService = Objects.requireNonNull(adapterInstanceStatusService);
        Objects.requireNonNull(kafkaAdminClient);
        this.kafkaAdminClientCreator = () -> Future.succeededFuture((Object)kafkaAdminClient);
    }

    public final void addOnServiceReadyHandler(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.lifecycleStatus.addOnStartedHandler(handler);
        }
    }

    public HealthCheckResponse checkReadiness() {
        return HealthCheckResponse.builder().name("Kafka-topic-cleanup-service").status(this.lifecycleStatus.isStarted()).build();
    }

    public void start() {
        if (this.lifecycleStatus.isStarting()) {
            return;
        }
        if (!this.lifecycleStatus.setStarting()) {
            throw new IllegalStateException("client is already started/stopping");
        }
        this.kafkaAdminClientCreator.get().onSuccess(client -> {
            this.adminClient = client;
            this.timerId = this.vertx.setPeriodic(CHECK_INTERVAL_MILLIS, tid -> this.performCleanup());
            LOG.info("started InternalKafkaTopicCleanupService");
            this.lifecycleStatus.setStarted();
        });
    }

    protected final void performCleanup() {
        if (this.topicsToDelete.isEmpty()) {
            this.determineToBeDeletedTopics();
        } else {
            this.adminClient.listTopics().onFailure(thr -> LOG.warn("error listing topics", thr)).onSuccess(allTopics -> {
                List existingTopicsToDelete = this.topicsToDelete.stream().filter(allTopics::contains).collect(Collectors.toList());
                if (existingTopicsToDelete.isEmpty()) {
                    this.topicsToDelete.clear();
                    this.determineToBeDeletedTopics((Set<String>)allTopics);
                } else {
                    this.adminClient.deleteTopics(existingTopicsToDelete).onSuccess(v -> LOG.info("triggered deletion of {} topics ({})", (Object)existingTopicsToDelete.size(), (Object)existingTopicsToDelete)).onFailure(thr -> {
                        if (thr instanceof UnknownTopicOrPartitionException) {
                            LOG.info("triggered deletion of {} topics, some had already been deleted ({})", (Object)existingTopicsToDelete.size(), (Object)existingTopicsToDelete);
                        } else {
                            LOG.warn("error deleting topics {}", (Object)existingTopicsToDelete, thr);
                        }
                    }).onComplete(ar -> {
                        this.topicsToDelete.clear();
                        this.determineToBeDeletedTopics();
                    });
                }
            });
        }
    }

    private void determineToBeDeletedTopics() {
        this.adminClient.listTopics().onSuccess(this::determineToBeDeletedTopics).onFailure(thr -> LOG.warn("error listing topics", thr));
    }

    private void determineToBeDeletedTopics(Set<String> allTopics) {
        HashMap<String, String> adapterInstanceIdToTopicMap = new HashMap<String, String>();
        for (String topic : allTopics) {
            Matcher matcher = INTERNAL_COMMAND_TOPIC_PATTERN.matcher(topic);
            if (!matcher.matches()) continue;
            String adapterInstanceId = matcher.group(1);
            adapterInstanceIdToTopicMap.put(adapterInstanceId, topic);
        }
        this.adapterInstanceStatusService.getDeadAdapterInstances(adapterInstanceIdToTopicMap.keySet()).onFailure(thr -> LOG.warn("error determining dead adapter instances", thr)).onSuccess(deadAdapterInstances -> {
            deadAdapterInstances.forEach(id -> this.topicsToDelete.add((String)adapterInstanceIdToTopicMap.get(id)));
            if (this.topicsToDelete.isEmpty()) {
                LOG.debug("found no topics to be deleted; no. of checked topics: {}", (Object)adapterInstanceIdToTopicMap.size());
            } else {
                LOG.info("marking topics as to be deleted on next run {}", this.topicsToDelete);
            }
        });
    }

    public void stop(Promise<Void> stopResult) {
        this.lifecycleStatus.runStopAttempt(() -> {
            this.vertx.cancelTimer(this.timerId);
            return Optional.ofNullable(this.adminClient).map(KafkaAdminClient::close).orElseGet(Future::succeededFuture).onFailure(thr -> LOG.warn("error closing admin client", thr));
        }).onComplete(stopResult);
    }
}

