/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer;
import org.springframework.pulsar.core.PulsarAdministrationOperations;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.util.CollectionUtils;

public class PulsarAdministration
implements ApplicationContextAware,
SmartInitializingSingleton,
PulsarAdministrationOperations {
    private final LogAccessor logger = new LogAccessor(this.getClass());
    @Nullable
    private ApplicationContext applicationContext;
    @Nullable
    private final List<PulsarAdminBuilderCustomizer> adminCustomizers;
    @Nullable
    private PulsarAdminBuilder adminBuilder;

    public PulsarAdministration(String serviceHttpUrl) {
        this((PulsarAdminBuilder adminBuilder) -> adminBuilder.serviceHttpUrl(serviceHttpUrl));
    }

    public PulsarAdministration(@Nullable PulsarAdminBuilderCustomizer adminCustomizer) {
        this(adminCustomizer != null ? List.of(adminCustomizer) : Collections.emptyList());
    }

    public PulsarAdministration(List<PulsarAdminBuilderCustomizer> adminCustomizers) {
        this.adminCustomizers = adminCustomizers;
    }

    public void afterSingletonsInstantiated() {
        this.initialize();
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    private void initialize() {
        Collection<PulsarTopic> topics = Objects.requireNonNull(this.applicationContext, "Application context was not set").getBeansOfType(PulsarTopic.class, false, false).values();
        this.createOrModifyTopicsIfNeeded(topics);
    }

    public PulsarAdmin createAdminClient() throws PulsarClientException {
        if (this.adminBuilder == null) {
            this.adminBuilder = PulsarAdmin.builder();
        }
        this.adminCustomizers.forEach(adminCustomizer -> adminCustomizer.customize(this.adminBuilder));
        return this.adminBuilder.build();
    }

    void setAdminBuilder(PulsarAdminBuilder adminBuilder) {
        this.adminBuilder = adminBuilder;
    }

    @Override
    public void createOrModifyTopics(PulsarTopic ... topics) {
        this.createOrModifyTopicsIfNeeded(Arrays.asList(topics));
    }

    private Map<String, List<PulsarTopic>> getTopicsPerNamespace(Collection<PulsarTopic> topics) {
        return topics.stream().collect(Collectors.groupingBy(this::getTopicNamespaceIdentifier));
    }

    private String getTopicNamespaceIdentifier(PulsarTopic topic) {
        PulsarTopic.TopicComponents components = topic.getComponents();
        return components.tenant() + "/" + components.namespace();
    }

    private List<String> getMatchingTopicPartitions(PulsarTopic topic, List<String> existingTopics) {
        return existingTopics.stream().filter(existing -> existing.startsWith(topic.topicName() + "-partition-")).toList();
    }

    private void createOrModifyTopicsIfNeeded(Collection<PulsarTopic> topics) {
        if (CollectionUtils.isEmpty(topics)) {
            return;
        }
        try (PulsarAdmin admin = this.createAdminClient();){
            this.doCreateOrModifyTopicsIfNeeded(admin, topics);
        }
        catch (PulsarClientException e) {
            throw new IllegalStateException("Could not create PulsarAdmin", e);
        }
    }

    private void doCreateOrModifyTopicsIfNeeded(PulsarAdmin admin, Collection<PulsarTopic> topics) {
        Map<String, List<PulsarTopic>> topicsPerNamespace = this.getTopicsPerNamespace(topics);
        topicsPerNamespace.forEach((namespace, requestedTopics) -> {
            HashSet<PulsarTopic> topicsToCreate = new HashSet<PulsarTopic>();
            HashSet<PulsarTopic> topicsToModify = new HashSet<PulsarTopic>();
            try {
                List existingTopicsInNamespace = admin.topics().getList(namespace);
                for (PulsarTopic topic : requestedTopics) {
                    List<String> matchingPartitions;
                    String topicName = topic.topicName();
                    if (topic.isPartitioned()) {
                        if (existingTopicsInNamespace.contains(topicName)) {
                            throw new IllegalStateException("Topic '%s' already exists un-partitioned - needs to be deleted first".formatted(topicName));
                        }
                        matchingPartitions = this.getMatchingTopicPartitions(topic, existingTopicsInNamespace);
                        if (matchingPartitions.isEmpty()) {
                            this.logger.debug(() -> "Topic '%s' does not yet exist - will add".formatted(topicName));
                            topicsToCreate.add(topic);
                            continue;
                        }
                        int numberOfExistingPartitions = matchingPartitions.size();
                        if (numberOfExistingPartitions < topic.numberOfPartitions()) {
                            this.logger.debug(() -> "Topic '%s' found with %d partitions - will update to %d".formatted(topicName, numberOfExistingPartitions, topic.numberOfPartitions()));
                            topicsToModify.add(topic);
                            continue;
                        }
                        if (numberOfExistingPartitions <= topic.numberOfPartitions()) continue;
                        throw new IllegalStateException("Topic '%s' found w/ %d partitions but can't shrink to %d - needs to be deleted first".formatted(topicName, numberOfExistingPartitions, topic.numberOfPartitions()));
                    }
                    matchingPartitions = this.getMatchingTopicPartitions(topic, existingTopicsInNamespace);
                    if (!matchingPartitions.isEmpty()) {
                        throw new IllegalStateException("Topic '%s' already exists partitioned - needs to be deleted first".formatted(topicName));
                    }
                    if (existingTopicsInNamespace.contains(topicName)) continue;
                    this.logger.debug(() -> "Topic '%s' does not yet exist - will add".formatted(topicName));
                    topicsToCreate.add(topic);
                }
                this.createTopics(admin, topicsToCreate);
                this.modifyTopics(admin, topicsToModify);
            }
            catch (PulsarAdminException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void createTopics(PulsarAdmin admin, Set<PulsarTopic> topicsToCreate) throws PulsarAdminException {
        this.logger.debug(() -> "Creating topics: " + topicsToCreate.stream().map(PulsarTopic::topicName).collect(Collectors.joining(",")));
        for (PulsarTopic topic : topicsToCreate) {
            if (topic.isPartitioned()) {
                admin.topics().createPartitionedTopic(topic.topicName(), topic.numberOfPartitions());
                continue;
            }
            admin.topics().createNonPartitionedTopic(topic.topicName());
        }
    }

    private void modifyTopics(PulsarAdmin admin, Set<PulsarTopic> topicsToModify) throws PulsarAdminException {
        this.logger.debug(() -> "Modifying topics: " + topicsToModify.stream().map(PulsarTopic::topicName).collect(Collectors.joining(",")));
        for (PulsarTopic topic : topicsToModify) {
            admin.topics().updatePartitionedTopic(topic.topicName(), topic.numberOfPartitions());
        }
    }
}

