/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.retrytopic;

import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
import org.springframework.kafka.retrytopic.EndpointCustomizer;
import org.springframework.kafka.retrytopic.EndpointCustomizerFactory;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryResolver;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicForRetryable;

public class RetryTopicConfigurer
implements BeanFactoryAware {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurer.class));
    public static final EndpointHandlerMethod DEFAULT_DLT_HANDLER = RetryTopicConfigurer.createHandlerMethodWith(LoggingDltListenerHandlerMethod.class, "logMessage");
    private final DestinationTopicProcessor destinationTopicProcessor;
    private final ListenerContainerFactoryResolver containerFactoryResolver;
    private final ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer;
    private BeanFactory beanFactory;
    private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;

    @Autowired
    public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
        this.destinationTopicProcessor = destinationTopicProcessor;
        this.containerFactoryResolver = containerFactoryResolver;
        this.listenerContainerFactoryConfigurer = listenerContainerFactoryConfigurer;
        this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
    }

    public void processMainAndRetryListeners(EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, @Nullable KafkaListenerContainerFactory<?> factory, String defaultContainerFactoryBeanName) {
        String id = mainEndpoint.getId();
        if (id == null) {
            id = "no.id.provided";
        }
        DestinationTopicProcessor.Context context = new DestinationTopicProcessor.Context(id, configuration.getDestinationTopicProperties());
        this.configureEndpoints(mainEndpoint, endpointProcessor, factory, registrar, configuration, context, defaultContainerFactoryBeanName);
        this.destinationTopicProcessor.processRegisteredDestinations(this.getTopicCreationFunction(configuration), context);
    }

    private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, EndpointProcessor endpointProcessor, KafkaListenerContainerFactory<?> factory, KafkaListenerEndpointRegistrar registrar, RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context, String defaultContainerFactoryBeanName) {
        this.destinationTopicProcessor.processDestinationTopicProperties(destinationTopicProperties -> this.processAndRegisterEndpoint(mainEndpoint, endpointProcessor, factory, defaultContainerFactoryBeanName, registrar, configuration, context, (DestinationTopic.Properties)destinationTopicProperties), context);
    }

    private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, EndpointProcessor endpointProcessor, KafkaListenerContainerFactory<?> factory, String defaultFactoryBeanName, KafkaListenerEndpointRegistrar registrar, RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context, DestinationTopic.Properties destinationTopicProperties) {
        MethodKafkaListenerEndpoint<Object, Object> endpoint;
        KafkaListenerContainerFactory<?> resolvedFactory;
        KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = resolvedFactory = destinationTopicProperties.isMainEndpoint() ? this.resolveAndConfigureFactoryForMainEndpoint(factory, defaultFactoryBeanName, configuration) : this.resolveAndConfigureFactoryForRetryEndpoint(factory, defaultFactoryBeanName, configuration);
        if (destinationTopicProperties.isMainEndpoint()) {
            endpoint = mainEndpoint;
        } else {
            if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
                MultiMethodKafkaListenerEndpoint multi = (MultiMethodKafkaListenerEndpoint)mainEndpoint;
                endpoint = new MultiMethodKafkaListenerEndpoint(multi.getMethods(), multi.getDefaultMethod(), multi.getBean());
            } else {
                endpoint = new MethodKafkaListenerEndpoint();
            }
            endpoint.setId(mainEndpoint.getId());
            endpoint.setMainListenerId(mainEndpoint.getId());
        }
        endpointProcessor.accept(endpoint);
        Integer concurrency = configuration.getConcurrency();
        if (!destinationTopicProperties.isMainEndpoint() && concurrency != null) {
            endpoint.setConcurrency(concurrency);
        }
        EndpointHandlerMethod endpointBeanMethod = this.getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);
        this.createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties).customizeEndpointAndCollectTopics(endpoint).forEach(topicNamesHolder -> this.destinationTopicProcessor.registerDestinationTopic(topicNamesHolder.getMainTopic(), topicNamesHolder.getCustomizedTopic(), destinationTopicProperties, context));
        registrar.registerEndpoint(endpoint, resolvedFactory);
        endpoint.setBeanFactory(this.beanFactory);
    }

    protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props) {
        EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
        return props.isDltTopic() ? this.getDltEndpointHandlerMethodOrDefault(mainEndpoint, dltHandlerMethod) : this.getRetryEndpointHandlerMethod(mainEndpoint);
    }

    private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfiguration config) {
        RetryTopicConfiguration.TopicCreation topicCreationConfig = config.forKafkaTopicAutoCreation();
        return topicCreationConfig.shouldCreateTopics() ? topics -> this.createNewTopicBeans((Collection<String>)topics, topicCreationConfig) : topics -> {};
    }

    protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfiguration.TopicCreation config) {
        topics.forEach(topic -> {
            DefaultListableBeanFactory bf = (DefaultListableBeanFactory)this.beanFactory;
            String beanName = topic + "-topicRegistrationBean";
            if (!bf.containsBean(beanName)) {
                bf.registerSingleton(beanName, (Object)new TopicForRetryable((String)topic, config.getNumPartitions(), config.getReplicationFactor()));
            }
        });
    }

    protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) {
        return new EndpointCustomizerFactory(destinationTopicProperties, endpointBeanMethod, this.beanFactory, this.retryTopicNamesProviderFactory).createEndpointCustomizer();
    }

    private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, @Nullable EndpointHandlerMethod dltEndpointHandlerMethod) {
        EndpointHandlerMethod dltHandlerMethod;
        EndpointHandlerMethod endpointHandlerMethod = dltHandlerMethod = dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
        if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
            dltHandlerMethod = new EndpointHandlerMultiMethod(dltHandlerMethod.resolveBean(this.beanFactory), dltHandlerMethod.getMethod(), List.of(dltHandlerMethod.getMethod()));
        }
        return dltHandlerMethod;
    }

    private EndpointHandlerMethod getRetryEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
        EndpointHandlerMethod retryBeanMethod;
        if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
            MultiMethodKafkaListenerEndpoint multi = (MultiMethodKafkaListenerEndpoint)mainEndpoint;
            retryBeanMethod = new EndpointHandlerMultiMethod(multi.getBean(), multi.getDefaultMethod(), multi.getMethods());
        } else {
            retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
        }
        return retryBeanMethod;
    }

    private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(KafkaListenerContainerFactory<?> providedFactory, String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
        ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName, configuration.forContainerFactoryResolver());
        return this.listenerContainerFactoryConfigurer.decorateFactory(resolvedFactory);
    }

    private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpoint(KafkaListenerContainerFactory<?> providedFactory, String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
        ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory, defaultFactoryBeanName, configuration.forContainerFactoryResolver());
        return this.listenerContainerFactoryConfigurer.decorateFactory(resolvedFactory);
    }

    public static EndpointHandlerMethod createHandlerMethodWith(Object beanOrClass, String methodName) {
        return new EndpointHandlerMethod(beanOrClass, methodName);
    }

    public static EndpointHandlerMethod createHandlerMethodWith(Object bean, Method method) {
        return new EndpointHandlerMethod(bean, method);
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    public static interface EndpointProcessor
    extends Consumer<MethodKafkaListenerEndpoint<?, ?>> {
        default public void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {
            this.accept(listenerEndpoint);
        }
    }

    static class LoggingDltListenerHandlerMethod {
        public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";

        LoggingDltListenerHandlerMethod() {
        }

        public void logMessage(Object message, @NonNull Acknowledgment ack) {
            if (message instanceof ConsumerRecord) {
                LOGGER.info(() -> "Received message in dlt listener: " + KafkaUtils.format((ConsumerRecord)message));
            } else {
                LOGGER.info(() -> "Received message in dlt listener.");
            }
            ack.acknowledge();
        }
    }
}

