package org.springframework.cloud.stream.function;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@EnableConfigurationProperties({StreamFunctionProperties.class})
@AutoConfigureBefore({BindingServiceConfiguration.class})
@Configuration
@Import({BinderFactoryAutoConfiguration.class})
/* loaded from: input_file:org/springframework/cloud/stream/function/FunctionConfiguration.class */
public class FunctionConfiguration {

    /* loaded from: input_file:org/springframework/cloud/stream/function/FunctionConfiguration$FunctionChannelBindingInitializer.class */
    private static class FunctionChannelBindingInitializer implements InitializingBean, ApplicationContextAware {
        private static Log logger = LogFactory.getLog(FunctionChannelBindingInitializer.class);
        private final FunctionCatalog functionCatalog;
        private final FunctionInspector functionInspector;
        private final StreamFunctionProperties functionProperties;
        private final BindableProxyFactory bindableProxyFactory;
        private GenericApplicationContext context;

        FunctionChannelBindingInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties streamFunctionProperties, BindableProxyFactory bindableProxyFactory) {
            this.functionCatalog = functionCatalog;
            this.functionInspector = functionInspector;
            this.functionProperties = streamFunctionProperties;
            this.bindableProxyFactory = bindableProxyFactory;
        }

        public void afterPropertiesSet() throws Exception {
            MessageChannel messageChannel = null;
            String str = Sink.INPUT;
            if (this.context.containsBean(str) && (this.context.getBean(str) instanceof MessageChannel)) {
                messageChannel = (MessageChannel) this.context.getBean(str, MessageChannel.class);
            }
            if (messageChannel == null && this.context.containsBean(Source.OUTPUT)) {
                str = Source.OUTPUT;
                if (this.context.getBean(str) instanceof MessageChannel) {
                    messageChannel = (MessageChannel) this.context.getBean(str, SubscribableChannel.class);
                }
            }
            if (messageChannel == null || this.functionCatalog.lookup(this.functionProperties.getDefinition()) == null) {
                return;
            }
            doPostProcess(str, (SubscribableChannel) messageChannel);
        }

        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.context = (GenericApplicationContext) applicationContext;
        }

        private void doPostProcess(String str, SubscribableChannel subscribableChannel) {
            if (this.functionProperties.isComposeTo() && (subscribableChannel instanceof SubscribableChannel) && Sink.INPUT.equals(str)) {
                throw new UnsupportedOperationException("Composing at tail is not currently supported");
            }
            if (!this.functionProperties.isComposeFrom() || !Source.OUTPUT.equals(str)) {
                BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(this.functionProperties.getDefinition(), new String[]{"application/json"});
                if (functionInvocationWrapper.isSupplier()) {
                    return;
                }
                if (functionInvocationWrapper.isConsumer()) {
                    throw new UnsupportedOperationException("Consumers are not currently supported");
                }
                if (functionInvocationWrapper.isFunction() && Sink.INPUT.equals(str)) {
                    postProcessForStandAloneFunction(functionInvocationWrapper, subscribableChannel);
                    return;
                }
                return;
            }
            Assert.notNull(this.bindableProxyFactory, "Can not compose function into the existing app since `bindableProxyFactory` is null.");
            logger.info("Composing at the head of 'output' channel");
            ServiceActivatingHandler serviceActivatingHandler = new ServiceActivatingHandler(new FunctionWrapper((BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(this.functionProperties.getDefinition(), new String[]{"application/json"})));
            serviceActivatingHandler.setBeanFactory(this.context);
            serviceActivatingHandler.afterPropertiesSet();
            MessageChannel directWithAttributesChannel = new DirectWithAttributesChannel();
            directWithAttributesChannel.setAttribute("type", Source.OUTPUT);
            directWithAttributesChannel.setComponentName("output.extended");
            this.context.registerBean("output.extended", MessageChannel.class, () -> {
                return directWithAttributesChannel;
            }, new BeanDefinitionCustomizer[0]);
            this.bindableProxyFactory.replaceOutputChannel(str, "output.extended", directWithAttributesChannel);
            serviceActivatingHandler.setOutputChannelName("output.extended");
            subscribableChannel.subscribe(serviceActivatingHandler);
        }

        private void postProcessForStandAloneFunction(BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper, MessageChannel messageChannel) {
            if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(FunctionTypeUtils.getFunctionType(functionInvocationWrapper, this.functionInspector), 0))) {
                MessageChannel messageChannel2 = (MessageChannel) this.context.getBean(Source.OUTPUT, MessageChannel.class);
                Publisher enhancePublisher = enhancePublisher(MessageChannelReactiveUtils.toPublisher((SubscribableChannel) messageChannel));
                messageChannel2.getClass();
                subscribeToInput(functionInvocationWrapper, enhancePublisher, messageChannel2::send);
                return;
            }
            ServiceActivatingHandler serviceActivatingHandler = new ServiceActivatingHandler(new FunctionWrapper(functionInvocationWrapper));
            serviceActivatingHandler.setBeanFactory(this.context);
            serviceActivatingHandler.afterPropertiesSet();
            serviceActivatingHandler.setOutputChannelName(Source.OUTPUT);
            ((SubscribableChannel) messageChannel).subscribe(serviceActivatingHandler);
        }

        private Publisher enhancePublisher(Publisher publisher) {
            return Flux.from(publisher).concatMap(obj -> {
                return Flux.just(obj).doOnError(th -> {
                    th.printStackTrace();
                }).retryBackoff(3L, Duration.ofMillis(1000L), Duration.ofMillis(1000L)).onErrorResume(th2 -> {
                    th2.printStackTrace();
                    return Mono.empty();
                });
            });
        }

        private <I, O> void subscribeToInput(Function function, Publisher<?> publisher, Consumer<Message<O>> consumer) {
            subscribeToOutput(consumer, (Publisher) function.apply(Flux.from(publisher))).subscribe();
        }

        private <O> Mono<Void> subscribeToOutput(Consumer<Message<O>> consumer, Publisher<Message<O>> publisher) {
            return (consumer == null ? Flux.from(publisher) : Flux.from(publisher).doOnNext(consumer)).then();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/function/FunctionConfiguration$FunctionWrapper.class */
    public static class FunctionWrapper implements Function<Message<byte[]>, Message<byte[]>> {
        private final Function function;

        FunctionWrapper(Function function) {
            this.function = function;
        }

        @Override // java.util.function.Function
        public Message<byte[]> apply(Message<byte[]> message) {
            return (Message) this.function.apply(message);
        }
    }

    @Bean
    public InitializingBean functionChannelBindingInitializer(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties streamFunctionProperties, @Nullable BindableProxyFactory[] bindableProxyFactoryArr) {
        return new FunctionChannelBindingInitializer(functionCatalog, functionInspector, streamFunctionProperties, ObjectUtils.isEmpty(bindableProxyFactoryArr) ? null : bindableProxyFactoryArr[0]);
    }

    @Bean
    public IntegrationFlow standAloneSupplierFlow(FunctionCatalog functionCatalog, FunctionInspector functionInspector, StreamFunctionProperties streamFunctionProperties, GenericApplicationContext genericApplicationContext) {
        StandardIntegrationFlow standardIntegrationFlow = null;
        BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper = (BeanFactoryAwareFunctionRegistry.FunctionInvocationWrapper) functionCatalog.lookup(streamFunctionProperties.getDefinition());
        if (functionInvocationWrapper != null) {
            AtomicReference atomicReference = new AtomicReference();
            Mono create = Mono.create(monoSink -> {
                atomicReference.set(monoSink);
            });
            genericApplicationContext.addApplicationListener(applicationEvent -> {
                if (!(applicationEvent instanceof BindingCreatedEvent) || atomicReference.get() == null) {
                    return;
                }
                ((MonoSink) atomicReference.get()).success();
            });
            if (!streamFunctionProperties.isComposeFrom() && !streamFunctionProperties.isComposeTo() && functionInvocationWrapper.isSupplier()) {
                standardIntegrationFlow = integrationFlowFromProvidedSupplier(functionInvocationWrapper, functionInspector, create).channel(Source.OUTPUT).get();
            }
        }
        return standardIntegrationFlow;
    }

    private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier, FunctionInspector functionInspector, Publisher<Object> publisher) {
        IntegrationFlowBuilder from;
        if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(FunctionTypeUtils.getFunctionType(supplier, functionInspector), 0))) {
            Mono mono = (Publisher) supplier.get();
            from = IntegrationFlows.from(mono instanceof Mono ? mono.delaySubscription(publisher).map(this::wrapToMessageIfNecessary) : ((Flux) mono).delaySubscription(publisher).map(this::wrapToMessageIfNecessary));
        } else {
            from = IntegrationFlows.from(supplier);
        }
        return from;
    }

    private <T> Message<T> wrapToMessageIfNecessary(T t) {
        return t instanceof Message ? (Message) t : MessageBuilder.withPayload(t).setHeader("contentType", MimeTypeUtils.APPLICATION_JSON).build();
    }
}
