/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.core.FluxedConsumer;
import org.springframework.cloud.function.core.FluxedFunction;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

public class KafkaStreamsFunctionProcessor
implements ApplicationContextAware {
    // Class-wide logger.
    private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionProcessor.class);
    // Source of per-binding properties (destination, consumer settings, content type).
    private final BindingServiceProperties bindingServiceProperties;
    // Function name -> the StreamsBuilderFactoryBean registered for that function;
    // filled in by buildStreamsBuilderAndRetrieveConfig.
    private final Map<String, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<String, StreamsBuilderFactoryBean>();
    // Source of the Kafka Streams specific (extended) consumer properties per binding.
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    // Resolves the inbound key/value serdes for a binding.
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    // Receives every StreamsBuilderFactoryBean used for a bound input and answers
    // whether native decoding is enabled for a bound KStream.
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    // Converts inbound KStream values when native decoding is not used.
    private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
    // Optional; when null the StreamsBuilderFactoryBean is created without a cleanup config.
    private final CleanupConfig cleanupConfig;
    // Used to look up the function / consumer to invoke by name.
    private final FunctionCatalog functionCatalog;
    // Only read in the constructor to seed origInputs / origOutputs.
    private final BindableProxyFactory bindableProxyFactory;
    // Assigned through setApplicationContext.
    private ConfigurableApplicationContext applicationContext;
    // Sorted input binding names that have not yet been assigned to a function
    // argument; names are removed as buildTypeMap claims them.
    private Set<String> origInputs = new TreeSet<String>();
    // Sorted output binding names that have not yet been assigned to a function
    // result; names are removed as orchestrateFunctionInvoking claims them.
    private Set<String> origOutputs = new TreeSet<String>();

    /**
     * Creates the processor from its collaborators and records the input and
     * output binding names exposed by the given {@link BindableProxyFactory}
     * in sorted order.
     *
     * @param bindingServiceProperties per-binding properties
     * @param kafkaStreamsExtendedBindingProperties Kafka Streams specific binding properties
     * @param keyValueSerdeResolver resolver for inbound key/value serdes
     * @param kafkaStreamsBindingInformationCatalogue catalogue of binding information
     * @param kafkaStreamsMessageConversionDelegate delegate for inbound message conversion
     * @param cleanupConfig optional cleanup configuration (may be {@code null})
     * @param functionCatalog catalog used to look up functions by name
     * @param bindableProxyFactory factory exposing the input and output binding names
     */
    public KafkaStreamsFunctionProcessor(
            BindingServiceProperties bindingServiceProperties,
            KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
            KeyValueSerdeResolver keyValueSerdeResolver,
            KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
            KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
            CleanupConfig cleanupConfig,
            FunctionCatalog functionCatalog,
            BindableProxyFactory bindableProxyFactory) {
        // Binding and serde related collaborators.
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.keyValueSerdeResolver = keyValueSerdeResolver;

        // Catalogue, conversion and lifecycle collaborators.
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
        this.cleanupConfig = cleanupConfig;

        // Function lookup and binding names.
        this.functionCatalog = functionCatalog;
        this.bindableProxyFactory = bindableProxyFactory;
        this.origInputs.addAll(bindableProxyFactory.getInputs());
        this.origOutputs.addAll(bindableProxyFactory.getOutputs());
    }

    /**
     * Pairs the not-yet-assigned input binding names (in sorted order) with the
     * argument types of a possibly curried function type.
     *
     * <p>The first name is paired with generic 0 of {@code resolvableType}. While
     * generic 1 of the current level is itself a {@link Function} or
     * {@link Consumer}, the method descends into it and pairs the next name with
     * that level's generic 0. Every name placed in the result is removed from
     * {@code origInputs}.
     *
     * @param resolvableType the declared type of the function bean
     * @return insertion-ordered map from input binding name to its argument type
     * @throws java.util.NoSuchElementException if no unassigned input name is left
     */
    private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType) {
        // One argument for the outermost type, plus one per nested Function/Consumer.
        int inputCount = 1;
        ResolvableType nested = resolvableType.getGeneric(1);
        while (isFunctionOrConsumer(nested)) {
            ++inputCount;
            nested = nested.getGeneric(1);
        }

        // Iterate over a snapshot so names can be removed from origInputs as they are claimed.
        Iterator<String> names = new TreeSet<String>(this.origInputs).iterator();
        Map<String, ResolvableType> resolvableTypeMap = new LinkedHashMap<String, ResolvableType>();

        String first = names.next();
        resolvableTypeMap.put(first, resolvableType.getGeneric(0));
        this.origInputs.remove(first);

        // Descend one nesting level per additional argument. Reading generic 1 of the
        // outermost type on every pass would give all later inputs the second input's type.
        ResolvableType current = resolvableType;
        for (int i = 1; i < inputCount && names.hasNext(); ++i) {
            current = current.getGeneric(1);
            if (!isFunctionOrConsumer(current)) {
                break;
            }
            String name = names.next();
            resolvableTypeMap.put(name, current.getGeneric(0));
            this.origInputs.remove(name);
        }
        return resolvableTypeMap;
    }

    /** Returns whether the raw class of the given type is exactly Function or Consumer. */
    private static boolean isFunctionOrConsumer(ResolvableType type) {
        if (type == null) {
            return false;
        }
        Class<?> rawClass = type.getRawClass();
        return rawClass != null && (rawClass.equals(Function.class) || rawClass.equals(Consumer.class));
    }

    /**
     * Binds the inbound targets of the named function, invokes the function with
     * them, and binds any returned stream(s) to the remaining output bindings.
     *
     * <p>When {@code resolvableType} is a {@link Consumer}, the consumer found in
     * the function catalog is invoked with the first inbound argument. Otherwise
     * the function is applied to the first inbound argument; while the result is
     * itself a {@link Function} or {@link Consumer} it is invoked with the next
     * inbound argument. A non-null final result is wrapped into the next unused
     * output binding, or one binding per element when the result is an array.
     *
     * @param resolvableType the declared type of the function bean
     * @param functionName the name under which the function is registered in the catalog
     * @throws BeanInitializationException if looking up, invoking or binding the function fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void orchestrateFunctionInvoking(ResolvableType resolvableType, String functionName) {
        Map<String, ResolvableType> stringResolvableTypeMap = this.buildTypeMap(resolvableType);
        Object[] adaptedInboundArguments = this.adaptAndRetrieveInboundArguments(stringResolvableTypeMap, functionName);
        try {
            if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(Consumer.class)) {
                FluxedConsumer fluxedConsumer = (FluxedConsumer) this.functionCatalog.lookup(FluxedConsumer.class, functionName);
                Assert.isTrue(fluxedConsumer != null, "No corresponding consumer beans found in the catalog");
                Object target = fluxedConsumer.getTarget();
                Assert.state(target != null, "Consumer " + functionName + " has no target to invoke");
                // A target that is not a Consumer is skipped.
                if (target instanceof Consumer) {
                    ((Consumer) target).accept(adaptedInboundArguments[0]);
                }
            } else {
                Function function = (Function) this.functionCatalog.lookup(Function.class, functionName);
                // Unwrap only when the catalog entry is a wrapper; otherwise keep the entry
                // itself instead of discarding it and dereferencing null.
                if (function instanceof FluxedFunction) {
                    function = (Function) ((FluxedFunction) function).getTarget();
                }
                Assert.state(function != null, "No corresponding function bean found in the catalog for " + functionName);

                Object result = function.apply(adaptedInboundArguments[0]);
                int i = 1;
                // Curried functions: feed each remaining inbound argument to the returned callable.
                while (result instanceof Function || result instanceof Consumer) {
                    Assert.state(i < adaptedInboundArguments.length,
                            "Function " + functionName + " expects more inputs than the " + adaptedInboundArguments.length + " that were bound");
                    if (result instanceof Function) {
                        result = ((Function) result).apply(adaptedInboundArguments[i]);
                    } else {
                        ((Consumer) result).accept(adaptedInboundArguments[i]);
                        result = null;
                    }
                    ++i;
                }

                if (result != null) {
                    // Iterate over a snapshot so names can be removed from origOutputs as they are claimed.
                    Iterator<String> outputNames = new TreeSet<String>(this.origOutputs).iterator();
                    if (result.getClass().isArray()) {
                        for (Object outboundKStream : (Object[]) result) {
                            Assert.state(outputNames.hasNext(),
                                    "Function " + functionName + " returned more streams than there are unassigned output bindings");
                            this.bindOutboundStream(outputNames.next(), outboundKStream);
                        }
                    } else if (outputNames.hasNext()) {
                        this.bindOutboundStream(outputNames.next(), result);
                    }
                }
            }
        }
        catch (Exception ex) {
            throw new BeanInitializationException("Cannot setup StreamListener for " + functionName, (Throwable) ex);
        }
    }

    /** Wraps the given outbound stream into the named output binding bean and marks the name as used. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void bindOutboundStream(String outputName, Object outboundKStream) {
        Object targetBean = this.applicationContext.getBean(outputName);
        this.origOutputs.remove(outputName);
        KStreamBoundElementFactory.KStreamWrapper boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
        boundElement.wrap((KStream<Object, Object>) ((KStream) outboundKStream));
    }

    /**
     * Creates the inbound Kafka Streams object for every entry of the given map
     * and returns them in map iteration order.
     *
     * <p>For each input binding name the method fetches the binding target bean,
     * makes sure a {@link StreamsBuilderFactoryBean} exists for the function, and
     * builds a {@link KStream}, {@link KTable} or {@link GlobalKTable} depending on
     * the declared parameter type. The built object is wrapped into the binding
     * target bean and stored as the argument. A parameter type matching none of
     * the three leaves its argument slot {@code null}.
     *
     * @param stringResolvableTypeMap input binding name to declared argument type
     * @param functionName name of the function the inputs belong to
     * @return one argument per map entry, in iteration order
     * @throws IllegalStateException if an input name is {@code null}, its raw type
     *         cannot be resolved, or building the inbound object fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Object[] adaptAndRetrieveInboundArguments(Map<String, ResolvableType> stringResolvableTypeMap, String functionName) {
        Object[] arguments = new Object[stringResolvableTypeMap.size()];
        int i = 0;
        for (Map.Entry<String, ResolvableType> entry : stringResolvableTypeMap.entrySet()) {
            String input = entry.getKey();
            if (input == null) {
                throw new IllegalStateException("A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets.");
            }
            ResolvableType inputType = entry.getValue();
            Class<?> parameterType = inputType.getRawClass();
            // Fail with a descriptive message instead of a NullPointerException further down.
            Assert.state(parameterType != null, "Cannot resolve the parameter type for input binding " + input);

            Object targetBean = this.applicationContext.getBean(input);
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(input);
            this.enableNativeDecodingForKTableAlways(parameterType, bindingProperties);
            // All inputs of one function share a single StreamsBuilderFactoryBean.
            if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) {
                this.buildStreamsBuilderAndRetrieveConfig(functionName, (ApplicationContext) this.applicationContext, input);
            }
            try {
                StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap.get(functionName);
                StreamsBuilder streamsBuilder = (StreamsBuilder) streamsBuilderFactoryBean.getObject();
                KafkaStreamsConsumerProperties extendedConsumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(input);
                Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties);
                Serde<?> valueSerde = this.keyValueSerdeResolver.getInboundValueSerde(bindingProperties.getConsumer(), extendedConsumerProperties);
                Topology.AutoOffsetReset autoOffsetReset = toAutoOffsetReset(extendedConsumerProperties.getStartOffset());
                if (extendedConsumerProperties.isResetOffsets()) {
                    LOG.warn((Object) ("Detected resetOffsets configured on binding " + input + ". Setting resetOffsets in Kafka Streams binder does not have any effect."));
                }

                if (parameterType.isAssignableFrom(KStream.class)) {
                    KStream<?, ?> stream = this.getkStream(input, bindingProperties, streamsBuilder, keySerde, valueSerde, autoOffsetReset);
                    KStreamBoundElementFactory.KStreamWrapper kStreamWrapper = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
                    kStreamWrapper.wrap((KStream<Object, Object>) stream);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    if (KStream.class.isAssignableFrom(parameterType)) {
                        Class<?> declaredValueClass = inputType.getGeneric(1).getRawClass();
                        Class<?> valueClass = declaredValueClass != null ? declaredValueClass : Object.class;
                        // With native decoding the stream is passed through untouched;
                        // otherwise values go through the message conversion delegate.
                        arguments[i] = this.kafkaStreamsBindingInformationCatalogue.isUseNativeDecoding((KStream) kStreamWrapper)
                                ? stream
                                : this.kafkaStreamsMessageConversionDelegate.deserializeOnInbound(valueClass, stream);
                    }
                    if (arguments[i] == null) {
                        arguments[i] = stream;
                    }
                    Assert.notNull((Object) arguments[i], "problems..");
                } else if (parameterType.isAssignableFrom(KTable.class)) {
                    String materializedAs = extendedConsumerProperties.getMaterializedAs();
                    String bindingDestination = this.bindingServiceProperties.getBindingDestination(input);
                    KTable<?, ?> table = this.getKTable(streamsBuilder, keySerde, valueSerde, materializedAs, bindingDestination, autoOffsetReset);
                    KTableBoundElementFactory.KTableWrapper kTableWrapper = (KTableBoundElementFactory.KTableWrapper) targetBean;
                    kTableWrapper.wrap((KTable<Object, Object>) table);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    arguments[i] = table;
                } else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
                    String materializedAs = extendedConsumerProperties.getMaterializedAs();
                    String bindingDestination = this.bindingServiceProperties.getBindingDestination(input);
                    GlobalKTable<?, ?> globalTable = this.getGlobalKTable(streamsBuilder, keySerde, valueSerde, materializedAs, bindingDestination, autoOffsetReset);
                    GlobalKTableBoundElementFactory.GlobalKTableWrapper globalKTableWrapper = (GlobalKTableBoundElementFactory.GlobalKTableWrapper) targetBean;
                    globalKTableWrapper.wrap((GlobalKTable<Object, Object>) globalTable);
                    this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
                    arguments[i] = globalTable;
                }
                ++i;
            }
            catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
        return arguments;
    }

    /** Maps the binder's start offset setting to the Kafka Streams reset policy; null when unset or unmapped. */
    private static Topology.AutoOffsetReset toAutoOffsetReset(KafkaConsumerProperties.StartOffset startOffset) {
        if (startOffset == null) {
            return null;
        }
        switch (startOffset) {
            case earliest:
                return Topology.AutoOffsetReset.EARLIEST;
            case latest:
                return Topology.AutoOffsetReset.LATEST;
            default:
                return null;
        }
    }

    /**
     * Builds the {@link GlobalKTable} for a destination.
     *
     * <p>When {@code materializedAs} is {@code null} the table is created directly
     * on the builder from {@code bindingDestination}; otherwise creation is
     * delegated to {@code materializedAsGlobalKTable} with {@code materializedAs}
     * as the store name.
     */
    private GlobalKTable<?, ?> getGlobalKTable(StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, String materializedAs, String bindingDestination, Topology.AutoOffsetReset autoOffsetReset) {
        if (materializedAs == null) {
            return streamsBuilder.globalTable(bindingDestination, Consumed.with(keySerde, valueSerde).withOffsetResetPolicy(autoOffsetReset));
        }
        return this.materializedAsGlobalKTable(streamsBuilder, bindingDestination, materializedAs, keySerde, valueSerde, autoOffsetReset);
    }

    /**
     * Builds the {@link KTable} for a destination.
     *
     * <p>When {@code materializedAs} is {@code null} the table is created directly
     * on the builder from {@code bindingDestination}; otherwise creation is
     * delegated to {@code materializedAs(...)} with {@code materializedAs} as the
     * store name.
     */
    private KTable<?, ?> getKTable(StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, String materializedAs, String bindingDestination, Topology.AutoOffsetReset autoOffsetReset) {
        if (materializedAs == null) {
            return streamsBuilder.table(bindingDestination, Consumed.with(keySerde, valueSerde).withOffsetResetPolicy(autoOffsetReset));
        }
        return this.materializedAs(streamsBuilder, bindingDestination, materializedAs, keySerde, valueSerde, autoOffsetReset);
    }

    /**
     * Creates a {@link KTable} on {@code destination} that is materialized under
     * the given store name.
     *
     * <p>{@code destination} is used as given. The caller in this class passes a
     * value it has already obtained from
     * {@code bindingServiceProperties.getBindingDestination(...)}, so running it
     * through that lookup a second time would treat a destination as if it were
     * a binding name.
     *
     * @param streamsBuilder builder the table is added to
     * @param destination the already-resolved destination to read from
     * @param storeName name of the materialized store
     * @param k key serde, used for both consumption and the store
     * @param v value serde, used for both consumption and the store
     * @param autoOffsetReset offset reset policy, passed through to {@code Consumed}
     * @return the created table
     */
    private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v, Topology.AutoOffsetReset autoOffsetReset) {
        return streamsBuilder.table(destination, Consumed.with(k, v).withOffsetResetPolicy(autoOffsetReset), this.getMaterialized(storeName, k, v));
    }

    /**
     * Creates a {@link GlobalKTable} on {@code destination} that is materialized
     * under the given store name.
     *
     * <p>{@code destination} is used as given. The caller in this class passes a
     * value it has already obtained from
     * {@code bindingServiceProperties.getBindingDestination(...)}, so running it
     * through that lookup a second time would treat a destination as if it were
     * a binding name.
     *
     * @param streamsBuilder builder the table is added to
     * @param destination the already-resolved destination to read from
     * @param storeName name of the materialized store
     * @param k key serde, used for both consumption and the store
     * @param v value serde, used for both consumption and the store
     * @param autoOffsetReset offset reset policy, passed through to {@code Consumed}
     * @return the created global table
     */
    private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v, Topology.AutoOffsetReset autoOffsetReset) {
        return streamsBuilder.globalTable(destination, Consumed.with(k, v).withOffsetResetPolicy(autoOffsetReset), this.getMaterialized(storeName, k, v));
    }

    /**
     * Builds the {@link Materialized} description for a key-value store with the
     * given name and serdes.
     *
     * @param storeName name of the store
     * @param k key serde for the store
     * @param v value serde for the store
     * @return the materialization settings
     */
    private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String storeName, Serde<K> k, Serde<V> v) {
        // The explicit type witness is required: in a chained call the type arguments of
        // Materialized.as cannot be inferred from the method's return type.
        return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
                .withKeySerde(k)
                .withValueSerde(v);
    }

    /**
     * Creates the inbound {@link KStream} for the given input binding.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>Every {@link StoreBuilder} bean in the application context is added to
     * the topology as a state store. A failure here is logged and does not stop
     * stream creation.</li>
     * <li>The binding destination is split on commas and a stream is opened over
     * the resulting topics.</li>
     * <li>Each non-null value is wrapped in a message carrying the binding's
     * content type as the {@code contentType} header, unless native decoding is
     * enabled or no content type is set.</li>
     * </ol>
     *
     * @param inboundName name of the input binding
     * @param bindingProperties properties of that binding (content type is read per record)
     * @param streamsBuilder builder the stream and state stores are added to
     * @param keySerde serde for inbound keys
     * @param valueSerde serde for inbound values
     * @param autoOffsetReset offset reset policy, passed through to {@code Consumed}
     * @return the value-mapped inbound stream
     */
    @SuppressWarnings("rawtypes")
    private KStream<?, ?> getkStream(String inboundName, BindingProperties bindingProperties, StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
        try {
            Map<String, StoreBuilder> storeBuilders = this.applicationContext.getBeansOfType(StoreBuilder.class);
            if (!CollectionUtils.isEmpty(storeBuilders)) {
                for (StoreBuilder storeBuilder : storeBuilders.values()) {
                    streamsBuilder.addStateStore(storeBuilder);
                    if (LOG.isInfoEnabled()) {
                        LOG.info((Object) ("state store " + storeBuilder.name() + " added to topology"));
                    }
                }
            }
        }
        catch (Exception ex) {
            // Registration stays best-effort, but the failure is no longer invisible.
            // NOTE(review): presumably this tolerates stores already added for an earlier
            // input of the same function - confirm before raising or lowering the level.
            LOG.warn((Object) ("Could not add all state stores to the topology while binding " + inboundName), ex);
        }

        String[] bindingTargets = StringUtils.commaDelimitedListToStringArray(this.bindingServiceProperties.getBindingDestination(inboundName));
        KStream<?, ?> stream = streamsBuilder.stream(Arrays.asList(bindingTargets), Consumed.with(keySerde, valueSerde).withOffsetResetPolicy(autoOffsetReset));
        final boolean nativeDecoding = this.bindingServiceProperties.getConsumerProperties(inboundName).isUseNativeDecoding();
        if (nativeDecoding) {
            LOG.info((Object) ("Native decoding is enabled for " + inboundName + ". Inbound deserialization done at the broker."));
        } else {
            LOG.info((Object) ("Native decoding is disabled for " + inboundName + ". Inbound message conversion done by Spring Cloud Stream."));
        }
        return stream.mapValues(value -> {
            Object returnValue;
            String contentType = bindingProperties.getContentType();
            if (value != null && !StringUtils.isEmpty((Object) contentType) && !nativeDecoding) {
                returnValue = MessageBuilder.withPayload((Object) value).setHeader("contentType", (Object) contentType).build();
            } else {
                returnValue = value;
            }
            return returnValue;
        });
    }

    /**
     * Forces native decoding on the binding's consumer properties when the
     * parameter type accepts a {@link KTable} or {@link GlobalKTable}, creating
     * the consumer properties first if the binding has none. Other parameter
     * types leave the binding untouched.
     */
    private void enableNativeDecodingForKTableAlways(Class<?> parameterType, BindingProperties bindingProperties) {
        if (!parameterType.isAssignableFrom(KTable.class) && !parameterType.isAssignableFrom(GlobalKTable.class)) {
            return;
        }
        if (bindingProperties.getConsumer() == null) {
            bindingProperties.setConsumer(new ConsumerProperties());
        }
        bindingProperties.getConsumer().setUseNativeDecoding(true);
    }

    /**
     * Creates a {@link StreamsBuilderFactoryBean} for the given function,
     * registers it as the bean {@code stream-builder-<functionName>}, and stores
     * the registered factory bean in {@code methodStreamsBuilderFactoryBeanMap}.
     *
     * <p>The configuration is the {@code streamConfigGlobalProperties} bean,
     * overlaid with the binding's own configuration, its application id (when it
     * has text) and, when the binding's concurrency is greater than one, the
     * number of stream threads. The {@code kafkaStreamsDlqDispatchers} bean is
     * added to the properties produced by the configuration.
     *
     * @param functionName name of the function the builder belongs to
     * @param applicationContext context used for the bean lookups
     * @param inboundName input binding whose consumer properties are applied
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void buildStreamsBuilderAndRetrieveConfig(String functionName, ApplicationContext applicationContext, String inboundName) {
        ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();

        // NOTE(review): the overlay below writes into the map returned for the
        // streamConfigGlobalProperties bean itself, not a copy - confirm that sharing
        // these entries across functions is intended.
        Map<String, Object> streamConfigGlobalProperties = applicationContext.getBean("streamConfigGlobalProperties", Map.class);
        KafkaStreamsConsumerProperties extendedConsumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName);
        streamConfigGlobalProperties.putAll(extendedConsumerProperties.getConfiguration());

        String applicationId = extendedConsumerProperties.getApplicationId();
        if (StringUtils.hasText(applicationId)) {
            streamConfigGlobalProperties.put("application.id", applicationId);
        }

        int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName).getConcurrency();
        if (concurrency > 1) {
            streamConfigGlobalProperties.put("num.stream.threads", concurrency);
        }

        final Map dlqDispatchers = applicationContext.getBean("kafkaStreamsDlqDispatchers", Map.class);
        KafkaStreamsConfiguration kafkaStreamsConfiguration = new KafkaStreamsConfiguration(streamConfigGlobalProperties) {

            @Override
            public Properties asProperties() {
                Properties withDispatchers = super.asProperties();
                withDispatchers.put("spring.cloud.stream.kafka.streams.dlq.dispatchers", dlqDispatchers);
                return withDispatchers;
            }
        };

        final StreamsBuilderFactoryBean factoryBean = this.cleanupConfig == null
                ? new StreamsBuilderFactoryBean(kafkaStreamsConfiguration)
                : new StreamsBuilderFactoryBean(kafkaStreamsConfiguration, this.cleanupConfig);
        factoryBean.setAutoStartup(false);

        // Register the pre-built instance through a supplier-backed definition, then read the
        // factory bean itself back using the '&' prefix.
        String beanName = "stream-builder-" + functionName;
        AbstractBeanDefinition factoryBeanDefinition = BeanDefinitionBuilder
                .genericBeanDefinition((Class<StreamsBuilderFactoryBean>) factoryBean.getClass(), () -> factoryBean)
                .getRawBeanDefinition();
        ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(beanName, (BeanDefinition) factoryBeanDefinition);

        StreamsBuilderFactoryBean registeredFactoryBean = applicationContext.getBean("&" + beanName, StreamsBuilderFactoryBean.class);
        this.methodStreamsBuilderFactoryBeanMap.put(functionName, registeredFactoryBean);
    }

    /**
     * Stores the application context this processor runs in.
     *
     * @param applicationContext the context; it is cast to
     *        {@link ConfigurableApplicationContext}, so any other kind of context
     *        fails with a {@link ClassCastException}
     * @throws BeansException declared by {@link ApplicationContextAware}
     */
    @Override
    public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }
}

