/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.connectors;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.helpers.Validation;
import io.smallrye.reactive.messaging.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.i18n.ProviderMessages;
import io.vertx.core.Handler;
import io.vertx.mutiny.core.Promise;
import io.vertx.mutiny.core.WorkerExecutor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class WorkerPoolRegistry {
    private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
    private static final String WORKER_CONCURRENCY = "max-concurrency";
    @Inject
    private ExecutionHolder executionHolder;
    @Inject
    private Instance<Config> configInstance;
    private final Map<String, Integer> workerConcurrency = new HashMap<String, Integer>();
    private final Map<String, WorkerExecutor> workerExecutors = new ConcurrentHashMap<String, WorkerExecutor>();

    public void terminate(@Observes(notifyObserver=Reception.IF_EXISTS) @Priority(value=100) @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        if (!this.workerExecutors.isEmpty()) {
            for (WorkerExecutor executor : this.workerExecutors.values()) {
                executor.close();
            }
        }
    }

    public <T> Uni<T> executeWork(Handler<Promise<T>> blockingCodeHandler, String workerName, boolean ordered) {
        Objects.requireNonNull(blockingCodeHandler, ProviderMessages.msg.actionNotProvided());
        if (workerName == null) {
            return this.executionHolder.vertx().executeBlocking(blockingCodeHandler, ordered);
        }
        return this.getWorker(workerName).executeBlocking(blockingCodeHandler, ordered);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private WorkerExecutor getWorker(String workerName) {
        Objects.requireNonNull(workerName, ProviderMessages.msg.workerNameNotSpecified());
        if (this.workerExecutors.containsKey(workerName)) {
            return this.workerExecutors.get(workerName);
        }
        if (this.workerConcurrency.containsKey(workerName)) {
            WorkerExecutor executor = this.workerExecutors.get(workerName);
            if (executor == null) {
                WorkerPoolRegistry workerPoolRegistry = this;
                synchronized (workerPoolRegistry) {
                    executor = this.workerExecutors.get(workerName);
                    if (executor == null) {
                        executor = this.executionHolder.vertx().createSharedWorkerExecutor(workerName, this.workerConcurrency.get(workerName).intValue());
                        ProviderLogging.log.workerPoolCreated(workerName, this.workerConcurrency.get(workerName));
                        this.workerExecutors.put(workerName, executor);
                    }
                }
            }
            if (executor != null) {
                return executor;
            }
            throw ProviderExceptions.ex.runtimeForFailedWorker(workerName);
        }
        throw ProviderExceptions.ex.illegalArgumentForFailedWorker();
    }

    public <T> void analyzeWorker(AnnotatedType<T> annotatedType) {
        Objects.requireNonNull(annotatedType, ProviderMessages.msg.annotatedTypeWasEmpty());
        Set methods = annotatedType.getMethods();
        methods.stream().filter(m -> m.isAnnotationPresent(Blocking.class)).forEach(m -> this.defineWorker(m.getJavaMember()));
    }

    public void defineWorker(String className, String method, String poolName) {
        Objects.requireNonNull(className, ProviderMessages.msg.classNameWasEmpty());
        Objects.requireNonNull(method, ProviderMessages.msg.methodWasEmpty());
        if (!poolName.equals("<no-value>")) {
            if (Validation.isBlank(poolName)) {
                throw ProviderExceptions.ex.illegalArgumentForAnnotationNullOrBlank("@Blocking", className + "#" + method);
            }
            String workerConfigKey = "smallrye.messaging.worker." + poolName + "." + WORKER_CONCURRENCY;
            Optional concurrency = ((Config)this.configInstance.get()).getOptionalValue(workerConfigKey, Integer.class);
            if (!concurrency.isPresent()) {
                throw ProviderExceptions.ex.illegalArgumentForWorkerConfigKey("@Blocking", className + "#" + method, workerConfigKey);
            }
            this.workerConcurrency.put(poolName, (Integer)concurrency.get());
        }
    }

    private void defineWorker(Method method) {
        Objects.requireNonNull(method, ProviderMessages.msg.methodWasEmpty());
        Blocking blocking = method.getAnnotation(Blocking.class);
        String methodName = method.getName();
        String className = method.getDeclaringClass().getName();
        if (!method.isAnnotationPresent(Incoming.class) && !method.isAnnotationPresent(Outgoing.class)) {
            throw ProviderExceptions.ex.illegalBlockingSignature(className + "#" + method);
        }
        this.defineWorker(className, methodName, blocking.value());
    }
}

