/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.springboot.autoconfig;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.kafka.eventhandling.consumer.AsyncFetcher;
import org.axonframework.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.kafka.eventhandling.consumer.KafkaMessageSource;
import org.axonframework.kafka.eventhandling.consumer.SortedKafkaMessageBuffer;
import org.axonframework.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.config.AxonConfiguration;
import org.axonframework.springboot.KafkaProperties;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(value={KafkaPublisher.class})
@EnableConfigurationProperties(value={KafkaProperties.class})
@AutoConfigureAfter(value={AxonAutoConfiguration.class})
public class KafkaAutoConfiguration {
    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @ConditionalOnMissingBean
    @ConditionalOnProperty(value={"axon.kafka.producer.transaction-id-prefix"})
    @Bean
    public ProducerFactory<String, byte[]> kafkaProducerFactory() {
        Map<String, Object> producer = this.properties.buildProducerProperties();
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix == null) {
            throw new IllegalStateException("transactionalIdPrefix cannot be empty");
        }
        return DefaultProducerFactory.builder().configuration(producer).confirmationMode(ConfirmationMode.TRANSACTIONAL).transactionalIdPrefix(transactionIdPrefix).build();
    }

    @ConditionalOnMissingBean
    @Bean
    @ConditionalOnProperty(value={"axon.kafka.consumer.group-id"})
    public ConsumerFactory<String, byte[]> kafkaConsumerFactory() {
        return new DefaultConsumerFactory(this.properties.buildConsumerProperties());
    }

    @ConditionalOnMissingBean
    @Bean
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(@Qualifier(value="eventSerializer") Serializer eventSerializer) {
        return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build();
    }

    @ConditionalOnMissingBean
    @Bean(initMethod="start", destroyMethod="shutDown")
    @ConditionalOnBean(value={ProducerFactory.class, KafkaMessageConverter.class})
    public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> kafkaProducerFactory, EventBus eventBus, KafkaMessageConverter<String, byte[]> kafkaMessageConverter, AxonConfiguration configuration) {
        return KafkaPublisher.builder().messageSource((SubscribableMessageSource)eventBus).producerFactory(kafkaProducerFactory).messageConverter(kafkaMessageConverter).messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher")).topic(this.properties.getDefaultTopic()).build();
    }

    @ConditionalOnMissingBean
    @ConditionalOnBean(value={ConsumerFactory.class, KafkaMessageConverter.class})
    @Bean(destroyMethod="shutdown")
    public Fetcher kafkaFetcher(ConsumerFactory<String, byte[]> kafkaConsumerFactory, KafkaMessageConverter<String, byte[]> kafkaMessageConverter) {
        return AsyncFetcher.builder().consumerFactory(kafkaConsumerFactory).bufferFactory(() -> new SortedKafkaMessageBuffer(this.properties.getFetcher().getBufferSize())).messageConverter(kafkaMessageConverter).topic(this.properties.getDefaultTopic()).pollTimeout(this.properties.getFetcher().getPollTimeout(), TimeUnit.MILLISECONDS).build();
    }

    @ConditionalOnMissingBean
    @Bean
    @ConditionalOnBean(value={ConsumerFactory.class})
    public KafkaMessageSource kafkaMessageSource(Fetcher kafkaFetcher) {
        return new KafkaMessageSource(kafkaFetcher);
    }
}

