/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.topic;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.icthh.xm.commons.config.client.api.RefreshableConfiguration;
import com.icthh.xm.commons.logging.trace.SleuthWrapper;
import com.icthh.xm.commons.topic.config.MessageListenerContainerBuilder;
import com.icthh.xm.commons.topic.domain.ConsumerHolder;
import com.icthh.xm.commons.topic.domain.TopicConfig;
import com.icthh.xm.commons.topic.domain.TopicConsumersSpec;
import com.icthh.xm.commons.topic.message.MessageHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;

/**
 * Keeps the set of running topic consumers (message listener containers) in sync with the
 * per-tenant {@code topic-consumers.yml} configuration.
 *
 * <p>For every tenant a map of {@code topic config key -> ConsumerHolder} is maintained. When a
 * tenant configuration is refreshed, new consumers are started, changed ones are restarted,
 * unchanged ones are left alone and consumers that are no longer configured are stopped.
 */
@Component
public class TopicManager implements RefreshableConfiguration {
    private static final Logger log = LoggerFactory.getLogger(TopicManager.class);
    /** Name of the path variable in {@link #configPath} that carries the tenant key. */
    private static final String TENANT_NAME = "tenant";
    private final AntPathMatcher matcher = new AntPathMatcher();
    private final ObjectMapper ymlMapper = new ObjectMapper(new YAMLFactory());
    /** Ant-style path pattern of the configuration files handled by this manager. */
    private final String configPath;
    private final SleuthWrapper sleuthWrapper;
    /** Tenant key -> (topic config key -> running consumer). */
    private final Map<String, Map<String, ConsumerHolder>> tenantTopicConsumers = new ConcurrentHashMap<>();
    private final KafkaProperties kafkaProperties;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final MessageHandler messageHandler;

    /**
     * Creates the manager.
     *
     * @param appName         application name, used to build the configuration path pattern
     * @param kafkaProperties Kafka properties handed to the listener container builder
     * @param kafkaTemplate   Kafka template handed to the listener container builder
     * @param messageHandler  handler passed to every created listener container
     * @param sleuthWrapper   tracing wrapper passed to every created listener container
     */
    public TopicManager(@Value(value="${spring.application.name}") String appName, KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate, MessageHandler messageHandler, SleuthWrapper sleuthWrapper) {
        this.kafkaProperties = kafkaProperties;
        this.kafkaTemplate = kafkaTemplate;
        this.messageHandler = messageHandler;
        this.sleuthWrapper = sleuthWrapper;
        this.configPath = "/config/tenants/{tenant}/" + appName + "/topic-consumers.yml";
    }

    /**
     * Re-applies the consumer configuration stored under {@code updatedKey}.
     *
     * @param updatedKey path of the changed configuration; expected to match the pattern
     *                   checked by {@link #isListeningConfiguration(String)}
     * @param config     new YAML content; an empty value stops all consumers of the tenant
     */
    public void onRefresh(String updatedKey, String config) {
        refreshConfig(updatedKey, config);
    }

    /**
     * Tells whether the given configuration path is a topic-consumers file of this application.
     *
     * @param updatedKey configuration path to test
     * @return {@code true} if the path matches this manager's configuration path pattern
     */
    public boolean isListeningConfiguration(String updatedKey) {
        return matcher.match(configPath, updatedKey);
    }

    /**
     * Applies an initial configuration value, ignoring paths this manager does not handle.
     *
     * @param configKey   configuration path
     * @param configValue YAML content of the configuration
     */
    public void onInit(String configKey, String configValue) {
        if (isListeningConfiguration(configKey)) {
            refreshConfig(configKey, configValue);
        }
    }

    /**
     * Synchronizes the tenant's running consumers with the supplied configuration.
     */
    private void refreshConfig(String updatedKey, String config) {
        String tenantKey = extractTenant(updatedKey);
        Map<String, ConsumerHolder> existingConsumers = getTenantConsumers(tenantKey);
        if (StringUtils.isEmpty(config)) {
            stopAllTenantConsumers(tenantKey, existingConsumers);
            return;
        }
        TopicConsumersSpec spec = readSpec(updatedKey, config);
        if (spec == null) {
            log.warn("Skip processing of configuration: [{}]. Specification is null", updatedKey);
            return;
        }
        // A specification without a topics section means "no consumers", not a failure.
        List<TopicConfig> forUpdate = spec.getTopics();
        if (forUpdate == null) {
            forUpdate = Collections.emptyList();
        }
        try {
            for (TopicConfig topicConfig : forUpdate) {
                processTopicConfig(tenantKey, topicConfig, existingConsumers);
            }
            removeOldConsumers(tenantKey, forUpdate, existingConsumers);
        } finally {
            // Register the map even when a topic fails, so that containers which were
            // already started stay tracked and can be stopped or updated later.
            tenantTopicConsumers.put(tenantKey, existingConsumers);
        }
    }

    /**
     * Starts, restarts or skips the consumer for a single topic configuration.
     */
    private void processTopicConfig(String tenantKey, TopicConfig topicConfig, Map<String, ConsumerHolder> existingConsumers) {
        if (topicConfig.getMaxPollInterval() != null
                && topicConfig.getBackOffPeriod() > topicConfig.getMaxPollInterval().intValue()) {
            log.error("Consumer was not created, backOffPeriod is greater than maxPollInterval, topicConfig: [{}]", topicConfig);
            return;
        }
        ConsumerHolder existingConfig = existingConsumers.get(topicConfig.getKey());
        if (existingConfig == null) {
            startNewConsumer(tenantKey, topicConfig, existingConsumers);
            return;
        }
        if (existingConfig.getTopicConfig().equals(topicConfig)) {
            log.info("[{}] Skip consumer configuration due to no changes found: [{}] ", tenantKey, topicConfig);
            return;
        }
        updateConsumer(tenantKey, topicConfig, existingConfig, existingConsumers);
    }

    /**
     * Builds and starts a container for the topic and records it in {@code existingConsumers}.
     */
    private void startNewConsumer(String tenantKey, TopicConfig topicConfig, Map<String, ConsumerHolder> existingConsumers) {
        withLog(tenantKey, "startNewConsumer", () -> {
            AbstractMessageListenerContainer container = buildListenerContainer(tenantKey, topicConfig);
            container.start();
            existingConsumers.put(topicConfig.getKey(), new ConsumerHolder(topicConfig, container));
        }, "{}", topicConfig);
    }

    /**
     * Stops the running container and replaces it with one built from the new configuration.
     */
    private void updateConsumer(String tenantKey, TopicConfig topicConfig, ConsumerHolder existingConfig, Map<String, ConsumerHolder> existingConsumers) {
        withLog(tenantKey, "restartConsumer", () -> {
            existingConfig.getContainer().stop();
            AbstractMessageListenerContainer container = buildListenerContainer(tenantKey, topicConfig);
            container.start();
            existingConsumers.put(topicConfig.getKey(), new ConsumerHolder(topicConfig, container));
        }, "{}", topicConfig);
    }

    /**
     * Creates (but does not start) a listener container for the given tenant and topic.
     *
     * @param tenantKey   tenant the consumer belongs to
     * @param topicConfig topic consumer configuration
     * @return a new listener container
     */
    protected AbstractMessageListenerContainer buildListenerContainer(String tenantKey, TopicConfig topicConfig) {
        return new MessageListenerContainerBuilder(kafkaProperties, kafkaTemplate).build(tenantKey, topicConfig, messageHandler, sleuthWrapper);
    }

    /**
     * Stops every consumer of the tenant and drops the tenant from the registry.
     */
    private void stopAllTenantConsumers(String tenantKey, Map<String, ConsumerHolder> existingConsumers) {
        Collection<ConsumerHolder> holders = existingConsumers.values();
        withLog(tenantKey, "stopAllTenantConsumers", () -> {
            holders.forEach(consumerHolder -> stopConsumer(tenantKey, consumerHolder));
            tenantTopicConsumers.remove(tenantKey);
        }, "[{}]", holders);
    }

    /**
     * Stops and removes consumers whose configuration is not part of {@code newTopicConfigs}.
     */
    private void removeOldConsumers(String tenantKey, List<TopicConfig> newTopicConfigs, Map<String, ConsumerHolder> existingConsumers) {
        // Collect the keys first so the map is not modified while it is being traversed.
        List<String> staleKeys = existingConsumers.entrySet().stream()
            .filter(entry -> !newTopicConfigs.contains(entry.getValue().getTopicConfig()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        for (String staleKey : staleKeys) {
            ConsumerHolder holder = existingConsumers.get(staleKey);
            if (holder != null) {
                stopConsumer(tenantKey, holder);
                existingConsumers.remove(staleKey);
            }
        }
    }

    /**
     * Stops the container held by {@code consumerHolder}.
     */
    private void stopConsumer(String tenantKey, ConsumerHolder consumerHolder) {
        TopicConfig existConfig = consumerHolder.getTopicConfig();
        withLog(tenantKey, "stopConsumer", () -> consumerHolder.getContainer().stop(), "{}", existConfig);
    }

    /**
     * Extracts the {@code tenant} path variable from a configuration path.
     */
    private String extractTenant(String updatedKey) {
        return matcher.extractUriTemplateVariables(configPath, updatedKey).get(TENANT_NAME);
    }

    /**
     * Parses the YAML configuration.
     *
     * @return the parsed specification, or {@code null} if parsing failed (the error is logged)
     */
    private TopicConsumersSpec readSpec(String updatedKey, String config) {
        try {
            return ymlMapper.readValue(config, TopicConsumersSpec.class);
        } catch (Exception e) {
            log.error("Error read topic specification from path: {}", updatedKey, e);
            return null;
        }
    }

    /**
     * Returns the registered consumer map of the tenant, or a fresh unregistered map if the
     * tenant has none.
     */
    private Map<String, ConsumerHolder> getTenantConsumers(String tenantKey) {
        // Single lookup: a containsKey/get pair could observe a removal in between.
        Map<String, ConsumerHolder> consumers = tenantTopicConsumers.get(tenantKey);
        return consumers != null ? consumers : new ConcurrentHashMap<>();
    }

    /**
     * Runs {@code action}, logging a start line before it and a stop line with the elapsed
     * time after it. The stop line is not written if the action throws.
     *
     * @param tenant      tenant key used as log prefix
     * @param command     name of the operation being executed
     * @param action      operation to run
     * @param logTemplate SLF4J template appended to the start line
     * @param params      arguments for the placeholders of {@code logTemplate}
     */
    private void withLog(String tenant, String command, Runnable action, String logTemplate, Object ... params) {
        StopWatch stopWatch = StopWatch.createStarted();
        // Flatten tenant, command and params into one argument array so each value is bound
        // to its own placeholder instead of the whole params array filling a single one.
        Object[] startArgs = new Object[params.length + 2];
        startArgs[0] = tenant;
        startArgs[1] = command;
        System.arraycopy(params, 0, startArgs, 2, params.length);
        log.info("[{}] start: {} " + logTemplate, startArgs);
        action.run();
        log.info("[{}]  stop: {}, time = {} ms.", tenant, command, stopWatch.getTime());
    }

    /**
     * Returns the live registry of running consumers.
     *
     * @return tenant key -> (topic config key -> consumer holder); the returned map is the
     *         internal instance, not a copy
     */
    public Map<String, Map<String, ConsumerHolder>> getTenantTopicConsumers() {
        return tenantTopicConsumers;
    }
}

