/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.core.PulsarAdministrationOperations;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

public class PulsarAdministration
implements ApplicationContextAware,
SmartInitializingSingleton,
PulsarAdministrationOperations {
    private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    private final PulsarAdminBuilder adminBuilder;
    private ApplicationContext applicationContext;

    public PulsarAdministration(Map<String, Object> adminConfig) {
        this.adminBuilder = PulsarAdmin.builder();
        this.loadConf(this.adminBuilder, adminConfig);
    }

    public PulsarAdministration(PulsarAdminBuilder adminBuilder) {
        this.adminBuilder = adminBuilder;
    }

    public void afterSingletonsInstantiated() {
        this.initialize();
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    private void loadConf(PulsarAdminBuilder builder, Map<String, Object> adminConfig) {
        HashMap<String, Object> conf = new HashMap<String, Object>(adminConfig);
        Object v = conf.remove("connectionTimeoutMs");
        if (v instanceof Integer) {
            Integer connectTimeout = (Integer)v;
            builder.connectionTimeout(connectTimeout.intValue(), TimeUnit.MILLISECONDS);
        }
        if ((v = conf.remove("readTimeoutMs")) instanceof Integer) {
            Integer readTimeout = (Integer)v;
            builder.readTimeout(readTimeout.intValue(), TimeUnit.MILLISECONDS);
        }
        if ((v = conf.remove("requestTimeoutMs")) instanceof Integer) {
            Integer requestTimeout = (Integer)v;
            builder.requestTimeout(requestTimeout.intValue(), TimeUnit.MILLISECONDS);
        }
        if ((v = conf.remove("autoCertRefreshSeconds")) instanceof Integer) {
            Integer autoCertRefreshTime = (Integer)v;
            builder.autoCertRefreshTime(autoCertRefreshTime.intValue(), TimeUnit.SECONDS);
        }
        builder.loadConf(conf);
        String authPluginClassName = (String)conf.get("authPluginClassName");
        String authParams = (String)conf.get("authParams");
        if (StringUtils.hasText((String)authPluginClassName) && StringUtils.hasText((String)authParams)) {
            try {
                builder.authentication(authPluginClassName, authParams);
            }
            catch (PulsarClientException.UnsupportedAuthenticationException ex) {
                throw new RuntimeException("Unable to create admin auth: " + ex.getMessage(), ex);
            }
        }
    }

    private void initialize() {
        Collection<PulsarTopic> topics = this.applicationContext.getBeansOfType(PulsarTopic.class, false, false).values();
        this.createOrModifyTopicsIfNeeded(topics);
    }

    private PulsarAdmin createAdminClient() throws PulsarClientException {
        return this.adminBuilder.build();
    }

    @Override
    public void createOrModifyTopics(PulsarTopic ... topics) {
        this.createOrModifyTopicsIfNeeded(Arrays.asList(topics));
    }

    private Map<String, List<PulsarTopic>> getTopicsPerNamespace(Collection<PulsarTopic> topics) {
        return topics.stream().collect(Collectors.groupingBy(this::getTopicNamespaceIdentifier));
    }

    private String getTopicNamespaceIdentifier(PulsarTopic topic) {
        return topic.getComponents().tenant() + "/" + topic.getComponents().namespace();
    }

    private List<String> getMatchingTopicPartitions(PulsarTopic topic, List<String> existingTopics) {
        return existingTopics.stream().filter(existing -> existing.startsWith(topic.getFullyQualifiedTopicName() + "-partition-")).toList();
    }

    private void createOrModifyTopicsIfNeeded(Collection<PulsarTopic> topics) {
        if (CollectionUtils.isEmpty(topics)) {
            return;
        }
        try (PulsarAdmin admin = this.createAdminClient();){
            this.doCreateOrModifyTopicsIfNeeded(admin, topics);
        }
        catch (PulsarClientException e) {
            throw new IllegalStateException("Could not create PulsarAdmin", e);
        }
    }

    private void doCreateOrModifyTopicsIfNeeded(PulsarAdmin admin, Collection<PulsarTopic> topics) {
        Map<String, List<PulsarTopic>> topicsPerNamespace = this.getTopicsPerNamespace(topics);
        HashSet topicsToCreate = new HashSet();
        HashSet topicsToModify = new HashSet();
        topicsPerNamespace.forEach((namespace, requestedTopics) -> {
            try {
                List existingTopicsInNamespace = admin.topics().getList(namespace);
                for (PulsarTopic topic : requestedTopics) {
                    if (topic.isPartitioned()) {
                        List<String> matchingPartitions = this.getMatchingTopicPartitions(topic, existingTopicsInNamespace);
                        if (matchingPartitions.isEmpty()) {
                            this.logger.debug(() -> "Topic " + topic.getFullyQualifiedTopicName() + " does not exist.");
                            topicsToCreate.add(topic);
                            continue;
                        }
                        int numberOfExistingPartitions = matchingPartitions.size();
                        if (numberOfExistingPartitions < topic.numberOfPartitions()) {
                            this.logger.debug(() -> "Topic " + topic.getFullyQualifiedTopicName() + " found with " + numberOfExistingPartitions + " partitions.");
                            topicsToModify.add(topic);
                            continue;
                        }
                        if (numberOfExistingPartitions <= topic.numberOfPartitions()) continue;
                        throw new IllegalStateException("Topic " + topic.getFullyQualifiedTopicName() + " found with " + numberOfExistingPartitions + " partitions. Needs to be deleted first.");
                    }
                    if (existingTopicsInNamespace.contains(topic.getFullyQualifiedTopicName())) continue;
                    this.logger.debug(() -> "Topic " + topic.getFullyQualifiedTopicName() + " does not exist.");
                    topicsToCreate.add(topic);
                }
                this.createTopics(admin, topicsToCreate);
                this.modifyTopics(admin, topicsToModify);
            }
            catch (PulsarAdminException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void createTopics(PulsarAdmin admin, Set<PulsarTopic> topicsToCreate) throws PulsarAdminException {
        this.logger.debug(() -> "Creating topics: " + topicsToCreate.stream().map(PulsarTopic::getFullyQualifiedTopicName).collect(Collectors.joining(",")));
        for (PulsarTopic topic : topicsToCreate) {
            if (topic.isPartitioned()) {
                admin.topics().createPartitionedTopic(topic.topicName(), topic.numberOfPartitions());
                continue;
            }
            admin.topics().createNonPartitionedTopic(topic.topicName());
        }
    }

    private void modifyTopics(PulsarAdmin admin, Set<PulsarTopic> topicsToModify) throws PulsarAdminException {
        this.logger.debug(() -> "Modifying topics: " + topicsToModify.stream().map(PulsarTopic::getFullyQualifiedTopicName).collect(Collectors.joining(",")));
        for (PulsarTopic topic : topicsToModify) {
            admin.topics().updatePartitionedTopic(topic.topicName(), topic.numberOfPartitions());
        }
    }
}

