/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.lang.annotation.Annotation;
import java.lang.reflect.Executable;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.binding.StreamListenerSetupMethodOrchestrator;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

class KafkaStreamsStreamListenerSetupMethodOrchestrator
implements StreamListenerSetupMethodOrchestrator,
ApplicationContextAware {
    private static final Log LOG = LogFactory.getLog(KafkaStreamsStreamListenerSetupMethodOrchestrator.class);
    private final StreamListenerParameterAdapter streamListenerParameterAdapter;
    private final Collection<StreamListenerResultAdapter> streamListenerResultAdapters;
    private final BindingServiceProperties bindingServiceProperties;
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<Method, StreamsBuilderFactoryBean>();
    private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
    private ConfigurableApplicationContext applicationContext;

    KafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, StreamListenerParameterAdapter streamListenerParameterAdapter, Collection<StreamListenerResultAdapter> streamListenerResultAdapters, KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.streamListenerParameterAdapter = streamListenerParameterAdapter;
        this.streamListenerResultAdapters = streamListenerResultAdapters;
        this.binderConfigurationProperties = binderConfigurationProperties;
    }

    public boolean supports(Method method) {
        return this.methodParameterSupports(method) && (this.methodReturnTypeSuppports(method) || Void.TYPE.equals(method.getReturnType()));
    }

    private boolean methodReturnTypeSuppports(Method method) {
        Class<?> returnType = method.getReturnType();
        return returnType.equals(KStream.class) || returnType.isArray() && returnType.getComponentType().equals(KStream.class);
    }

    private boolean methodParameterSupports(Method method) {
        boolean supports = false;
        for (int i = 0; i < method.getParameterCount(); ++i) {
            MethodParameter methodParameter = MethodParameter.forExecutable((Executable)method, (int)i);
            Class parameterType = methodParameter.getParameterType();
            if (!parameterType.equals(KStream.class) && !parameterType.equals(KTable.class)) continue;
            supports = true;
        }
        return supports;
    }

    public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean) {
        block9: {
            String[] methodAnnotatedOutboundNames = KafkaStreamsStreamListenerSetupMethodOrchestrator.getOutboundBindingTargetNames(method);
            this.validateStreamListenerMethod(streamListener, method, methodAnnotatedOutboundNames);
            String methodAnnotatedInboundName = streamListener.value();
            Object[] adaptedInboundArguments = this.adaptAndRetrieveInboundArguments(method, methodAnnotatedInboundName, (ApplicationContext)this.applicationContext, this.streamListenerParameterAdapter);
            try {
                ReflectionUtils.makeAccessible((Method)method);
                if (Void.TYPE.equals(method.getReturnType())) {
                    method.invoke(bean, adaptedInboundArguments);
                    break block9;
                }
                Object result = method.invoke(bean, adaptedInboundArguments);
                if (result.getClass().isArray()) {
                    Assert.isTrue((methodAnnotatedOutboundNames.length == ((Object[])result).length ? 1 : 0) != 0, (String)"Result does not match with the number of declared outbounds");
                } else {
                    Assert.isTrue((methodAnnotatedOutboundNames.length == 1 ? 1 : 0) != 0, (String)"Result does not match with the number of declared outbounds");
                }
                if (result.getClass().isArray()) {
                    Object[] outboundKStreams = (Object[])result;
                    int i = 0;
                    block2: for (Object outboundKStream : outboundKStreams) {
                        Object targetBean = this.applicationContext.getBean(methodAnnotatedOutboundNames[i++]);
                        for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
                            if (!streamListenerResultAdapter.supports(outboundKStream.getClass(), targetBean.getClass())) continue;
                            streamListenerResultAdapter.adapt(outboundKStream, targetBean);
                            continue block2;
                        }
                    }
                    break block9;
                }
                Object targetBean = this.applicationContext.getBean(methodAnnotatedOutboundNames[0]);
                for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
                    if (!streamListenerResultAdapter.supports(result.getClass(), targetBean.getClass())) continue;
                    streamListenerResultAdapter.adapt(result, targetBean);
                    break;
                }
            }
            catch (Exception e) {
                throw new BeanInitializationException("Cannot setup StreamListener for " + method, (Throwable)e);
            }
        }
    }

    public Object[] adaptAndRetrieveInboundArguments(Method method, String inboundName, ApplicationContext applicationContext, StreamListenerParameterAdapter ... streamListenerParameterAdapters) {
        Object[] arguments = new Object[method.getParameterTypes().length];
        for (int parameterIndex = 0; parameterIndex < arguments.length; ++parameterIndex) {
            MethodParameter methodParameter = MethodParameter.forExecutable((Executable)method, (int)parameterIndex);
            Class parameterType = methodParameter.getParameterType();
            Object targetReferenceValue = null;
            if (methodParameter.hasParameterAnnotation(Input.class)) {
                targetReferenceValue = AnnotationUtils.getValue((Annotation)methodParameter.getParameterAnnotation(Input.class));
                Input methodAnnotation = (Input)methodParameter.getParameterAnnotation(Input.class);
                inboundName = methodAnnotation.value();
            } else if (arguments.length == 1 && StringUtils.hasText((String)inboundName)) {
                targetReferenceValue = inboundName;
            }
            if (targetReferenceValue != null) {
                Assert.isInstanceOf(String.class, (Object)targetReferenceValue, (String)"Annotation value must be a String");
                Object targetBean = applicationContext.getBean((String)targetReferenceValue);
                BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(inboundName);
                this.enableNativeDecodingForKTableAlways(parameterType, bindingProperties);
                StreamsConfig streamsConfig = null;
                if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
                    streamsConfig = this.buildStreamsBuilderAndRetrieveConfig(method, applicationContext, bindingProperties);
                }
                try {
                    StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap.get(method);
                    StreamsBuilder streamsBuilder = (StreamsBuilder)streamsBuilderFactoryBean.getObject();
                    KafkaStreamsConsumerProperties extendedConsumerProperties = this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName);
                    Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties);
                    Serde<?> valueSerde = this.keyValueSerdeResolver.getInboundValueSerde(bindingProperties.getConsumer(), extendedConsumerProperties);
                    if (parameterType.isAssignableFrom(KStream.class)) {
                        KStream<?, ?> stream = this.getkStream(inboundName, bindingProperties, streamsBuilder, keySerde, valueSerde);
                        KStreamBoundElementFactory.KStreamWrapper kStreamWrapper = (KStreamBoundElementFactory.KStreamWrapper)targetBean;
                        kStreamWrapper.wrap(stream);
                        this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                        if (streamsConfig != null) {
                            this.kafkaStreamsBindingInformationCatalogue.addStreamsConfigs(kStreamWrapper, streamsConfig);
                        }
                        for (StreamListenerParameterAdapter streamListenerParameterAdapter : streamListenerParameterAdapters) {
                            if (!streamListenerParameterAdapter.supports(stream.getClass(), methodParameter)) continue;
                            arguments[parameterIndex] = streamListenerParameterAdapter.adapt((Object)kStreamWrapper, methodParameter);
                            break;
                        }
                        if (arguments[parameterIndex] == null && parameterType.isAssignableFrom(stream.getClass())) {
                            arguments[parameterIndex] = stream;
                        }
                        Assert.notNull((Object)arguments[parameterIndex], (String)("Cannot convert argument " + parameterIndex + " of " + method + "from " + stream.getClass() + " to " + parameterType));
                        continue;
                    }
                    if (!parameterType.isAssignableFrom(KTable.class)) continue;
                    String materializedAs = extendedConsumerProperties.getMaterializedAs();
                    String bindingDestination = this.bindingServiceProperties.getBindingDestination(inboundName);
                    KTable<?, ?> table = materializedAs != null ? this.materializedAs(streamsBuilder, bindingDestination, materializedAs, keySerde, valueSerde) : streamsBuilder.table(bindingDestination, Consumed.with(keySerde, valueSerde));
                    KTableBoundElementFactory.KTableWrapper kTableWrapper = (KTableBoundElementFactory.KTableWrapper)targetBean;
                    kTableWrapper.wrap(table);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    if (streamsConfig != null) {
                        this.kafkaStreamsBindingInformationCatalogue.addStreamsConfigs(kTableWrapper, streamsConfig);
                    }
                    arguments[parameterIndex] = table;
                    continue;
                }
                catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }
            throw new IllegalStateException("A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.");
        }
        return arguments;
    }

    private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v) {
        return streamsBuilder.table(this.bindingServiceProperties.getBindingDestination(destination), Materialized.as((String)storeName).withKeySerde(k).withValueSerde(v));
    }

    private KStream<?, ?> getkStream(String inboundName, BindingProperties bindingProperties, StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde) {
        KStream stream = streamsBuilder.stream(this.bindingServiceProperties.getBindingDestination(inboundName), Consumed.with(keySerde, valueSerde));
        boolean nativeDecoding = this.bindingServiceProperties.getConsumerProperties(inboundName).isUseNativeDecoding();
        if (nativeDecoding) {
            LOG.info((Object)("Native decoding is enabled for " + inboundName + ". Inbound deserialization done at the broker."));
        } else {
            LOG.info((Object)("Native decoding is disabled for " + inboundName + ". Inbound message conversion done by Spring Cloud Stream."));
        }
        stream = stream.mapValues(value -> {
            Object returnValue;
            String contentType = bindingProperties.getContentType();
            if (!StringUtils.isEmpty((Object)contentType) && !nativeDecoding) {
                Message message = MessageBuilder.withPayload((Object)value).setHeader("contentType", (Object)contentType).build();
                returnValue = message;
            } else {
                returnValue = value;
            }
            return returnValue;
        });
        return stream;
    }

    private void enableNativeDecodingForKTableAlways(Class<?> parameterType, BindingProperties bindingProperties) {
        if (parameterType.isAssignableFrom(KTable.class)) {
            if (bindingProperties.getConsumer() == null) {
                bindingProperties.setConsumer(new ConsumerProperties());
            }
            bindingProperties.getConsumer().setUseNativeDecoding(true);
        }
    }

    private StreamsConfig buildStreamsBuilderAndRetrieveConfig(Method method, ApplicationContext applicationContext, BindingProperties bindingProperties) {
        ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean();
        streamsBuilder.setAutoStartup(false);
        AbstractBeanDefinition streamsBuilderBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(streamsBuilder.getClass(), () -> streamsBuilder).getRawBeanDefinition();
        ((BeanDefinitionRegistry)beanFactory).registerBeanDefinition("stream-builder-" + method.getName(), (BeanDefinition)streamsBuilderBeanDefinition);
        StreamsBuilderFactoryBean streamsBuilderX = (StreamsBuilderFactoryBean)applicationContext.getBean("&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
        String group = bindingProperties.getGroup();
        if (!StringUtils.hasText((String)group)) {
            group = this.binderConfigurationProperties.getApplicationId();
        }
        Map streamConfigGlobalProperties = (Map)applicationContext.getBean("streamConfigGlobalProperties", Map.class);
        streamConfigGlobalProperties.put("application.id", group);
        StreamsConfig streamsConfig = new StreamsConfig(streamConfigGlobalProperties){
            DeserializationExceptionHandler deserializationExceptionHandler;

            public <T> T getConfiguredInstance(String key, Class<T> clazz) {
                if (key.equals("default.deserialization.exception.handler")) {
                    if (this.deserializationExceptionHandler != null) {
                        return (T)this.deserializationExceptionHandler;
                    }
                    Object t = super.getConfiguredInstance(key, clazz);
                    this.deserializationExceptionHandler = (DeserializationExceptionHandler)t;
                    return (T)t;
                }
                return (T)super.getConfiguredInstance(key, clazz);
            }
        };
        AbstractBeanDefinition streamsConfigBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(streamsConfig.getClass(), () -> streamsConfig).getRawBeanDefinition();
        ((BeanDefinitionRegistry)beanFactory).registerBeanDefinition("streamsConfig-" + method.getName(), (BeanDefinition)streamsConfigBeanDefinition);
        streamsBuilder.setStreamsConfig(streamsConfig);
        this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
        return streamsConfig;
    }

    public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext)applicationContext;
    }

    private void validateStreamListenerMethod(StreamListener streamListener, Method method, String[] methodAnnotatedOutboundNames) {
        String methodAnnotatedInboundName = streamListener.value();
        if (methodAnnotatedOutboundNames != null) {
            for (String s : methodAnnotatedOutboundNames) {
                if (!StringUtils.hasText((String)s)) continue;
                Assert.isTrue((boolean)this.isDeclarativeOutput(method, s), (String)"Method must be declarative");
            }
        }
        if (StringUtils.hasText((String)methodAnnotatedInboundName)) {
            int methodArgumentsLength = method.getParameterTypes().length;
            for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; ++parameterIndex) {
                MethodParameter methodParameter = MethodParameter.forExecutable((Executable)method, (int)parameterIndex);
                Assert.isTrue((boolean)this.isDeclarativeInput(methodAnnotatedInboundName, methodParameter), (String)"Method must be declarative");
            }
        }
    }

    private boolean isDeclarativeOutput(Method m, String targetBeanName) {
        Class<?> returnType = m.getReturnType();
        if (returnType.isArray()) {
            Class targetBeanClass = this.applicationContext.getType(targetBeanName);
            boolean declarative = this.streamListenerResultAdapters.stream().anyMatch(slpa -> slpa.supports(returnType.getComponentType(), targetBeanClass));
            return declarative;
        }
        Class targetBeanClass = this.applicationContext.getType(targetBeanName);
        boolean declarative = this.streamListenerResultAdapters.stream().anyMatch(slpa -> slpa.supports(returnType, targetBeanClass));
        return declarative;
    }

    private boolean isDeclarativeInput(String targetBeanName, MethodParameter methodParameter) {
        if (!methodParameter.getParameterType().isAssignableFrom(Object.class) && this.applicationContext.containsBean(targetBeanName)) {
            Class targetBeanClass = this.applicationContext.getType(targetBeanName);
            return this.streamListenerParameterAdapter.supports(targetBeanClass, methodParameter);
        }
        return false;
    }

    private static String[] getOutboundBindingTargetNames(Method method) {
        SendTo sendTo = (SendTo)AnnotationUtils.findAnnotation((Method)method, SendTo.class);
        if (sendTo != null) {
            Assert.isTrue((!ObjectUtils.isEmpty((Object[])sendTo.value()) ? 1 : 0) != 0, (String)"At least one output must be specified");
            Assert.isTrue((sendTo.value().length >= 1 ? 1 : 0) != 0, (String)"At least one outbound destination need to be provided.");
            return sendTo.value();
        }
        return null;
    }
}

