/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.DeserializationExceptionHandler;
import org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

final class KafkaStreamsBinderUtils {
    private static final Log LOGGER = LogFactory.getLog(KafkaStreamsBinderUtils.class);

    private KafkaStreamsBinderUtils() {
    }

    static void prepareConsumerBinding(String name, String group, ApplicationContext context, KafkaTopicProvisioner kafkaTopicProvisioner, KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
        String[] inputTopics;
        DeserializationExceptionHandler deserializationExceptionHandler;
        ExtendedConsumerProperties<KafkaStreamsConsumerProperties> extendedConsumerProperties = properties;
        if (binderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.sendToDlq) {
            ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).setEnableDlq(true);
        }
        if ((deserializationExceptionHandler = ((KafkaStreamsConsumerProperties)((Object)properties.getExtension())).getDeserializationExceptionHandler()) == DeserializationExceptionHandler.sendToDlq) {
            ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).setEnableDlq(true);
        }
        for (String inputTopic : inputTopics = StringUtils.commaDelimitedListToStringArray((String)name)) {
            kafkaTopicProvisioner.provisionConsumerDestination(inputTopic, group, extendedConsumerProperties);
        }
        if (((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isEnableDlq()) {
            Map partitionFunctions = context.getBeansOfType(DlqPartitionFunction.class, false, false);
            boolean oneFunctionPresent = partitionFunctions.size() == 1;
            Integer dlqPartitions = ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getDlqPartitions();
            DlqPartitionFunction partitionFunction = oneFunctionPresent ? (DlqPartitionFunction)partitionFunctions.values().iterator().next() : DlqPartitionFunction.determineFallbackFunction((Integer)dlqPartitions, (Log)LOGGER);
            DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = KafkaStreamsBinderUtils.getProducerFactory((ExtendedProducerProperties<KafkaProducerProperties>)new ExtendedProducerProperties((Object)((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getDlqProducerProperties()), binderConfigurationProperties);
            KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
            BiFunction<ConsumerRecord, Exception, TopicPartition> destinationResolver = (cr, e) -> new TopicPartition(((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getDlqName(), partitionFunction.apply(group, cr, (Throwable)e).intValue());
            DeadLetterPublishingRecoverer kafkaStreamsBinderDlqRecoverer = !StringUtils.isEmpty((Object)((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getDlqName()) ? new DeadLetterPublishingRecoverer(kafkaTemplate, destinationResolver) : null;
            for (String inputTopic : inputTopics) {
                if (StringUtils.isEmpty((Object)((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getDlqName())) {
                    destinationResolver = (cr, e) -> new TopicPartition("error." + inputTopic + "." + group, partitionFunction.apply(group, cr, (Throwable)e).intValue());
                    kafkaStreamsBinderDlqRecoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, destinationResolver);
                }
                SendToDlqAndContinue sendToDlqAndContinue = (SendToDlqAndContinue)context.getBean(SendToDlqAndContinue.class);
                sendToDlqAndContinue.addKStreamDlqDispatch(inputTopic, kafkaStreamsBinderDlqRecoverer);
            }
        }
    }

    private static DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(ExtendedProducerProperties<KafkaProducerProperties> producerProperties, KafkaBinderConfigurationProperties configurationProperties) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("retries", 0);
        props.put("buffer.memory", 0x2000000);
        props.put("acks", configurationProperties.getRequiredAcks());
        Map mergedConfig = configurationProperties.mergedProducerConfiguration();
        if (!ObjectUtils.isEmpty((Object)mergedConfig)) {
            props.putAll(mergedConfig);
        }
        if (ObjectUtils.isEmpty(props.get("bootstrap.servers"))) {
            props.put("bootstrap.servers", configurationProperties.getKafkaConnectionString());
        }
        if (ObjectUtils.isEmpty(props.get("batch.size"))) {
            props.put("batch.size", String.valueOf(((KafkaProducerProperties)producerProperties.getExtension()).getBufferSize()));
        }
        if (ObjectUtils.isEmpty(props.get("linger.ms"))) {
            props.put("linger.ms", String.valueOf(((KafkaProducerProperties)producerProperties.getExtension()).getBatchTimeout()));
        }
        if (ObjectUtils.isEmpty(props.get("compression.type"))) {
            props.put("compression.type", ((KafkaProducerProperties)producerProperties.getExtension()).getCompressionType().toString());
        }
        if (!ObjectUtils.isEmpty((Object)((KafkaProducerProperties)producerProperties.getExtension()).getConfiguration())) {
            props.putAll(((KafkaProducerProperties)producerProperties.getExtension()).getConfiguration());
        }
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        return new DefaultKafkaProducerFactory(props);
    }

    static boolean supportsKStream(MethodParameter methodParameter, Class<?> targetBeanClass) {
        return KStream.class.isAssignableFrom(targetBeanClass) && KStream.class.isAssignableFrom(methodParameter.getParameterType());
    }
}

