/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.messaging;

import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.common.reactive.Multi;
import io.helidon.config.Config;
import io.helidon.config.ConfigSources;
import io.helidon.config.ConfigValue;
import io.helidon.messaging.Channel;
import io.helidon.messaging.ConnectorConfigHelper;
import io.helidon.messaging.Emitter;
import io.helidon.messaging.Messaging;
import io.helidon.messaging.MessagingException;
import io.helidon.messaging.Stoppable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.reactivestreams.Subscriber;

class MessagingImpl
implements Messaging {
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    private final int instanceNumber;
    private final Set<Emitter<?>> emitters = new HashSet();
    private final Map<String, Channel<?>> channelMap = new HashMap();
    private final Map<String, IncomingConnectorFactory> incomingConnectors = new HashMap<String, IncomingConnectorFactory>();
    private final Map<String, OutgoingConnectorFactory> outgoingConnectors = new HashMap<String, OutgoingConnectorFactory>();
    private Config config;
    private State state = State.INIT;
    private ThreadPoolSupplier threadPoolSupplier;

    MessagingImpl() {
        this.instanceNumber = INSTANCE_COUNTER.incrementAndGet();
    }

    @Override
    public Messaging start() {
        this.state.start(this);
        if (!this.emitters.isEmpty()) {
            this.threadPoolSupplier = ThreadPoolSupplier.builder().threadNamePrefix("helidon-messaging-" + this.instanceNumber + "-").build();
            this.emitters.forEach(emitter -> emitter.init(this.threadPoolSupplier.get(), Flow.defaultBufferSize()));
        }
        this.channelMap.values().forEach(this::findConnectors);
        this.channelMap.values().forEach(Channel::connect);
        return this;
    }

    @Override
    public void stop() {
        this.state.stop(this);
        Multi.concat((Flow.Publisher)Multi.create(this.incomingConnectors.values()).map(Object.class::cast), (Flow.Publisher)Multi.create(this.outgoingConnectors.values()).map(Object.class::cast)).distinct().filter(Stoppable.class::isInstance).map(Stoppable.class::cast).forEach(Stoppable::stop);
        if (!this.emitters.isEmpty()) {
            this.emitters.forEach(Emitter::complete);
            this.threadPoolSupplier.get().shutdown();
        }
    }

    void setConfig(Config config) {
        this.config = config;
    }

    Config getConfig() {
        return this.config;
    }

    void addIncomingConnector(IncomingConnectorFactory connector) {
        this.incomingConnectors.put(this.getConnectorName(connector.getClass()), connector);
    }

    void addOutgoingConnector(OutgoingConnectorFactory connector) {
        this.outgoingConnectors.put(this.getConnectorName(connector.getClass()), connector);
    }

    void addEmitter(Emitter<?> emitter) {
        this.emitters.add(emitter);
    }

    void registerChannel(Channel<?> channel) {
        Channel<?> ch = this.channelMap.get(channel.name());
        if (ch == null) {
            ch = channel;
            this.channelMap.put(ch.name(), ch);
        }
    }

    private String getConnectorName(Class<?> clazz) {
        Connector annotation = clazz.getAnnotation(Connector.class);
        if (annotation == null) {
            throw new MessagingException("Missing @Connector annotation in provided " + clazz.getSimpleName());
        }
        return annotation.value();
    }

    private void findConnectors(Channel<?> channel) {
        String connectorName;
        Config.Builder configBuilder = Config.builder().disableSystemPropertiesSource().disableEnvironmentVariablesSource();
        if (this.config != null) {
            configBuilder.addSource(ConfigSources.create((Config)this.config));
        }
        if (channel.getPublisherConfig() != null) {
            configBuilder.addSource(ConnectorConfigHelper.prefixedConfigSource("mp.messaging.incoming." + channel.name(), channel.getPublisherConfig()));
        }
        if (channel.getSubscriberConfig() != null) {
            configBuilder.addSource(ConnectorConfigHelper.prefixedConfigSource("mp.messaging.outgoing." + channel.name(), channel.getSubscriberConfig()));
        }
        Config mergedConfig = configBuilder.build();
        ConfigValue<String> incomingConnectorName = ConnectorConfigHelper.getIncomingConnectorName(mergedConfig, channel.name());
        ConfigValue<String> outgoingConnectorName = ConnectorConfigHelper.getOutgoingConnectorName(mergedConfig, channel.name());
        if (incomingConnectorName.isPresent()) {
            connectorName = (String)incomingConnectorName.get();
            org.eclipse.microprofile.config.Config incomingConnectorConfig = ConnectorConfigHelper.getConnectorConfig(channel.name(), connectorName, mergedConfig);
            channel.setPublisher(Optional.ofNullable(this.incomingConnectors.get(connectorName)).orElseThrow(() -> new MessagingException("Unknown incoming connector " + connectorName)).getPublisherBuilder(incomingConnectorConfig).buildRs());
        }
        if (outgoingConnectorName.isPresent()) {
            connectorName = (String)outgoingConnectorName.get();
            org.eclipse.microprofile.config.Config outgoingConnectorConfig = ConnectorConfigHelper.getConnectorConfig(channel.name(), connectorName, mergedConfig);
            channel.setSubscriber((Subscriber<Message<?>>)Optional.ofNullable(this.outgoingConnectors.get(connectorName)).orElseThrow(() -> new MessagingException("Unknown outgoing connector " + connectorName)).getSubscriberBuilder(outgoingConnectorConfig).build());
        }
    }

    static enum State {
        INIT{

            @Override
            void start(MessagingImpl m) {
                m.state = STARTED;
            }

            @Override
            void stop(MessagingImpl m) {
                throw new MessagingException("Messaging is not started yet!");
            }
        }
        ,
        STARTED{

            @Override
            void start(MessagingImpl m) {
                throw new MessagingException("Messaging has been started already!");
            }

            @Override
            void stop(MessagingImpl m) {
                m.state = STOPPED;
            }
        }
        ,
        STOPPED{

            @Override
            void start(MessagingImpl m) {
                throw new MessagingException("Messaging has been stopped already!");
            }

            @Override
            void stop(MessagingImpl m) {
                this.start(m);
            }
        };


        abstract void start(MessagingImpl var1);

        abstract void stop(MessagingImpl var1);
    }
}

