/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.function.web;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FluxWrapper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.StringConverter;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebInputException;
import org.springframework.web.server.UnsupportedMediaTypeStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RequestProcessor {
    private static Log logger = LogFactory.getLog(RequestProcessor.class);
    private final FunctionInspector inspector;
    private final StringConverter converter;
    private final JsonMapper mapper;
    private final List<HttpMessageReader<?>> messageReaders;

    public RequestProcessor(FunctionInspector inspector, ObjectProvider<JsonMapper> mapper, StringConverter converter, ObjectProvider<ServerCodecConfigurer> codecs) {
        this.mapper = (JsonMapper)mapper.getIfAvailable();
        this.inspector = inspector;
        this.converter = converter;
        ServerCodecConfigurer source = (ServerCodecConfigurer)codecs.getIfAvailable();
        this.messageReaders = source == null ? null : source.getReaders();
    }

    public static FunctionWrapper wrapper(Function<? extends Publisher<?>, ? extends Publisher<?>> function, Consumer<? extends Publisher<?>> consumer, Supplier<? extends Publisher<?>> supplier) {
        return new FunctionWrapper(function, consumer, supplier);
    }

    public static FunctionWrapper wrapper(Function<? extends Publisher<?>, ? extends Publisher<?>> function) {
        return new FunctionWrapper(function, null, null);
    }

    public Mono<ResponseEntity<?>> get(FunctionWrapper wrapper) {
        if (wrapper.function() != null) {
            return this.response(wrapper, wrapper.function(), this.value(wrapper.function(), wrapper.argument()), true, true);
        }
        return this.response(wrapper, wrapper.supplier(), wrapper.supplier().get(), null, true);
    }

    public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, ServerWebExchange exchange) {
        return Mono.from(this.body(wrapper.handler(), exchange)).flatMap(body -> this.response(wrapper, body, false));
    }

    public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, String body, boolean stream) {
        Object function = wrapper.handler();
        Class<Collection> inputType = this.inspector.getInputType(function);
        Type itemType = this.getItemType(function);
        Object input = body;
        if (StringUtils.hasText((String)body)) {
            if (this.shouldUseJsonConversion(body, wrapper.headers.getContentType())) {
                Object jsonType;
                Object object = jsonType = body.startsWith("[") && Collection.class.isAssignableFrom(inputType) || body.startsWith("{") ? inputType : Collection.class;
                if (body.startsWith("[")) {
                    jsonType = ResolvableType.forClassWithGenerics(jsonType, (Class[])new Class[]{(Class)itemType}).getType();
                }
                input = this.mapper.toObject(body, jsonType);
            } else {
                input = this.converter.convert(function, body);
            }
        }
        return this.response(wrapper, input, stream);
    }

    private boolean shouldUseJsonConversion(String body, MediaType contentType) {
        return (body.startsWith("[") || body.startsWith("{")) && (contentType == null || contentType != null && !"text".equalsIgnoreCase(contentType.getType()));
    }

    public Mono<ResponseEntity<?>> stream(FunctionWrapper request) {
        Publisher<?> result = request.function() != null ? this.value(request.function(), request.argument()) : request.supplier().get();
        return this.stream(request, result);
    }

    private List<HttpMessageReader<?>> getMessageReaders() {
        return this.messageReaders;
    }

    private Mono<ResponseEntity<?>> response(FunctionWrapper wrapper, Object body, boolean stream) {
        Flux<?> flux;
        Function<Publisher<?>, Publisher<?>> function = wrapper.function();
        Consumer<Publisher<?>> consumer = wrapper.consumer();
        if (body != null) {
            if (Collection.class.isAssignableFrom(this.inspector.getInputType(wrapper.handler()))) {
                flux = Flux.just((Object)body);
            } else {
                Set<Object> iterable = body instanceof Collection ? (Set<Object>)body : (body instanceof Set ? Collections.singleton(body) : Collections.singletonList(body));
                flux = Flux.fromIterable(iterable);
            }
        } else if (MultiValueMap.class.isAssignableFrom(this.inspector.getInputType(wrapper.handler()))) {
            flux = Flux.just(wrapper.params());
        } else {
            throw new IllegalStateException("Failed to determine input for function call with parameters: '" + wrapper.params + "' and headers: `" + wrapper.headers + "`");
        }
        if (this.inspector.isMessage(function)) {
            flux = this.messages(wrapper, function == null ? consumer : function, flux);
        }
        Mono<ResponseEntity<?>> responseEntityMono = null;
        if (function != null) {
            Flux result = Flux.from(function.apply((Publisher<?>)flux));
            logger.debug((Object)"Handled POST with function");
            responseEntityMono = stream ? this.stream(wrapper, (Publisher<?>)result) : this.response(wrapper, function, (Publisher<?>)result, body == null ? null : Boolean.valueOf(!(body instanceof Collection)), false);
        } else if (consumer != null) {
            consumer.accept((Publisher<?>)flux);
            logger.debug((Object)"Handled POST with consumer");
            responseEntityMono = Mono.just((Object)ResponseEntity.status((HttpStatus)HttpStatus.ACCEPTED).build());
        }
        return responseEntityMono;
    }

    private Flux<?> messages(FunctionWrapper request, Object function, Flux<?> flux) {
        MessageHeaders headers = HeaderUtils.fromHttp(request.headers());
        return flux.map(arg_0 -> RequestProcessor.lambda$messages$1(function, (Map)headers, arg_0));
    }

    private void addHeaders(ResponseEntity.BodyBuilder builder, Message<?> message) {
        builder.headers(HeaderUtils.fromMessage(message.getHeaders()));
    }

    private Mono<ResponseEntity<?>> stream(FunctionWrapper request, Publisher<?> result) {
        ResponseEntity.BodyBuilder builder = ResponseEntity.ok();
        if (this.inspector.isMessage(request.handler())) {
            result = Flux.from(result).doOnNext(value -> this.addHeaders(builder, (Message)value)).map(message -> MessageUtils.unpack((Object)request.handler(), (Object)message).getPayload());
        } else {
            builder.headers(HeaderUtils.sanitize(request.headers()));
        }
        Flux output = result;
        return Flux.from((Publisher)output).then(Mono.fromSupplier(() -> RequestProcessor.lambda$stream$4(builder, (Publisher)output)));
    }

    private Mono<ResponseEntity<?>> response(FunctionWrapper request, Object handler, Publisher<?> result, Boolean single, boolean getter) {
        ResponseEntity.BodyBuilder builder = ResponseEntity.ok();
        if (this.inspector.isMessage(handler)) {
            result = Flux.from(result).map(message -> MessageUtils.unpack((Object)handler, (Object)message)).doOnNext(value -> this.addHeaders(builder, (Message<?>)value)).map(message -> message.getPayload());
        } else {
            builder.headers(HeaderUtils.sanitize(request.headers()));
        }
        if (this.isOutputSingle(handler) && (single != null && single.booleanValue() || getter || this.isInputMultiple(handler))) {
            result = Mono.from((Publisher)result);
        }
        if (result instanceof Flux) {
            result = Flux.from((Publisher)result).collectList();
        }
        return Mono.from((Publisher)result).flatMap(body -> Mono.just((Object)builder.body(body)));
    }

    private boolean isInputMultiple(Object handler) {
        if (handler instanceof FluxWrapper) {
            handler = ((FluxWrapper)handler).getTarget();
        }
        Class type = this.inspector.getInputType(handler);
        Class wrapper = this.inspector.getInputWrapper(handler);
        return Collection.class.isAssignableFrom(type) || Flux.class.equals((Object)wrapper);
    }

    private boolean isOutputSingle(Object handler) {
        if (handler instanceof FluxWrapper) {
            handler = ((FluxWrapper)handler).getTarget();
        }
        Class type = this.inspector.getOutputType(handler);
        Class wrapper = this.inspector.getOutputWrapper(handler);
        if (Stream.class.isAssignableFrom(type)) {
            return false;
        }
        return wrapper == type || Mono.class.equals((Object)wrapper) || Optional.class.equals((Object)wrapper);
    }

    private Publisher<?> body(Object handler, ServerWebExchange exchange) {
        MediaType mediaType;
        ResolvableType elementType;
        ResolvableType actualType = elementType = ResolvableType.forClass((Class)this.inspector.getInputType(handler));
        Class resolvedType = elementType.resolve();
        ReactiveAdapter adapter = resolvedType != null ? this.getAdapterRegistry().getAdapter(resolvedType) : null;
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        MediaType contentType = request.getHeaders().getContentType();
        MediaType mediaType2 = mediaType = contentType != null ? contentType : MediaType.APPLICATION_OCTET_STREAM;
        if (logger.isDebugEnabled()) {
            logger.debug((Object)(exchange.getLogPrefix() + (contentType != null ? "Content-Type:" + contentType : "No Content-Type, using " + MediaType.APPLICATION_OCTET_STREAM)));
        }
        boolean isBodyRequired = adapter != null && !adapter.supportsEmpty();
        MethodParameter bodyParam = new MethodParameter(this.handlerMethod(handler), 0);
        for (HttpMessageReader<?> reader : this.getMessageReaders()) {
            if (!reader.canRead(elementType, mediaType)) continue;
            Map readHints = Hints.from((String)Hints.LOG_PREFIX_HINT, (Object)exchange.getLogPrefix());
            if (adapter != null && adapter.isMultiValue()) {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)(exchange.getLogPrefix() + "0..N [" + elementType + "]"));
                }
                Flux flux = reader.read(actualType, elementType, request, response, readHints);
                flux = flux.onErrorResume(ex -> Flux.error((Throwable)this.handleReadError(bodyParam, (Throwable)ex)));
                if (isBodyRequired) {
                    flux = flux.switchIfEmpty((Publisher)Flux.error(() -> this.handleMissingBody(bodyParam)));
                }
                return Mono.just((Object)adapter.fromPublisher((Publisher)flux));
            }
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(exchange.getLogPrefix() + "0..1 [" + elementType + "]"));
            }
            Mono mono = reader.readMono(actualType, elementType, request, response, readHints);
            mono = mono.onErrorResume(ex -> Mono.error((Throwable)this.handleReadError(bodyParam, (Throwable)ex)));
            if (isBodyRequired) {
                mono = mono.switchIfEmpty(Mono.error(() -> this.handleMissingBody(bodyParam)));
            }
            return adapter != null ? Mono.just((Object)adapter.fromPublisher((Publisher)mono)) : Mono.from((Publisher)mono);
        }
        return Mono.error((Throwable)new UnsupportedMediaTypeStatusException(mediaType, Arrays.asList(MediaType.APPLICATION_JSON), elementType));
    }

    private Method handlerMethod(Object handler) {
        return ReflectionUtils.findMethod(handler.getClass(), (String)"apply", (Class[])null);
    }

    private Throwable handleReadError(MethodParameter parameter, Throwable ex) {
        return ex instanceof DecodingException ? new ServerWebInputException("Failed to read HTTP message", parameter, ex) : ex;
    }

    private ServerWebInputException handleMissingBody(MethodParameter param) {
        return new ServerWebInputException("Request body is missing: " + param.getExecutable().toGenericString());
    }

    private ReactiveAdapterRegistry getAdapterRegistry() {
        return ReactiveAdapterRegistry.getSharedInstance();
    }

    private Publisher<?> value(Function<Publisher<?>, Publisher<?>> function, Publisher<String> value) {
        Flux input = Flux.from(value).map(body -> this.converter.convert(function, (String)body));
        return Mono.from(function.apply((Publisher<?>)input));
    }

    private Object getTargetFunction(Object function) {
        Object target = this.inspector.getRegistration(function).getTarget();
        if (target instanceof FluxWrapper) {
            target = ((FluxWrapper)target).getTarget();
        }
        return target;
    }

    private Type getItemType(Object function) {
        Class inputType = this.inspector.getInputType(function);
        if (!Collection.class.isAssignableFrom(inputType)) {
            return inputType;
        }
        Type type = this.inspector.getRegistration(this.getTargetFunction(function)).getType().getType();
        if (type instanceof ParameterizedType) {
            type = ((ParameterizedType)type).getActualTypeArguments()[0];
        } else {
            for (Type iface : ((Class)type).getGenericInterfaces()) {
                if (!iface.getTypeName().startsWith("java.util.function")) continue;
                type = ((ParameterizedType)iface).getActualTypeArguments()[0];
                break;
            }
        }
        type = type instanceof ParameterizedType ? ((ParameterizedType)type).getActualTypeArguments()[0] : inputType;
        return type;
    }

    private static /* synthetic */ ResponseEntity lambda$stream$4(ResponseEntity.BodyBuilder builder, Publisher output) {
        return builder.body((Object)output);
    }

    private static /* synthetic */ Object lambda$messages$1(Object function, Map headers, Object payload) {
        return MessageUtils.create((Object)function, (Object)payload, (Map)headers);
    }

    public static class FunctionWrapper {
        private final Function<Publisher<?>, Publisher<?>> function;
        private final Consumer<Publisher<?>> consumer;
        private final Supplier<Publisher<?>> supplier;
        private final MultiValueMap<String, String> params = new LinkedMultiValueMap();
        private HttpHeaders headers = new HttpHeaders();
        private Publisher<String> argument;

        public FunctionWrapper(Function<? extends Publisher<?>, ? extends Publisher<?>> function, Consumer<? extends Publisher<?>> consumer, Supplier<? extends Publisher<?>> supplier) {
            this.function = function;
            this.consumer = consumer;
            this.supplier = supplier;
        }

        public Object handler() {
            return this.function != null ? this.function : (this.consumer != null ? this.consumer : this.supplier);
        }

        public Function<Publisher<?>, Publisher<?>> function() {
            return this.function;
        }

        public Consumer<Publisher<?>> consumer() {
            return this.consumer;
        }

        public Supplier<Publisher<?>> supplier() {
            return this.supplier;
        }

        public MultiValueMap<String, String> params() {
            return this.params;
        }

        public HttpHeaders headers() {
            return this.headers;
        }

        public FunctionWrapper headers(HttpHeaders headers) {
            this.headers = headers;
            return this;
        }

        public FunctionWrapper params(MultiValueMap<String, String> params) {
            this.params.addAll(params);
            return this;
        }

        public FunctionWrapper argument(Publisher<String> argument) {
            this.argument = argument;
            return this;
        }

        public FunctionWrapper argument(String argument) {
            this.argument = Mono.just((Object)argument);
            return this;
        }

        public Publisher<String> argument() {
            return this.argument;
        }
    }
}

