/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import java.lang.reflect.Type;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@Configuration
@EnableConfigurationProperties(value={StreamFunctionProperties.class})
@Import(value={BinderFactoryAutoConfiguration.class})
@AutoConfigureBefore(value={BindingServiceConfiguration.class})
public class FunctionConfiguration {
    /**
     * Registers the {@link InitializingBean} that wires the configured function
     * to the {@code input}/{@code output} channels once its properties are set.
     *
     * @param functionCatalog catalog used to look up the configured function
     * @param functionInspector inspector handed to the initializer for type discovery
     * @param functionProperties properties carrying the function definition and composition flags
     * @param bindableProxyFactory optional proxy factories; only the first element is used,
     *        and {@code null} is passed on when the array is {@code null} or empty
     * @return a new {@code FunctionChannelBindingInitializer}
     */
    @Bean
    public InitializingBean functionChannelBindingInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties functionProperties, @Nullable BindableProxyFactory[] bindableProxyFactory) {
        BindableProxyFactory firstProxyFactory = null;
        if (!ObjectUtils.isEmpty((Object[]) bindableProxyFactory)) {
            firstProxyFactory = bindableProxyFactory[0];
        }
        return new FunctionChannelBindingInitializer(functionCatalog, functionInspector, functionProperties, firstProxyFactory);
    }

    /**
     * Builds the integration flow that publishes the values of a stand-alone
     * {@link Supplier} to the {@code "output"} channel.
     *
     * <p>A {@link Mono} is created whose sink is captured in an
     * {@link AtomicReference}; an application listener completes that sink when a
     * {@link BindingCreatedEvent} is observed. The mono is handed to
     * {@code integrationFlowFromProvidedSupplier} as the "begin publishing" trigger.
     *
     * @param functionCatalog catalog used to look up the configured function
     * @param functionInspector inspector used to determine the function type
     * @param functionProperties properties carrying the function definition and composition flags
     * @param context application context on which the listener is registered
     * @return the flow, or {@code null} when no function is found for the definition,
     *         when composition ({@code composeFrom}/{@code composeTo}) is requested,
     *         or when the function is not a supplier
     */
    @Bean
    public IntegrationFlow standAloneSupplierFlow(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties functionProperties, GenericApplicationContext context) {
        BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper functionWrapper = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper) functionCatalog.lookup(functionProperties.getDefinition());
        if (functionWrapper == null) {
            return null;
        }

        // The sink only becomes available once the trigger mono is subscribed to.
        AtomicReference<MonoSink<Object>> triggerRef = new AtomicReference<>();
        Mono<Object> beginPublishingTrigger = Mono.create(triggerRef::set);
        context.addApplicationListener(event -> {
            if (event instanceof BindingCreatedEvent) {
                // Read the reference once so the null check and the call see the same sink.
                MonoSink<Object> sink = triggerRef.get();
                if (sink != null) {
                    sink.success();
                }
            }
        });

        if (functionProperties.isComposeFrom() || functionProperties.isComposeTo() || !functionWrapper.isSupplier()) {
            return null;
        }
        return ((IntegrationFlowBuilder) this.integrationFlowFromProvidedSupplier((Supplier<?>) functionWrapper, functionInspector, beginPublishingTrigger).channel("output")).get();
    }

    /**
     * Creates the flow builder for a supplier.
     *
     * <p>When the function type is reported as reactive, the supplier is invoked
     * once and the {@link Publisher} it returns becomes the flow source. Its
     * subscription is delayed until {@code beginPublishingTrigger} signals, and
     * every element is wrapped into a {@link Message} if it is not one already.
     * Otherwise the supplier itself is used as the flow source.
     *
     * @param supplier the supplier to publish from
     * @param inspector inspector used to resolve the function type
     * @param beginPublishingTrigger publisher whose signal releases the delayed subscription
     * @return the flow builder rooted at the supplier or at its publisher
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier, FunctionInspector inspector, Publisher<Object> beginPublishingTrigger) {
        Type functionType = FunctionTypeUtils.getFunctionType(supplier, (FunctionInspector) inspector);
        // NOTE(review): reactivity is decided from the *input* type at index 0 even though
        // the target is a Supplier; confirm against FunctionTypeUtils that this is intended.
        if (!FunctionTypeUtils.isReactive((Type) FunctionTypeUtils.getInputType((Type) functionType, (int) 0))) {
            return IntegrationFlows.from(supplier);
        }

        Publisher publisher = (Publisher) supplier.get();
        if (publisher instanceof Mono) {
            publisher = ((Mono) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary);
        } else {
            // Flux.from returns a Flux unchanged and adapts any other Publisher, so a
            // supplier returning a non-Reactor Publisher no longer fails on a blind cast.
            publisher = Flux.from(publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary);
        }
        return IntegrationFlows.from((Publisher) publisher);
    }

    /**
     * Returns {@code value} unchanged when it already is a {@link Message};
     * otherwise wraps it as the payload of a new message whose
     * {@code contentType} header is {@code application/json}.
     *
     * @param value the element to pass through or wrap
     * @param <T> payload type
     * @return the original message or a newly built one
     */
    @SuppressWarnings("unchecked")
    private <T> Message<T> wrapToMessageIfNecessary(T value) {
        if (value instanceof Message) {
            return (Message<T>) value;
        }
        return MessageBuilder.withPayload(value)
                .setHeader("contentType", (Object) MimeTypeUtils.APPLICATION_JSON)
                .build();
    }

    /**
     * Adapts an untyped {@link Function} to the
     * {@code Function<Message<byte[]>, Message<byte[]>>} shape used when the
     * function is wrapped in a service activating handler.
     */
    @SuppressWarnings("rawtypes")
    private static class FunctionWrapper
    implements Function<Message<byte[]>, Message<byte[]>> {
        /** The wrapped function; invoked with the incoming message as-is. */
        private final Function function;

        FunctionWrapper(Function function) {
            this.function = function;
        }

        /**
         * Delegates to the wrapped function and casts its result to a message.
         *
         * @param t the incoming message
         * @return the delegate's result cast to {@code Message<byte[]>}
         */
        @Override
        @SuppressWarnings("unchecked")
        public Message<byte[]> apply(Message<byte[]> t) {
            return (Message<byte[]>) this.function.apply(t);
        }
    }

    private static class FunctionChannelBindingInitializer
    implements InitializingBean,
    ApplicationContextAware {
        /** Shared logger; final so it cannot be reassigned after class initialization. */
        private static final Log logger = LogFactory.getLog(FunctionChannelBindingInitializer.class);
        /** Catalog used to look up the function named by the configured definition. */
        private final FunctionCatalog functionCatalog;
        /** Inspector used to resolve the function's type. */
        private final FunctionInspector functionInspector;
        /** Properties holding the function definition and the composition flags. */
        private final StreamFunctionProperties functionProperties;
        /** Proxy factory used to replace the output channel; may be {@code null}. */
        private final BindableProxyFactory bindableProxyFactory;
        /** Application context, assigned in {@code setApplicationContext}. */
        private GenericApplicationContext context;

        /**
         * Stores the collaborators; no other work is done at construction time.
         *
         * @param functionCatalog catalog used to look up the configured function
         * @param functionInspector inspector used to resolve the function's type
         * @param functionProperties properties with the function definition and composition flags
         * @param bindableProxyFactory proxy factory for output channel replacement, or {@code null}
         */
        FunctionChannelBindingInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties functionProperties, BindableProxyFactory bindableProxyFactory) {
            this.functionCatalog = functionCatalog;
            this.functionInspector = functionInspector;
            this.functionProperties = functionProperties;
            this.bindableProxyFactory = bindableProxyFactory;
        }

        /**
         * Locates the channel to attach the function to and triggers post-processing.
         *
         * <p>The {@code "input"} bean is preferred when it exists and is a
         * {@link MessageChannel}; otherwise the {@code "output"} bean is tried,
         * fetched as a {@link SubscribableChannel}. Post-processing only runs when
         * a channel was found and the catalog resolves the configured definition.
         *
         * @throws Exception declared by {@link InitializingBean#afterPropertiesSet()}
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            String channelName = "input";
            MessageChannel messageChannel = null;

            if (this.context.containsBean("input")) {
                Object inputCandidate = this.context.getBean("input");
                if (inputCandidate instanceof MessageChannel) {
                    messageChannel = this.context.getBean("input", MessageChannel.class);
                }
            }

            if (messageChannel == null && this.context.containsBean("output")) {
                channelName = "output";
                Object outputCandidate = this.context.getBean("output");
                if (outputCandidate instanceof MessageChannel) {
                    messageChannel = this.context.getBean("output", SubscribableChannel.class);
                }
            }

            if (messageChannel == null) {
                return;
            }
            if (this.functionCatalog.lookup(this.functionProperties.getDefinition()) == null) {
                return;
            }
            this.doPostProcess(channelName, (SubscribableChannel) messageChannel);
        }

        /**
         * Stores the application context for later bean lookups and registration.
         *
         * @param applicationContext the context; it is cast to
         *        {@link GenericApplicationContext}, so any other implementation
         *        causes a {@link ClassCastException}
         * @throws BeansException declared by {@link ApplicationContextAware}
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            GenericApplicationContext genericContext = (GenericApplicationContext) applicationContext;
            this.context = genericContext;
        }

        /**
         * Attaches the configured function to the given channel.
         *
         * <ul>
         * <li>{@code composeTo} on the {@code "input"} channel is rejected.</li>
         * <li>{@code composeFrom} on the {@code "output"} channel composes the
         * function at the head of the output channel.</li>
         * <li>Otherwise suppliers are ignored, consumers are rejected, and a plain
         * function is attached when the channel is {@code "input"}.</li>
         * </ul>
         *
         * @param channelName name of the channel bean that was found
         * @param messageChannel the channel to attach to
         * @throws UnsupportedOperationException for tail composition or consumers
         */
        private void doPostProcess(String channelName, SubscribableChannel messageChannel) {
            boolean onInput = "input".equals(channelName);
            boolean onOutput = "output".equals(channelName);

            if (this.functionProperties.isComposeTo() && messageChannel != null && onInput) {
                throw new UnsupportedOperationException("Composing at tail is not currently supported");
            }
            if (this.functionProperties.isComposeFrom() && onOutput) {
                this.composeAtHeadOfOutput(channelName, messageChannel);
                return;
            }

            BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(this.functionProperties.getDefinition(), new String[]{"application/json"});
            if (function.isSupplier()) {
                return;
            }
            if (function.isConsumer()) {
                throw new UnsupportedOperationException("Consumers are not currently supported");
            }
            if (function.isFunction() && onInput) {
                this.postProcessForStandAloneFunction(function, (MessageChannel) messageChannel);
            }
        }

        /**
         * Subscribes a handler wrapping the function to the existing output
         * channel and routes its results to a newly registered
         * {@code "output.extended"} channel that replaces the original output.
         */
        private void composeAtHeadOfOutput(String channelName, SubscribableChannel messageChannel) {
            final String extendedChannelName = "output.extended";

            Assert.notNull((Object) this.bindableProxyFactory, (String) "Can not compose function into the existing app since `bindableProxyFactory` is null.");
            logger.info((Object) "Composing at the head of 'output' channel");

            BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(this.functionProperties.getDefinition(), new String[]{"application/json"});
            ServiceActivatingHandler handler = new ServiceActivatingHandler((Object) new FunctionWrapper((Function) function));
            handler.setBeanFactory((BeanFactory) this.context);
            handler.afterPropertiesSet();

            DirectWithAttributesChannel extendedChannel = new DirectWithAttributesChannel();
            extendedChannel.setAttribute("type", "output");
            extendedChannel.setComponentName(extendedChannelName);
            this.context.registerBean(extendedChannelName, MessageChannel.class, () -> extendedChannel, new BeanDefinitionCustomizer[0]);
            this.bindableProxyFactory.replaceOutputChannel(channelName, extendedChannelName, (MessageChannel) extendedChannel);

            // The output channel name is set after handler initialization, as in the original flow.
            handler.setOutputChannelName(extendedChannelName);
            messageChannel.subscribe((MessageHandler) handler);
        }

        /**
         * Connects a stand-alone function between the input channel and the
         * {@code "output"} channel.
         *
         * <p>For a function reported as reactive, the input channel is turned into
         * a publisher, passed through {@code enhancePublisher}, fed to the function,
         * and each result is sent to the {@code "output"} channel bean. Otherwise
         * the function is wrapped in a {@link ServiceActivatingHandler} subscribed
         * to the input channel with {@code "output"} as its output channel name.
         *
         * @param function the function to attach
         * @param inputChannel the input channel; it is cast to {@link SubscribableChannel}
         */
        private void postProcessForStandAloneFunction(BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper function, MessageChannel inputChannel) {
            Type functionType = FunctionTypeUtils.getFunctionType((Object) function, (FunctionInspector) this.functionInspector);
            boolean reactive = FunctionTypeUtils.isReactive((Type) FunctionTypeUtils.getInputType((Type) functionType, (int) 0));

            if (!reactive) {
                ServiceActivatingHandler handler = new ServiceActivatingHandler((Object) new FunctionWrapper((Function) function));
                handler.setBeanFactory((BeanFactory) this.context);
                handler.afterPropertiesSet();
                handler.setOutputChannelName("output");
                SubscribableChannel imperativeInput = (SubscribableChannel) inputChannel;
                imperativeInput.subscribe((MessageHandler) handler);
                return;
            }

            MessageChannel outputChannel = this.context.getBean("output", MessageChannel.class);
            SubscribableChannel reactiveInput = (SubscribableChannel) inputChannel;
            Publisher inputPublisher = this.enhancePublisher(MessageChannelReactiveUtils.toPublisher((MessageChannel) reactiveInput));
            this.subscribeToInput((Function) function, inputPublisher, result -> outputChannel.send(result));
        }

        /**
         * Wraps each element of {@code publisher} in its own single-element inner
         * flux, so that an error signalled for one element is handled locally.
         *
         * <p>An error on the inner flux is logged, retried through
         * {@code retryBackoff(3, 1s, 1s)}, and if it still fails the element is
         * dropped by resuming with an empty publisher.
         *
         * @param publisher the source publisher
         * @return a flux that concatenates the per-element inner publishers
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private Publisher enhancePublisher(Publisher publisher) {
            Flux flux = Flux.from((Publisher) publisher).concatMap(message -> Flux.just((Object) message)
                    // Report through the class logger rather than printing to stderr.
                    .doOnError(e -> logger.error((Object) "Error signalled while relaying input message", e))
                    .retryBackoff(3L, Duration.ofMillis(1000L), Duration.ofMillis(1000L))
                    .onErrorResume(e -> {
                        logger.error((Object) "Dropping input message after error", e);
                        return Mono.empty();
                    }));
            return flux;
        }

        /**
         * Applies {@code function} to a flux built from {@code publisher} and
         * subscribes to the result, passing each output element to
         * {@code outputProcessor}.
         *
         * @param function the function applied to the input flux; its result is cast to a {@link Publisher}
         * @param publisher source of input elements
         * @param outputProcessor callback for each output message; may be {@code null}
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private <I, O> void subscribeToInput(Function function, Publisher<?> publisher, Consumer<Message<O>> outputProcessor) {
            Object functionOutput = function.apply(Flux.from(publisher));
            this.subscribeToOutput(outputProcessor, (Publisher) functionOutput).subscribe();
        }

        /**
         * Builds a completion signal for the output publisher, optionally
         * invoking {@code outputProcessor} on every element.
         *
         * @param outputProcessor callback run for each output message, or {@code null} for none
         * @param outputPublisher the function's output
         * @param <O> payload type of the output messages
         * @return a mono obtained from {@code then()} on the output flux
         */
        private <O> Mono<Void> subscribeToOutput(Consumer<Message<O>> outputProcessor, Publisher<Message<O>> outputPublisher) {
            Flux<Message<O>> output = Flux.from(outputPublisher);
            if (outputProcessor != null) {
                output = output.doOnNext(outputProcessor);
            }
            return output.then();
        }
    }
}

