/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.pubsub;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.util.Map;
import java.util.Objects;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.command.pubsub.PubSubBasedCommand;
import org.eclipse.hono.client.command.pubsub.PubSubBasedCommandContext;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.client.pubsub.tracing.PubSubTracingHelper;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.LifecycleStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubSubBasedInternalCommandConsumer
implements InternalCommandConsumer {
    private static final Logger log = LoggerFactory.getLogger(PubSubBasedInternalCommandConsumer.class);
    private final CommandResponseSender commandResponseSender;
    private final String adapterInstanceId;
    private final CommandHandlers commandHandlers;
    private final TenantClient tenantClient;
    private final Tracer tracer;
    private final PubSubSubscriberFactory subscriberFactory;
    private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private final PubSubBasedAdminClientManager adminClientManager;
    private final Vertx vertx;
    private final MessageReceiver receiver;

    public PubSubBasedInternalCommandConsumer(CommandResponseSender commandResponseSender, Vertx vertx, String adapterInstanceId, CommandHandlers commandHandlers, TenantClient tenantClient, Tracer tracer, PubSubSubscriberFactory subscriberFactory, PubSubBasedAdminClientManager adminClientManager, MessageReceiver receiver) {
        this.vertx = Objects.requireNonNull(vertx);
        this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.tracer = Objects.requireNonNull(tracer);
        this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
        this.adminClientManager = Objects.requireNonNull(adminClientManager);
        this.receiver = receiver != null ? receiver : this.createReceiver();
    }

    private MessageReceiver createReceiver() {
        return this::handleCommandMessage;
    }

    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        log.trace("registering readiness check using Pub/Sub based internal command consumer [adapter instance id: {}]", (Object)this.adapterInstanceId);
        readinessHandler.register("internal-command-consumer[%s]-readiness".formatted(this.adapterInstanceId), status -> {
            if (this.lifecycleStatus.isStarted()) {
                status.tryComplete((Object)Status.OK());
            } else {
                JsonObject data = new JsonObject();
                if (this.lifecycleStatus.isStarting()) {
                    if (this.subscriberFactory.getSubscriber("command_internal", this.adapterInstanceId).isEmpty()) {
                        log.debug("readiness check failed, subscriber not created yet");
                        data.put("status", (Object)"subscriber not created yet");
                    } else {
                        log.debug("readiness check failed");
                    }
                }
                status.tryComplete((Object)Status.KO((JsonObject)data));
            }
        });
    }

    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
    }

    public Future<Void> start() {
        if (this.lifecycleStatus.isStarting()) {
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture((Throwable)new IllegalStateException("subscriber is already started/stopping"));
        }
        return this.adminClientManager.getOrCreateTopic("command_internal", this.adapterInstanceId).onFailure(thr -> log.error("Could not create topic for endpoint {} and {}", new Object[]{"command_internal", this.adapterInstanceId, thr})).compose(t -> this.adminClientManager.getOrCreateSubscription("command_internal", this.adapterInstanceId)).onComplete(v -> this.vertx.executeBlocking(() -> {
            this.adminClientManager.closeAdminClients();
            return null;
        })).onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}", new Object[]{"command_internal", this.adapterInstanceId, thr})).compose(s -> this.subscriberFactory.getOrCreateSubscriber(PubSubMessageHelper.getTopicName((String)"command_internal", (String)this.adapterInstanceId), this.receiver).subscribe(true)).onSuccess(s -> this.lifecycleStatus.setStarted()).onFailure(e -> log.warn("Error starting Internal Command Consumer for adapter {}", (Object)this.adapterInstanceId, e));
    }

    Future<Void> handleCommandMessage(PubsubMessage message, AckReplyConsumer consumer) {
        PubSubBasedCommand command;
        consumer.ack();
        try {
            command = PubSubBasedCommand.fromRoutedCommandMessage(message);
        }
        catch (IllegalArgumentException e) {
            log.warn("Command record is invalid [tenant-id: {}, device-id: {}]", new Object[]{PubSubMessageHelper.getTenantId((Map)message.getAttributesMap()).orElse(null), PubSubMessageHelper.getDeviceId((Map)message.getAttributesMap()).orElse(null), e});
            return Future.failedFuture((String)"invalid command message");
        }
        CommandHandlerWrapper commandHandler = this.commandHandlers.getCommandHandler(command.getTenant(), command.getGatewayOrDeviceId());
        if (commandHandler != null && commandHandler.getGatewayId() != null) {
            command.setGatewayId(commandHandler.getGatewayId());
        }
        SpanContext spanContext = PubSubTracingHelper.extractSpanContext((Tracer)this.tracer, (PubsubMessage)message);
        SpanContext followsFromSpanContext = commandHandler != null ? commandHandler.getConsumerCreationSpanContext() : null;
        Span currentSpan = CommandContext.createSpan((Tracer)this.tracer, (Command)command, (SpanContext)spanContext, (SpanContext)followsFromSpanContext, (String)this.getClass().getSimpleName());
        TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(currentSpan, this.adapterInstanceId);
        PubSubBasedCommandContext commandContext = new PubSubBasedCommandContext(command, this.commandResponseSender, currentSpan);
        return this.tenantClient.get(command.getTenant(), null).recover(t -> {
            log.warn("error retrieving tenant configuration [{}]", (Object)command);
            ServerErrorException exception = new ServerErrorException(command.getTenant(), 503, "error retrieving tenant configuration", t);
            commandContext.release((Throwable)exception);
            return Future.failedFuture((Throwable)exception);
        }).compose(tenantConfig -> {
            commandContext.put("tenant-config", tenantConfig);
            if (commandHandler != null) {
                log.debug("using [{}] for received command [{}]", (Object)commandHandler, (Object)command);
                return commandHandler.handleCommand((CommandContext)commandContext);
            }
            log.info("no command handler found for command [{}]", (Object)command);
            NoConsumerException exception = new NoConsumerException("no command handler found for command");
            commandContext.release((Throwable)exception);
            return Future.failedFuture((Throwable)exception);
        });
    }

    public Future<Void> stop() {
        return this.lifecycleStatus.runStopAttempt(() -> this.subscriberFactory.closeSubscriber("command_internal", this.adapterInstanceId));
    }
}

