/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public class KafkaStreamsFunctionProcessor
extends AbstractKafkaStreamsBinderProcessor
implements BeanFactoryAware {
    // Logger used to report which key/value Serdes were selected for each input binding.
    private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionProcessor.class);
    // Key under which buildTypeMap stores the outbound type inside the binding-name -> type map.
    private static final String OUTBOUND = "outbound";
    // Source of per-binding BindingProperties and consumer properties (e.g. native decoding flag).
    private final BindingServiceProperties bindingServiceProperties;
    // Caches one StreamsBuilderFactoryBean per function name so all inputs of a function share it.
    private final Map<String, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<String, StreamsBuilderFactoryBean>();
    // Source of the Kafka Streams specific (extended) consumer properties per input binding.
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    // Resolves the inbound key Serde for an input binding.
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    // Registry this processor writes to: key Serdes, factory beans per binding, outbound types.
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    // Used to deserialize inbound KStream values when native decoding is not in effect.
    private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
    // Injected through setBeanFactory; used to look up the user's function beans by name.
    private BeanFactory beanFactory;
    // Consulted for explicitly configured output binding names of a function.
    private StreamFunctionProperties streamFunctionProperties;
    // The three fields below are only forwarded to buildStreamsBuilderAndRetrieveConfig.
    private KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties;
    StreamsBuilderFactoryBeanConfigurer customizer;
    ConfigurableEnvironment environment;

    /**
     * Creates the processor and stores its collaborators.
     *
     * @param bindingServiceProperties binding-level configuration, also passed to the superclass
     * @param kafkaStreamsExtendedBindingProperties Kafka Streams specific binding configuration, also passed to the superclass
     * @param keyValueSerdeResolver resolver for key/value Serdes, also passed to the superclass
     * @param kafkaStreamsBindingInformationCatalogue catalogue that receives binding metadata, also passed to the superclass
     * @param kafkaStreamsMessageConversionDelegate delegate used for inbound value conversion
     * @param cleanupConfig only forwarded to the superclass constructor
     * @param streamFunctionProperties function-level configuration (output binding names)
     * @param kafkaStreamsBinderConfigurationProperties binder-wide configuration
     * @param customizer configurer handed over when a StreamsBuilderFactoryBean is built
     * @param environment environment handed over when a StreamsBuilderFactoryBean is built
     */
    public KafkaStreamsFunctionProcessor(
            BindingServiceProperties bindingServiceProperties,
            KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
            KeyValueSerdeResolver keyValueSerdeResolver,
            KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
            KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
            CleanupConfig cleanupConfig,
            StreamFunctionProperties streamFunctionProperties,
            KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
            StreamsBuilderFactoryBeanConfigurer customizer,
            ConfigurableEnvironment environment) {
        super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, cleanupConfig);

        // Binding and function configuration.
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.streamFunctionProperties = streamFunctionProperties;
        this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;

        // Collaborators used while adapting inbound/outbound streams.
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;

        // Inputs for building the StreamsBuilderFactoryBean.
        this.customizer = customizer;
        this.environment = environment;
    }

    /**
     * Builds an insertion-ordered map from input binding name to the type bound to it, plus
     * (when it can be determined) an {@code "outbound"} entry holding the output type.
     *
     * <p>When {@code method} is non-null the types are read from the method's first parameter and
     * return type. Otherwise, when {@code resolvableType} has a raw class, its generics are walked:
     * nested {@code Function}/{@code Consumer} result types each contribute one more input.
     * If neither applies an empty map is returned.
     *
     * @param resolvableType functional type of the bean; may be null when {@code method} is given
     * @param kafkaStreamsBindableProxyFactory supplies the input binding names
     * @param method factory/bean method to inspect, or null
     * @param functionName bean name, used to look the bean up in the method-based case
     * @return the binding-name to type map (never null)
     */
    private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType, KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory, Method method, String functionName) {
        Map<String, ResolvableType> typesByBinding = new LinkedHashMap<>();

        if (method != null) {
            ResolvableType firstParameterType = ResolvableType.forMethodParameter(method, 0);
            ResolvableType returnType = ResolvableType.forMethodReturnType(method);
            Set<String> inputNames = new LinkedHashSet<>(kafkaStreamsBindableProxyFactory.getInputs());
            Iterator<String> inputCursor = inputNames.iterator();
            this.populateResolvableTypeMap(firstParameterType, typesByBinding, inputCursor, method, functionName);
            this.traverseReturnTypeForComponentBeans(typesByBinding, returnType, inputNames, inputCursor, returnType.getRawClass());
            return typesByBinding;
        }

        if (resolvableType == null || resolvableType.getRawClass() == null) {
            return typesByBinding;
        }

        // A two-argument functional type contributes two inputs and keeps its result at generic index 2.
        Class<?> rawClass = resolvableType.getRawClass();
        boolean takesTwoInputs = rawClass.isAssignableFrom(BiFunction.class) || rawClass.isAssignableFrom(BiConsumer.class);
        int firstArity = takesTwoInputs ? 2 : 1;

        // Count the inputs: every nested Function/Consumer in the result position adds one.
        int inputCount = firstArity;
        ResolvableType nestedResult = resolvableType.getGeneric(firstArity);
        while (nestedResult.getRawClass() != null && this.functionOrConsumerFound(nestedResult)) {
            inputCount++;
            nestedResult = nestedResult.getGeneric(1);
        }

        Set<String> inputNames = new LinkedHashSet<>(kafkaStreamsBindableProxyFactory.getInputs());
        Iterator<String> inputCursor = inputNames.iterator();
        this.populateResolvableTypeMap(resolvableType, typesByBinding, inputCursor);

        ResolvableType cursor = resolvableType;
        int position = firstArity;
        ResolvableType outboundType;
        if (position == inputCount) {
            // No nesting: the result type sits directly behind the input generics.
            outboundType = cursor.getGeneric(position);
        } else {
            // Descend through generic index 1 while input names remain, registering each nested level.
            while (position < inputCount && inputCursor.hasNext()) {
                cursor = cursor.getGeneric(1);
                if (cursor.getRawClass() != null && this.functionOrConsumerFound(cursor)) {
                    this.populateResolvableTypeMap(cursor, typesByBinding, inputCursor);
                }
                position++;
            }
            outboundType = cursor.getGeneric(1);
        }
        typesByBinding.put(OUTBOUND, outboundType);
        return typesByBinding;
    }

    /**
     * Walks a method return type, registering the input of every nested
     * {@code Function}/{@code Consumer} level, and stores the final type under
     * {@code "outbound"} when it is a {@link KStream}.
     *
     * <p>Does nothing when the raw return class is null or {@code void}.
     *
     * @param resolvableTypeMap map to add entries to
     * @param currentOutputGeneric the method return type
     * @param inputs all input binding names; its size bounds the number of levels walked
     * @param iterator cursor over the input names not yet assigned
     * @param outputRawclass raw class of {@code currentOutputGeneric}
     */
    private void traverseReturnTypeForComponentBeans(Map<String, ResolvableType> resolvableTypeMap, ResolvableType currentOutputGeneric, Set<String> inputs, Iterator<String> iterator, Class<?> outputRawclass) {
        if (outputRawclass == null || outputRawclass.equals(Void.TYPE)) {
            return;
        }

        ResolvableType cursor = currentOutputGeneric;
        int level = 1;
        while (level < inputs.size() && iterator.hasNext()) {
            if (cursor.getRawClass() != null && this.functionOrConsumerFound(cursor)) {
                this.populateResolvableTypeMap(cursor, resolvableTypeMap, iterator);
            }
            // The result of a Function (or second generic in general) is the next level.
            cursor = cursor.getGeneric(1);
            level++;
        }

        Class<?> finalRawClass = cursor.getRawClass();
        if (finalRawClass != null && KStream.class.isAssignableFrom(finalRawClass)) {
            resolvableTypeMap.put(OUTBOUND, cursor);
        }
    }

    /**
     * Tells whether the raw class of the given type is exactly {@code Function} or {@code Consumer}.
     * Callers are expected to have checked that the raw class is non-null.
     *
     * @param iterableResType type to inspect
     * @return true when the raw class equals {@code Function.class} or {@code Consumer.class}
     */
    private boolean functionOrConsumerFound(ResolvableType iterableResType) {
        Class<?> rawClass = iterableResType.getRawClass();
        if (rawClass.equals(Function.class)) {
            return true;
        }
        return rawClass.equals(Consumer.class);
    }

    /**
     * Assigns the next input binding name to the first generic of the given functional type and,
     * for a two-argument type, the following name (if one is left) to the second generic.
     *
     * @param resolvableType functional type whose generics describe the inputs
     * @param resolvableTypeMap map to add entries to
     * @param iterator cursor over the input names not yet assigned; must have at least one element
     */
    private void populateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> resolvableTypeMap, Iterator<String> iterator) {
        String firstBinding = iterator.next();
        resolvableTypeMap.put(firstBinding, resolvableType.getGeneric(0));

        Class<?> rawClass = resolvableType.getRawClass();
        if (rawClass == null) {
            return;
        }
        boolean takesTwoInputs = rawClass.isAssignableFrom(BiFunction.class) || rawClass.isAssignableFrom(BiConsumer.class);
        if (takesTwoInputs && iterator.hasNext()) {
            resolvableTypeMap.put(iterator.next(), resolvableType.getGeneric(1));
        }
    }

    /**
     * Method-based variant: assigns the next input binding name to the given type and, when the
     * bean registered under {@code functionName} is a {@code BiFunction} or {@code BiConsumer},
     * assigns the following name to the method's second parameter type.
     *
     * <p>Unlike the three-argument overload, the second {@code iterator.next()} is not guarded by
     * {@code hasNext()}.
     *
     * @param resolvableType type to bind to the first remaining input name
     * @param resolvableTypeMap map to add entries to
     * @param iterator cursor over the input names not yet assigned
     * @param method method whose second parameter describes the second input; may be null
     * @param functionName name of the bean to inspect
     */
    private void populateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> resolvableTypeMap, Iterator<String> iterator, Method method, String functionName) {
        resolvableTypeMap.put(iterator.next(), resolvableType);
        if (method == null) {
            return;
        }

        Class<?> beanClass = this.beanFactory.getBean(functionName).getClass();
        boolean takesTwoInputs = BiFunction.class.isAssignableFrom(beanClass) || BiConsumer.class.isAssignableFrom(beanClass);
        if (takesTwoInputs) {
            resolvableTypeMap.put(iterator.next(), ResolvableType.forMethodParameter(method, 1));
        }
    }

    /**
     * Finds the final output type of a functional type by skipping over nested
     * {@code Function}/{@code Consumer} result types.
     *
     * @param outputResolvableType functional type to unwrap
     * @return the first result type whose raw class is null or is neither {@code Function} nor {@code Consumer}
     */
    private ResolvableType checkOutboundForComposedFunctions(ResolvableType outputResolvableType) {
        // The result generic is at index 2 for a two-argument type, otherwise at index 1.
        Class<?> rawClass = outputResolvableType.getRawClass();
        int resultIndex = (rawClass != null && rawClass.isAssignableFrom(BiFunction.class)) ? 2 : 1;

        ResolvableType candidate = outputResolvableType.getGeneric(resultIndex);
        while (candidate.getRawClass() != null && this.functionOrConsumerFound(candidate)) {
            candidate = candidate.getGeneric(1);
        }
        return candidate;
    }

    /**
     * Resolves the input types of a Kafka Streams function bean, adapts its input bindings into
     * arguments, invokes the bean once and wires any returned stream(s) to the output bindings.
     *
     * <p>The type map is built and the inbound arguments are adapted before the {@code try}
     * block, so exceptions from those two steps propagate unchanged. Any {@link Exception}
     * raised while invoking the bean or wiring its result is wrapped in a
     * {@link BeanInitializationException}.
     *
     * @param resolvableType functional type of the bean; dereferenced without a null check below
     * @param functionName bean name of the function
     * @param kafkaStreamsBindableProxyFactory supplies input/output binding names
     * @param method bean method to inspect when the bean is not described by {@code resolvableType} alone; may be null
     * @param outputResolvableType when non-null, the outbound type is derived from it instead of from the type map
     * @param composedFunctionNames bean names of composed functions, applied in order; may be null or empty
     * @throws BeanInitializationException if invoking the bean or binding its output fails
     */
    public void setupFunctionInvokerForKafkaStreams(ResolvableType resolvableType, String functionName, KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory, Method method, ResolvableType outputResolvableType, String ... composedFunctionNames) {
        ResolvableType outboundResolvableType;
        Map<String, ResolvableType> resolvableTypes = this.buildTypeMap(resolvableType, kafkaStreamsBindableProxyFactory, method, functionName);
        // The "outbound" entry is always removed so that only input bindings remain in the map.
        if (outputResolvableType != null) {
            outboundResolvableType = this.checkOutboundForComposedFunctions(outputResolvableType);
            resolvableTypes.remove(OUTBOUND);
        } else {
            outboundResolvableType = resolvableTypes.remove(OUTBOUND);
        }
        Object[] adaptedInboundArguments = this.adaptAndRetrieveInboundArguments(resolvableTypes, functionName);
        try {
            // Consumers produce no result, so there is nothing to bind on the outbound side.
            if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(Consumer.class)) {
                Consumer consumer = (Consumer)this.beanFactory.getBean(functionName);
                consumer.accept(adaptedInboundArguments[0]);
            } else if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(BiConsumer.class)) {
                BiConsumer biConsumer = (BiConsumer)this.beanFactory.getBean(functionName);
                biConsumer.accept(adaptedInboundArguments[0], adaptedInboundArguments[1]);
            } else if (method != null) {
                // Method-based case: dispatch on the runtime class of the bean itself.
                Object bean = this.beanFactory.getBean(functionName);
                if (Consumer.class.isAssignableFrom(bean.getClass())) {
                    ((Consumer)bean).accept(adaptedInboundArguments[0]);
                } else if (BiConsumer.class.isAssignableFrom(bean.getClass())) {
                    ((BiConsumer)bean).accept(adaptedInboundArguments[0], adaptedInboundArguments[1]);
                } else if (Function.class.isAssignableFrom(bean.getClass()) || BiFunction.class.isAssignableFrom(bean.getClass())) {
                    Object result = BiFunction.class.isAssignableFrom(bean.getClass()) ? ((BiFunction)bean).apply(adaptedInboundArguments[0], adaptedInboundArguments[1]) : ((Function)bean).apply(adaptedInboundArguments[0]);
                    if ((result = this.handleCurriedFunctions(adaptedInboundArguments, result)) != null) {
                        // TreeSet: output binding names are consumed in sorted order.
                        TreeSet<String> outputs = new TreeSet<String>(kafkaStreamsBindableProxyFactory.getOutputs());
                        Iterator<String> outboundDefinitionIterator = outputs.iterator();
                        if (result.getClass().isArray()) {
                            // Array result: each element gets its own output binding, sharing the first input's factory bean.
                            String initialInput = resolvableTypes.keySet().iterator().next();
                            StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(initialInput);
                            this.handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory, outboundResolvableType, (Object[])result, streamsBuilderFactoryBean);
                        } else if (KTable.class.isAssignableFrom(result.getClass())) {
                            // A KTable result is converted to a KStream before being bound.
                            this.handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(new int[]{1}), (KStream<Object, Object>)((KTable)result).toStream(), outboundDefinitionIterator);
                        } else {
                            // NOTE(review): unlike the non-method branch below, no fallback to
                            // resolvableType.getGeneric(1) is applied here when outboundResolvableType is null — confirm intended.
                            this.handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream<Object, Object>)((KStream)result), outboundDefinitionIterator);
                        }
                    }
                }
            } else {
                // Functional case without a method: Function or BiFunction, optionally composed.
                Object result = null;
                if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(BiFunction.class)) {
                    if (composedFunctionNames != null && composedFunctionNames.length > 0) {
                        result = this.handleComposedFunctions(adaptedInboundArguments, result, composedFunctionNames);
                    } else {
                        BiFunction biFunction = (BiFunction)this.beanFactory.getBean(functionName);
                        result = biFunction.apply(adaptedInboundArguments[0], adaptedInboundArguments[1]);
                        result = this.handleCurriedFunctions(adaptedInboundArguments, result);
                    }
                } else if (composedFunctionNames != null && composedFunctionNames.length > 0) {
                    result = this.handleComposedFunctions(adaptedInboundArguments, result, composedFunctionNames);
                } else {
                    Function function = (Function)this.beanFactory.getBean(functionName);
                    result = function.apply(adaptedInboundArguments[0]);
                    result = this.handleCurriedFunctions(adaptedInboundArguments, result);
                }
                // A null result (e.g. the chain ended in a Consumer) means there is no outbound to bind.
                if (result != null) {
                    TreeSet<String> outputs = new TreeSet<String>(kafkaStreamsBindableProxyFactory.getOutputs());
                    Iterator<String> outboundDefinitionIterator = outputs.iterator();
                    if (result.getClass().isArray()) {
                        String initialInput = resolvableTypes.keySet().iterator().next();
                        StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(initialInput);
                        this.handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory, outboundResolvableType, (Object[])result, streamsBuilderFactoryBean);
                    } else if (KTable.class.isAssignableFrom(result.getClass())) {
                        this.handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(new int[]{1}), (KStream<Object, Object>)((KTable)result).toStream(), outboundDefinitionIterator);
                    } else {
                        this.handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(new int[]{1}), (KStream<Object, Object>)((KStream)result), outboundDefinitionIterator);
                    }
                }
            }
        }
        catch (Exception ex) {
            // Surface every failure as a bean initialization problem, keeping the cause.
            throw new BeanInitializationException("Cannot setup function invoker for this Kafka Streams function.", (Throwable)ex);
        }
    }

    /**
     * Invokes the first function of a composition with the inbound arguments, resolves any
     * curried result, and then feeds the outcome through the remaining composed functions.
     *
     * @param adaptedInboundArguments arguments adapted from the input bindings
     * @param result value returned unchanged from the first step when the first bean is neither a
     *               {@code BiFunction} nor a {@code Function}
     * @param composedFunctionNames bean names of the composed functions; must contain at least one name
     * @return the result of the whole composition, possibly null
     */
    private Object handleComposedFunctions(Object[] adaptedInboundArguments, Object result, String ... composedFunctionNames) {
        Object firstBean = this.beanFactory.getBean(composedFunctionNames[0]);
        Class<?> firstBeanClass = firstBean.getClass();

        Object firstResult = result;
        if (BiFunction.class.isAssignableFrom(firstBeanClass)) {
            firstResult = ((BiFunction)firstBean).apply(adaptedInboundArguments[0], adaptedInboundArguments[1]);
        } else if (Function.class.isAssignableFrom(firstBeanClass)) {
            firstResult = ((Function)firstBean).apply(adaptedInboundArguments[0]);
        }

        Object uncurried = this.handleCurriedFunctions(adaptedInboundArguments, firstResult);
        return this.applyComposedFunctions(uncurried, composedFunctionNames);
    }

    /**
     * Pipes a value through the composed beans from index 1 onwards (index 0 has already been
     * applied by the caller). A {@code Consumer} receives the value and resets it to null; a
     * {@code Function} replaces it with its return value.
     *
     * @param result value produced by the first function of the composition
     * @param composedFunctionNames bean names of all composed functions
     * @return the value after the last bean has been applied, possibly null
     * @throws IllegalStateException if a bean is neither a {@code Consumer} nor a {@code Function}
     */
    private Object applyComposedFunctions(Object result, String[] composedFunctionNames) {
        Object current = result;
        int index = 1;
        while (index < composedFunctionNames.length) {
            Object bean = this.beanFactory.getBean(composedFunctionNames[index]);
            Class<?> beanClass = bean.getClass();
            if (Consumer.class.isAssignableFrom(beanClass)) {
                ((Consumer)bean).accept(current);
                current = null;
            } else if (Function.class.isAssignableFrom(beanClass)) {
                current = ((Function)bean).apply(current);
            } else {
                throw new IllegalStateException("You can only compose functions of type either java.util.function.Function or java.util.function.Consumer.");
            }
            index++;
        }
        return current;
    }

    /**
     * Resolves a curried result: while {@code result} is itself a {@code Function} or
     * {@code Consumer}, it is invoked with the next inbound argument. A {@code Consumer} ends the
     * chain with a null result.
     *
     * <p>Arguments are taken starting at index 1, because index 0 is consumed by the caller's
     * first invocation.
     * NOTE(review): the start index is 1 even when the caller's first invocation was a
     * {@code BiFunction} that already consumed index 1 — confirm whether that case is supported.
     *
     * @param adaptedInboundArguments arguments adapted from the input bindings
     * @param result value returned by the first invocation; may be null
     * @return the first value in the chain that is neither a {@code Function} nor a {@code Consumer}, or null
     * @throws IllegalStateException if the chain requires more arguments than were bound
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Object handleCurriedFunctions(Object[] adaptedInboundArguments, Object result) {
        int nextArgument = 1;
        while (result instanceof Function || result instanceof Consumer) {
            // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException
            // when the function nests deeper than the number of bound inputs.
            if (nextArgument >= adaptedInboundArguments.length) {
                throw new IllegalStateException("Curried function requires an argument at position " + nextArgument
                        + " but only " + adaptedInboundArguments.length + " inbound argument(s) were bound.");
            }
            Object argument = adaptedInboundArguments[nextArgument];
            if (result instanceof Function) {
                result = ((Function)result).apply(argument);
            } else {
                ((Consumer)result).accept(argument);
                result = null;
            }
            ++nextArgument;
        }
        return result;
    }

    /**
     * Binds a single resulting stream to the next output binding name, if there is one, and
     * records its outbound type and StreamsBuilderFactoryBean in the catalogue. The factory bean
     * is the one already registered for the first input binding.
     *
     * @param resolvableTypes input binding names mapped to their types; the first key is used for the lookup
     * @param outboundResolvableType type recorded for the outbound stream; may be null
     * @param result stream returned by the function
     * @param outboundDefinitionIterator cursor over the output binding names
     */
    private void handleSingleKStreamOutbound(Map<String, ResolvableType> resolvableTypes, ResolvableType outboundResolvableType, KStream<Object, Object> result, Iterator<String> outboundDefinitionIterator) {
        if (!outboundDefinitionIterator.hasNext()) {
            return;
        }

        String outputBinding = outboundDefinitionIterator.next();
        Object boundTarget = this.handleSingleKStreamOutbound(result, outputBinding);
        this.kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(boundTarget, outboundResolvableType);

        // Reuse the factory bean of the function's first input for the output binding.
        String firstInput = resolvableTypes.keySet().iterator().next();
        StreamsBuilderFactoryBean factoryBean = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(firstInput);
        this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(outputBinding, factoryBean);
    }

    /**
     * Looks up the bean registered under the given output binding name and makes it wrap the
     * resulting stream.
     *
     * @param result stream returned by the function
     * @param next output binding name, also the bean name of the bound target
     * @return the bound target bean that now wraps {@code result}
     */
    private Object handleSingleKStreamOutbound(KStream<Object, Object> result, String next) {
        Object boundTarget = this.applicationContext.getBean(next);
        ((KStreamBoundElementFactory.KStreamWrapper)boundTarget).wrap(result);
        return boundTarget;
    }

    /**
     * Binds every element of an array result to its own output binding. For each element an
     * output binding is added to the proxy factory, a bean definition supplying the bound target
     * is registered under the binding name, the target is made to wrap the stream, and the
     * outbound type and factory bean are recorded in the catalogue.
     *
     * @param resolvableType functional type; its generic at index 1 is the fallback outbound type
     * @param functionName bean name of the function, used to determine the output binding names
     * @param kafkaStreamsBindableProxyFactory proxy factory that receives the new output bindings
     * @param outboundResolvableType outbound type to record, or null to use the fallback
     * @param result the streams returned by the function
     * @param streamsBuilderFactoryBean factory bean to associate with every output binding
     */
    private void handleKStreamArrayOutbound(ResolvableType resolvableType, String functionName, KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory, ResolvableType outboundResolvableType, Object[] result, StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        Iterator<String> bindingNames = this.getOutputBindings(functionName, result.length).iterator();
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry)this.beanFactory;

        for (Object branch : result) {
            String bindingName = bindingNames.next();
            kafkaStreamsBindableProxyFactory.addOutputBinding(bindingName, KStream.class);

            // Expose the bound target as a bean so it can be fetched by binding name right below.
            RootBeanDefinition targetDefinition = new RootBeanDefinition();
            targetDefinition.setInstanceSupplier(() -> kafkaStreamsBindableProxyFactory.getOutputHolders().get(bindingName).getBoundTarget());
            registry.registerBeanDefinition(bindingName, targetDefinition);

            Object boundTarget = this.applicationContext.getBean(bindingName);
            ((KStreamBoundElementFactory.KStreamWrapper)boundTarget).wrap((KStream<Object, Object>)((KStream)branch));

            ResolvableType recordedType = outboundResolvableType;
            if (recordedType == null) {
                recordedType = resolvableType.getGeneric(1);
            }
            this.kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(boundTarget, recordedType);
            this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(bindingName, streamsBuilderFactoryBean);
        }
    }

    /**
     * Returns the output binding names for a function: the explicitly configured names when any
     * exist, otherwise {@code outputs} generated names of the form
     * {@code <functionName>-out-<index>}.
     *
     * @param functionName bean name of the function
     * @param outputs number of names to generate when none are configured
     * @return a new mutable list of binding names
     */
    private List<String> getOutputBindings(String functionName, int outputs) {
        List<String> configured = this.streamFunctionProperties.getOutputBindings(functionName);
        if (!CollectionUtils.isEmpty(configured)) {
            // Copy so callers never share the list owned by the properties object.
            return new ArrayList<>(configured);
        }

        List<String> generated = new ArrayList<>();
        int index = 0;
        while (index < outputs) {
            generated.add(String.format("%s-%s-%d", functionName, "out", index));
            index++;
        }
        return generated;
    }

    /**
     * Turns every input binding of a function into the argument object that will be passed to
     * the function: a KStream for KStream-typed inputs, or whatever
     * {@code handleKTableGlobalKTableInputs} stores for other input types.
     *
     * <p>Along the way it lazily creates (and caches per function name) the
     * StreamsBuilderFactoryBean, propagates its {@code application.id} to the binding's extended
     * consumer properties, resolves key/value Serdes and records binding metadata in the
     * catalogue.
     *
     * @param stringResolvableTypeMap input binding names mapped to their declared types, in argument order
     * @param functionName bean name of the function
     * @return the adapted arguments, one slot per map entry
     * @throws IllegalStateException wrapping any exception raised while adapting an input
     */
    private Object[] adaptAndRetrieveInboundArguments(Map<String, ResolvableType> stringResolvableTypeMap, String functionName) {
        Object[] arguments = new Object[stringResolvableTypeMap.size()];
        int i = 0;
        for (String input : stringResolvableTypeMap.keySet()) {
            StreamsBuilderFactoryBean streamsBuilderFactoryBean;
            Class parameterType = stringResolvableTypeMap.get(input).getRawClass();
            // NOTE(review): this null check runs after the map lookup above, and skipping an
            // entry does not advance i, so the array would end with an unused slot — confirm reachable.
            if (input == null) continue;
            Object targetBean = this.applicationContext.getBean(input);
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(input);
            // Build the factory bean only once per function; later inputs reuse the cached one.
            if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) {
                streamsBuilderFactoryBean = this.buildStreamsBuilderAndRetrieveConfig(functionName, (ApplicationContext)this.applicationContext, input, this.kafkaStreamsBinderConfigurationProperties, this.customizer, this.environment, bindingProperties);
                this.methodStreamsBuilderFactoryBeanMap.put(functionName, streamsBuilderFactoryBean);
            }
            try {
                streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap.get(functionName);
                StreamsBuilder streamsBuilder = (StreamsBuilder)streamsBuilderFactoryBean.getObject();
                // Copy the application id from the streams configuration onto the binding's consumer properties.
                String applicationId = streamsBuilderFactoryBean.getStreamsConfiguration().getProperty("application.id");
                KafkaStreamsConsumerProperties extendedConsumerProperties = (KafkaStreamsConsumerProperties)((Object)this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(input));
                extendedConsumerProperties.setApplicationId(applicationId);
                Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties, stringResolvableTypeMap.get(input));
                LOG.info((Object)("Key Serde used for " + input + ": " + keySerde.getClass().getName()));
                // With native decoding the value Serde is resolved for the binding; otherwise raw bytes are read.
                Serde<?> valueSerde = this.bindingServiceProperties.getConsumerProperties(input).isUseNativeDecoding() ? this.getValueSerde(input, extendedConsumerProperties, stringResolvableTypeMap.get(input)) : Serdes.ByteArray();
                LOG.info((Object)("Value Serde used for " + input + ": " + valueSerde.getClass().getName()));
                Topology.AutoOffsetReset autoOffsetReset = this.getAutoOffsetReset(input, extendedConsumerProperties);
                // True when the declared type is KStream itself or one of its supertypes.
                if (parameterType.isAssignableFrom(KStream.class)) {
                    // The trailing flag is true only for the first input — presumably marks the
                    // first stream built for this function; verify against getKStream.
                    KStream<?, ?> stream = this.getKStream(input, bindingProperties, extendedConsumerProperties, streamsBuilder, keySerde, valueSerde, autoOffsetReset, i == 0);
                    KStreamBoundElementFactory.KStreamWrapper kStreamWrapper = (KStreamBoundElementFactory.KStreamWrapper)targetBean;
                    kStreamWrapper.wrap(stream);
                    this.kafkaStreamsBindingInformationCatalogue.addKeySerde((KStream)kStreamWrapper, keySerde);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(input, streamsBuilderFactoryBean);
                    this.kafkaStreamsBindingInformationCatalogue.addConsumerPropertiesPerSbfb(streamsBuilderFactoryBean, this.bindingServiceProperties.getConsumerProperties(input));
                    if (KStream.class.isAssignableFrom(stringResolvableTypeMap.get(input).getRawClass())) {
                        // Value class comes from the second generic of the declared type, defaulting to Object.
                        Class valueClass = stringResolvableTypeMap.get(input).getGeneric(new int[]{1}).getRawClass() != null ? stringResolvableTypeMap.get(input).getGeneric(new int[]{1}).getRawClass() : Object.class;
                        // Pass the stream through as-is under native decoding, otherwise deserialize its values.
                        arguments[i] = this.kafkaStreamsBindingInformationCatalogue.isUseNativeDecoding((KStream)kStreamWrapper) ? stream : this.kafkaStreamsMessageConversionDelegate.deserializeOnInbound(valueClass, stream);
                    }
                    // Fall back to the raw stream when no argument was produced above.
                    if (arguments[i] == null) {
                        arguments[i] = stream;
                    }
                    Assert.notNull((Object)arguments[i], (String)"Problems encountered while adapting the function argument.");
                } else {
                    // Non-KStream inputs are delegated; the helper fills arguments[i] itself.
                    this.handleKTableGlobalKTableInputs(arguments, i, input, parameterType, targetBean, streamsBuilderFactoryBean, streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset, i == 0);
                }
                ++i;
            }
            catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
        return arguments;
    }

    /**
     * Stores the bean factory that is later used to look up function beans by name and, when
     * array results are bound, is cast to a bean definition registry.
     *
     * @param beanFactory the owning bean factory
     * @throws BeansException declared by the signature; this implementation does not throw it
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }
}

