/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.pubsub;

import com.google.cloud.pubsub.v1.MessageReceiver;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.hono.client.command.pubsub.PubSubBasedCommandResponseSender;
import org.eclipse.hono.client.command.pubsub.PubSubBasedInternalCommandSender;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.pubsub.PubSubBasedCommandProcessingQueue;
import org.eclipse.hono.commandrouter.impl.pubsub.PubSubBasedMappingAndDelegatingCommandHandler;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.LifecycleStatus;
import org.eclipse.hono.util.MessagingType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubSubBasedCommandConsumerFactoryImpl
implements CommandConsumerFactory,
ServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubBasedCommandConsumerFactoryImpl.class);
    private final Vertx vertx;
    private final TenantClient tenantClient;
    private final Tracer tracer;
    private final PubSubBasedInternalCommandSender internalCommandSender;
    private final PubSubBasedCommandResponseSender responseSender;
    private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private final CommandTargetMapper commandTargetMapper;
    private final CommandRouterMetrics metrics;
    private final PubSubSubscriberFactory subscriberFactory;
    private final Set<String> tenantIds = new HashSet<String>();
    private final Map<String, MessageReceiver> activeReceivers = new ConcurrentHashMap<String, MessageReceiver>();
    private PubSubBasedMappingAndDelegatingCommandHandler commandHandler;

    public PubSubBasedCommandConsumerFactoryImpl(Vertx vertx, TenantClient tenantClient, Tracer tracer, PubSubPublisherFactory pubSubPublisherFactory, String projectId, CommandTargetMapper commandTargetMapper, CommandRouterMetrics metrics, PubSubSubscriberFactory subscriberFactory) {
        this.vertx = Objects.requireNonNull(vertx);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.tracer = Objects.requireNonNull(tracer);
        this.commandTargetMapper = Objects.requireNonNull(commandTargetMapper);
        this.metrics = Objects.requireNonNull(metrics);
        this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
        Objects.requireNonNull(pubSubPublisherFactory);
        Objects.requireNonNull(projectId);
        this.internalCommandSender = new PubSubBasedInternalCommandSender(pubSubPublisherFactory, projectId, tracer);
        this.responseSender = new PubSubBasedCommandResponseSender(vertx, pubSubPublisherFactory, projectId, tracer);
    }

    public Future<Void> start() {
        if (this.lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture((Throwable)new IllegalStateException("factory is already started/stopping"));
        }
        PubSubBasedCommandProcessingQueue commandQueue = new PubSubBasedCommandProcessingQueue(this.vertx);
        this.commandHandler = new PubSubBasedMappingAndDelegatingCommandHandler(this.vertx, this.tenantClient, commandQueue, this.commandTargetMapper, this.internalCommandSender, this.metrics, this.tracer, this.responseSender);
        this.registerTenantCreationListener();
        return this.commandHandler.start().onSuccess(s -> this.lifecycleStatus.setStarted());
    }

    public Future<Void> stop() {
        return this.lifecycleStatus.runStopAttempt(() -> Future.join((Future)this.subscriberFactory.closeAllSubscribers(), this.commandHandler.stop()).mapEmpty());
    }

    public MessagingType getMessagingType() {
        return MessagingType.pubsub;
    }

    @Override
    public Future<Void> createCommandConsumer(String tenantId, SpanContext context) {
        if (this.tenantIds.contains(tenantId)) {
            LOG.debug("Command consumer for tenant {} is already registered", (Object)tenantId);
            return Future.succeededFuture();
        }
        this.tenantIds.add(tenantId);
        Span span = TracingHelper.buildServerChildSpan((Tracer)this.tracer, (SpanContext)context, (String)"create command consumer", (String)CommandConsumerFactory.class.getSimpleName()).start();
        TracingHelper.TAG_TENANT_ID.set(span, tenantId);
        Tags.MESSAGE_BUS_DESTINATION.set(span, "command");
        MessageReceiver messageReceiver = this.getOrCreateReceiver(tenantId);
        String subscriptionId = PubSubMessageHelper.getTopicName((String)"command", (String)tenantId);
        return this.subscriberFactory.getOrCreateSubscriber(subscriptionId, messageReceiver).subscribe(false);
    }

    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        this.internalCommandSender.registerReadinessChecks(readinessHandler);
        this.responseSender.registerReadinessChecks(readinessHandler);
        readinessHandler.register("command-consumer-factory-pubsub-subscriber-%s".formatted(UUID.randomUUID()), status -> status.tryComplete((Object)Status.OK()));
    }

    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
        this.internalCommandSender.registerLivenessChecks(livenessHandler);
        this.responseSender.registerLivenessChecks(livenessHandler);
    }

    private void registerTenantCreationListener() {
        NotificationEventBusSupport.registerConsumer((Vertx)this.vertx, (NotificationType)TenantChangeNotification.TYPE, notification -> {
            if (this.lifecycleStatus.isStarted() && notification.getChange() == LifecycleChange.CREATE && notification.isTenantEnabled()) {
                String tenantId = notification.getTenantId();
                MessageReceiver messageReceiver = this.getOrCreateReceiver(tenantId);
                String subscriptionId = PubSubMessageHelper.getTopicName((String)"command", (String)tenantId);
                this.subscriberFactory.getOrCreateSubscriber(subscriptionId, messageReceiver);
            }
        });
    }

    private MessageReceiver getOrCreateReceiver(String tenantId) {
        return this.activeReceivers.computeIfAbsent(tenantId, r -> this.createMessageReceiver(tenantId));
    }

    private MessageReceiver createMessageReceiver(String tenantId) {
        return (message, consumer) -> {
            this.commandHandler.mapAndDelegateIncomingCommandMessage(message, tenantId);
            consumer.ack();
        };
    }
}

