/*
 * Decompiled with CFR 0.152.
 */
package com.icthh.xm.commons.topic.service;

import com.icthh.xm.commons.logging.trace.TraceWrapper;
import com.icthh.xm.commons.topic.config.MessageListenerContainerBuilder;
import com.icthh.xm.commons.topic.domain.ConsumerHolder;
import com.icthh.xm.commons.topic.domain.TopicConfig;
import com.icthh.xm.commons.topic.message.MessageHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.stereotype.Service;

/**
 * Keeps track of the Kafka listener containers started for each tenant and
 * starts, restarts or stops them when a tenant's topic configuration changes.
 *
 * <p>Containers are stored per tenant key and, within a tenant, per topic
 * configuration key. Both levels are {@link ConcurrentHashMap}s; note that the
 * individual "look up, then start/stop, then store" sequences in this class are
 * not performed atomically.
 */
@Service
public class TopicManagerService {
    private static final Logger log = LoggerFactory.getLogger(TopicManagerService.class);

    private final KafkaProperties kafkaProperties;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final TraceWrapper traceWrapper;

    /** tenant key -> (topic config key -> running consumer). */
    private final Map<String, Map<String, ConsumerHolder>> topicConsumerHolders = new ConcurrentHashMap<>();

    /**
     * Creates the service.
     *
     * @param kafkaProperties Kafka properties handed to the listener container builder
     * @param kafkaTemplate   Kafka template handed to the listener container builder
     * @param traceWrapper    trace wrapper handed to the listener container builder
     */
    public TopicManagerService(KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate, TraceWrapper traceWrapper) {
        this.kafkaProperties = kafkaProperties;
        this.kafkaTemplate = kafkaTemplate;
        this.traceWrapper = traceWrapper;
    }

    /**
     * Applies a single topic configuration for a tenant.
     *
     * <ul>
     *   <li>If {@code maxPollInterval} is set and {@code backOffPeriod} exceeds it, an error is
     *       logged and nothing is started or changed.</li>
     *   <li>If no consumer is registered under the configuration key, a new one is started.</li>
     *   <li>If the registered consumer already uses an equal configuration, nothing happens.</li>
     *   <li>Otherwise the registered consumer is restarted with the new configuration.</li>
     * </ul>
     *
     * @param tenantKey      tenant the configuration belongs to
     * @param topicConfig    topic configuration to apply
     * @param messageHandler handler passed to the listener container being built
     */
    public void processTopicConfig(String tenantKey, TopicConfig topicConfig, MessageHandler messageHandler) {
        String topicConfigKey = topicConfig.getKey();
        ConsumerHolder registered = getConsumerHoldersByTenant(tenantKey).get(topicConfigKey);

        if (backOffExceedsMaxPollInterval(topicConfig)) {
            log.error("Consumer was not created, backOffPeriod is greater than maxPollInterval, topicConfig: [{}]", topicConfig);
            return;
        }
        if (registered == null) {
            startNewConsumer(tenantKey, topicConfig, messageHandler);
            return;
        }
        if (registered.getTopicConfig().equals(topicConfig)) {
            log.info("[{}] Skip consumer configuration due to no changes found: [{}] ", tenantKey, topicConfig);
            return;
        }
        updateConsumer(tenantKey, topicConfig, registered, messageHandler);
    }

    /**
     * Stops every consumer registered for the tenant and then drops the tenant's registry entry.
     *
     * @param tenantKey tenant whose consumers are stopped
     */
    public void stopAllTenantConsumers(String tenantKey) {
        Collection<ConsumerHolder> holders = getConsumerHoldersByTenant(tenantKey).values();
        withLog(tenantKey, "stopAllTenantConsumers",
            () -> holders.forEach(holder -> stopConsumer(tenantKey, holder)),
            "[{}]", holders);
        topicConsumerHolders.remove(tenantKey);
    }

    /**
     * Stops and unregisters every consumer of the tenant whose topic configuration is not
     * contained in {@code newTopicConfigs}.
     *
     * @param tenantKey       tenant whose consumers are checked
     * @param newTopicConfigs configurations that should stay active
     */
    public void removeOldConsumers(String tenantKey, List<TopicConfig> newTopicConfigs) {
        Map<String, ConsumerHolder> existingConsumers = getConsumerHoldersByTenant(tenantKey);

        // Select first, act afterwards: keeps side effects out of the stream pipeline.
        List<Map.Entry<String, ConsumerHolder>> staleEntries = existingConsumers.entrySet().stream()
            .filter(entry -> !newTopicConfigs.contains(entry.getValue().getTopicConfig()))
            .collect(Collectors.toList());

        for (Map.Entry<String, ConsumerHolder> stale : staleEntries) {
            stopConsumer(tenantKey, stale.getValue());
            // Two-argument remove: only drops the mapping if it still points at the holder we stopped.
            existingConsumers.remove(stale.getKey(), stale.getValue());
        }
    }

    /**
     * Builds and starts a listener container for the configuration and registers it under the
     * configuration key of the tenant.
     *
     * @param tenantKey      tenant the consumer belongs to
     * @param topicConfig    configuration of the consumer to start
     * @param messageHandler handler passed to the listener container being built
     */
    public void startNewConsumer(String tenantKey, TopicConfig topicConfig, MessageHandler messageHandler) {
        withLog(tenantKey, "startNewConsumer",
            () -> startAndRegister(tenantKey, topicConfig, messageHandler),
            "{}", topicConfig);
    }

    /**
     * Stops the container of {@code existingConfig}, then builds, starts and registers a new
     * container for {@code topicConfig} under its configuration key.
     *
     * @param tenantKey      tenant the consumer belongs to
     * @param topicConfig    new configuration
     * @param existingConfig holder of the currently registered consumer
     * @param messageHandler handler passed to the listener container being built
     */
    public void updateConsumer(String tenantKey, TopicConfig topicConfig, ConsumerHolder existingConfig, MessageHandler messageHandler) {
        withLog(tenantKey, "restartConsumer", () -> {
            existingConfig.getContainer().stop();
            // NOTE(review): if building/starting below throws, the registry still holds the
            // stopped holder - confirm whether callers rely on that.
            startAndRegister(tenantKey, topicConfig, messageHandler);
        }, "{}", topicConfig);
    }

    /**
     * Returns a read-only view of the tenant's consumer registry (topic config key -> holder).
     * An empty registry is created for the tenant if none exists yet.
     *
     * @param tenantKey tenant to look up
     * @return unmodifiable view of the tenant's consumers
     */
    public Map<String, ConsumerHolder> getConsumerHoldersByTenantImmutable(String tenantKey) {
        return Collections.unmodifiableMap(getConsumerHoldersByTenant(tenantKey));
    }

    /**
     * Builds (but does not start) a listener container via {@link MessageListenerContainerBuilder}.
     *
     * @param tenantKey      tenant the container is built for
     * @param topicConfig    configuration of the topic to listen to
     * @param messageHandler handler passed to the builder
     * @return the built container
     */
    protected AbstractMessageListenerContainer buildListenerContainer(String tenantKey, TopicConfig topicConfig, MessageHandler messageHandler) {
        return new MessageListenerContainerBuilder(this.kafkaProperties, this.kafkaTemplate).build(tenantKey, topicConfig, messageHandler, this.traceWrapper);
    }

    /** Returns {@code true} when maxPollInterval is set and backOffPeriod is greater than it. */
    private boolean backOffExceedsMaxPollInterval(TopicConfig topicConfig) {
        return topicConfig.getMaxPollInterval() != null
            && topicConfig.getBackOffPeriod() > (long) topicConfig.getMaxPollInterval().intValue();
    }

    /** Builds and starts a container, then stores it under the configuration key of the tenant. */
    private void startAndRegister(String tenantKey, TopicConfig topicConfig, MessageHandler messageHandler) {
        AbstractMessageListenerContainer container = buildListenerContainer(tenantKey, topicConfig, messageHandler);
        container.start();
        getConsumerHoldersByTenant(tenantKey).put(topicConfig.getKey(), new ConsumerHolder(topicConfig, container));
    }

    /** Returns the mutable registry of the tenant, creating an empty one on first access. */
    private Map<String, ConsumerHolder> getConsumerHoldersByTenant(String tenantKey) {
        return this.topicConsumerHolders.computeIfAbsent(tenantKey, key -> new ConcurrentHashMap<>());
    }

    /**
     * Runs {@code action} framed by a "start" and a "stop" info log line; the stop line reports
     * the elapsed time. If the action throws, the exception propagates and no stop line is written.
     *
     * @param tenant      tenant key printed in both log lines
     * @param command     command name printed in both log lines
     * @param action      work to run
     * @param logTemplate SLF4J template fragment appended to the start line
     * @param params      values for the placeholders of {@code logTemplate}
     */
    private void withLog(String tenant, String command, Runnable action, String logTemplate, Object... params) {
        StopWatch stopWatch = StopWatch.createStarted();

        // Flatten tenant, command and params into one argument array so that every placeholder
        // of logTemplate is bound to its own value instead of to the params array as a whole.
        int paramCount = params == null ? 0 : params.length;
        Object[] startArgs = new Object[2 + paramCount];
        startArgs[0] = tenant;
        startArgs[1] = command;
        if (paramCount > 0) {
            System.arraycopy(params, 0, startArgs, 2, paramCount);
        }
        log.info("[{}] start: {} " + logTemplate, startArgs);

        action.run();
        log.info("[{}]  stop: {}, time = {} ms.", tenant, command, stopWatch.getTime());
    }

    /** Stops the container of the given holder, logging start/stop with the holder's topic config. */
    private void stopConsumer(String tenantKey, ConsumerHolder consumerHolder) {
        TopicConfig existConfig = consumerHolder.getTopicConfig();
        withLog(tenantKey, "stopConsumer", () -> consumerHolder.getContainer().stop(), "{}", existConfig);
    }
}

