/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.function;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.function.FluxedConsumerWrapper;
import org.springframework.cloud.stream.function.FunctionCatalogWrapper;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FunctionInvoker<I, O>
implements Function<Flux<Message<I>>, Flux<Message<O>>> {
    private static final Log logger = LogFactory.getLog(FunctionInvoker.class);
    private static final Field MESSAGE_HEADERS_FIELD = ReflectionUtils.findField(MessageHeaders.class, (String)"headers");
    private final Class<?> inputClass;
    private final Class<?> outputClass;
    private final Function<Flux<?>, Flux<?>> userFunction;
    private final CompositeMessageConverter messageConverter;
    private final MessageChannel errorChannel;
    private final boolean isInputArgumentMessage;
    private final ConsumerProperties consumerProperties;
    private final ProducerProperties producerProperties;
    private final BindingServiceProperties bindingServiceProperties;
    private final StreamFunctionProperties functionProperties;

    FunctionInvoker(StreamFunctionProperties functionProperties, FunctionCatalogWrapper functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory) {
        this(functionProperties, functionCatalog, functionInspector, compositeMessageConverterFactory, null);
    }

    FunctionInvoker(StreamFunctionProperties functionProperties, FunctionCatalogWrapper functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory, MessageChannel errorChannel) {
        this.functionProperties = functionProperties;
        Object originalUserFunction = functionCatalog.lookup(functionProperties.getDefinition());
        this.userFunction = originalUserFunction instanceof Consumer ? new FluxedConsumerWrapper((Consumer)originalUserFunction) : (Function)originalUserFunction;
        Assert.isInstanceOf(Function.class, this.userFunction);
        this.messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        FunctionType functionType = functionInspector.getRegistration(originalUserFunction).getType();
        this.isInputArgumentMessage = functionType.isMessage();
        this.inputClass = functionType.getInputType();
        this.outputClass = functionType.getOutputType();
        this.errorChannel = errorChannel;
        this.bindingServiceProperties = functionProperties.getBindingServiceProperties();
        this.consumerProperties = this.bindingServiceProperties.getConsumerProperties(functionProperties.getInputDestinationName());
        this.producerProperties = this.bindingServiceProperties.getProducerProperties(functionProperties.getOutputDestinationName());
    }

    @Override
    public Flux<Message<O>> apply(Flux<Message<I>> input) {
        AtomicReference originalMessageRef = new AtomicReference();
        return input.concatMap(message -> Flux.just((Object)message).doOnNext(originalMessageRef::set).map(this::resolveArgument).transform(this.userFunction::apply).retryBackoff((long)this.consumerProperties.getMaxAttempts(), Duration.ofMillis(this.consumerProperties.getBackOffInitialInterval()), Duration.ofMillis(this.consumerProperties.getBackOffMaxInterval())).onErrorResume(e -> {
            this.onError((Throwable)e, (Message<I>)((Message)originalMessageRef.get()));
            return Mono.empty();
        })).log().map(resultMessage -> this.toMessage(resultMessage, (Message)originalMessageRef.get()));
    }

    private void onError(Throwable t, Message<I> originalMessage) {
        if (this.errorChannel != null) {
            ErrorMessage em = new ErrorMessage(t, originalMessage);
            logger.error((Object)em);
            this.errorChannel.send((Message)em);
        } else {
            logger.error((Object)t);
        }
    }

    private <T> Message<O> toMessage(T value, Message<I> originalMessage) {
        Message<O> returnMessage;
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Converting result back to message using the original message: " + originalMessage));
        }
        if (this.producerProperties.isUseNativeEncoding()) {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Native encoding enabled wrapping result to message using the original message: " + originalMessage));
            }
            returnMessage = this.wrapOutputToMessage(value, originalMessage);
        } else {
            returnMessage = (Message<O>)(value instanceof Message ? value : this.messageConverter.toMessage(value, originalMessage.getHeaders(), this.outputClass));
            if (returnMessage == null && value.getClass().isAssignableFrom(this.outputClass)) {
                returnMessage = this.wrapOutputToMessage(value, originalMessage);
            } else if (this.bindingServiceProperties != null && this.bindingServiceProperties.getBindingProperties(this.functionProperties.getOutputDestinationName()) != null && !returnMessage.getHeaders().containsKey((Object)"contentType")) {
                ((Map)ReflectionUtils.getField((Field)MESSAGE_HEADERS_FIELD, (Object)returnMessage.getHeaders())).put("contentType", MimeType.valueOf((String)this.bindingServiceProperties.getBindingProperties("output").getContentType()));
            }
            Assert.notNull(returnMessage, (String)("Failed to convert result value '" + value + "' to message."));
        }
        return returnMessage;
    }

    private <T> Message<O> wrapOutputToMessage(T value, Message<I> originalMessage) {
        Message returnMessage = MessageBuilder.withPayload(value).copyHeaders((Map)originalMessage.getHeaders()).removeHeader("contentType").build();
        return returnMessage;
    }

    private <T> T resolveArgument(Message<I> message) {
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Resolving input argument from message: " + message));
        }
        Object argument = this.shouldConvertFromMessage(message) ? this.messageConverter.fromMessage(message, this.inputClass) : message;
        Assert.notNull(argument, (String)("Failed to resolve argument type '" + this.inputClass + "' from message: " + message));
        if (!this.isInputArgumentMessage && argument instanceof Message) {
            argument = argument.getPayload();
        }
        return (T)argument;
    }

    private boolean shouldConvertFromMessage(Message<?> message) {
        return !this.inputClass.isAssignableFrom(Message.class) && !this.inputClass.isAssignableFrom(message.getPayload().getClass()) && !this.inputClass.isAssignableFrom(Object.class);
    }

    static {
        MESSAGE_HEADERS_FIELD.setAccessible(true);
    }
}

