/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.function.web.source;

import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.web.source.DestinationResolver;
import org.springframework.cloud.function.web.source.ExporterProperties;
import org.springframework.cloud.function.web.source.RequestBuilder;
import org.springframework.context.SmartLifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.messaging.Message;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class SupplierExporter
implements SmartLifecycle {
    private static Log logger = LogFactory.getLog(SupplierExporter.class);
    private final FunctionCatalog catalog;
    private final WebClient client;
    private final DestinationResolver destinationResolver;
    private final RequestBuilder requestBuilder;
    private final String supplier;
    private final String contentType;
    private volatile boolean running;
    private volatile boolean ok = true;
    private boolean autoStartup = true;
    private boolean debug = true;
    private volatile Disposable subscription;

    SupplierExporter(RequestBuilder requestBuilder, DestinationResolver destinationResolver, FunctionCatalog catalog, WebClient client, ExporterProperties exporterProperties) {
        this.requestBuilder = requestBuilder;
        this.destinationResolver = destinationResolver;
        this.catalog = catalog;
        this.client = client;
        this.debug = exporterProperties.isDebug();
        this.autoStartup = exporterProperties.isAutoStartup();
        this.supplier = exporterProperties.getSink().getName();
        this.contentType = exporterProperties.getSink().getContentType();
    }

    public void start() {
        if (this.running) {
            return;
        }
        logger.info((Object)"Starting");
        Flux streams = Flux.empty();
        Set names = this.supplier == null ? this.catalog.getNames(Supplier.class) : Collections.singleton(this.supplier);
        boolean suppliersPresent = false;
        for (String name : names) {
            Supplier supplier = (Supplier)this.catalog.lookup(name, new String[]{this.contentType});
            if (supplier == null) {
                logger.warn((Object)("No such Supplier: " + name));
                continue;
            }
            streams = streams.mergeWith(this.forward(supplier, name));
            suppliersPresent = true;
        }
        if (suppliersPresent) {
            this.subscription = streams.retryWhen((Retry)Retry.backoff((long)5L, (Duration)Duration.ofSeconds(1L))).doOnComplete(() -> this.stop()).subscribe();
            this.ok = true;
            this.running = true;
        }
    }

    public boolean isOk() {
        return this.ok;
    }

    public void stop() {
        logger.info((Object)"Stopping");
        this.running = false;
        this.subscription.dispose();
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    private Flux<ClientResponse> forward(Supplier<Publisher<Object>> supplier, String name) {
        Flux o = (Flux)supplier.get();
        return Flux.from((Publisher)o).flatMap(value -> {
            String destination = this.destinationResolver.destination(supplier, name, value);
            if (this.debug) {
                logger.info((Object)("Posting to: " + destination));
            }
            return this.post(this.uri(destination), destination, value);
        });
    }

    private Mono<ClientResponse> post(URI uri, String destination, Object value) {
        Object body = value;
        if (value instanceof Message) {
            Message message = (Message)value;
            body = message.getPayload();
        }
        if (this.debug) {
            logger.debug((Object)("Sending BODY as type: " + body.getClass().getName()));
        }
        Mono result = ((WebClient.RequestBodySpec)((WebClient.RequestBodySpec)this.client.post().uri(uri)).headers(headers -> this.headers((HttpHeaders)headers, destination, value))).bodyValue(body).exchangeToMono(response -> {
            if (this.debug) {
                logger.debug((Object)("Response STATUS: " + response.statusCode()));
            }
            return Mono.just((Object)response);
        });
        if (this.debug) {
            result = result.log();
        }
        return result;
    }

    private void headers(HttpHeaders headers, String destination, Object value) {
        headers.putAll(this.requestBuilder.headers(destination, value));
    }

    private URI uri(String destination) {
        return this.requestBuilder.uri(destination);
    }
}

