package org.springframework.cloud.function.stream.config;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/springframework/cloud/function/stream/config/SupplierInvokingMessageProducer.class */
public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
    private final FunctionCatalog functionCatalog;
    private final Set<String> suppliers = new HashSet();
    private final Map<String, Disposable> disposables = new HashMap();
    private String defaultRoute;

    public SupplierInvokingMessageProducer(FunctionCatalog functionCatalog, String str) {
        this.functionCatalog = functionCatalog;
        this.defaultRoute = str;
        setOutputChannelName("output");
    }

    protected void doStart() {
        if (StringUtils.hasText(this.defaultRoute)) {
            start(this.defaultRoute);
            return;
        }
        Iterator it = this.functionCatalog.getNames(Supplier.class).iterator();
        while (it.hasNext()) {
            start((String) it.next());
        }
    }

    protected void doStop() {
        Iterator it = new HashSet(this.suppliers).iterator();
        while (it.hasNext()) {
            stop((String) it.next());
        }
    }

    public void stop(String str) {
        if (this.disposables.containsKey(str)) {
            synchronized (this.disposables) {
                if (this.disposables.containsKey(str)) {
                    try {
                        this.disposables.get(str).dispose();
                        this.disposables.remove(str);
                        this.suppliers.remove(str);
                    } catch (Throwable th) {
                        this.disposables.remove(str);
                        this.suppliers.remove(str);
                        throw th;
                    }
                }
            }
        }
    }

    public void start(String str) {
        Supplier supplier;
        if (this.disposables.containsKey(str)) {
            return;
        }
        synchronized (this.disposables) {
            if (!this.disposables.containsKey(str) && (supplier = (Supplier) this.functionCatalog.lookup(Supplier.class, str)) != null) {
                this.suppliers.add(str);
                this.disposables.put(str, Flux.from((Publisher) supplier.get()).subscribeOn(Schedulers.elastic()).subscribe(obj -> {
                    send(str, obj);
                }));
            }
        }
    }

    private void send(String str, Object obj) {
        getOutputChannel().send(MessageBuilder.fromMessage(MessageUtils.unpack((Supplier) this.functionCatalog.lookup(Supplier.class, str), obj)).setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, str).build());
    }
}
