/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.tag.Tag;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.eclipse.hono.adapter.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.util.MessagingClient;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterServiceConfigProperties;
import org.eclipse.hono.deviceconnection.infinispan.client.DeviceConnectionInfo;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.commandrouter.CommandRouterResult;
import org.eclipse.hono.service.commandrouter.CommandRouterService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command router service that delegates to a tenant client, a device registration client,
 * a device connection info store and a set of command consumer factories.
 * <p>
 * Besides the command router operations, this class starts/stops the wrapped clients and
 * registers their health checks.
 */
public class CommandRouterServiceImpl implements CommandRouterService, HealthCheckProvider, Lifecycle {

    private static final Logger LOG = LoggerFactory.getLogger(CommandRouterServiceImpl.class);

    /** Status code reported for operations that completed successfully. */
    private static final int STATUS_NO_CONTENT = 204;

    /** Status code for which a failed removal of an adapter instance is not logged. */
    private static final int STATUS_PRECONDITION_FAILED = 412;

    /**
     * Upper bound (in seconds) for an accepted lifespan. The value equals
     * {@code Long.MAX_VALUE / 1_000_000_000}, i.e. the largest number of seconds whose
     * nanosecond representation still fits into a {@code long}.
     */
    private static final long MAX_LIFESPAN_SECONDS = Long.MAX_VALUE / 1_000_000_000L;

    private final CommandRouterServiceConfigProperties config;
    private final DeviceRegistrationClient registrationClient;
    private final TenantClient tenantClient;
    private final DeviceConnectionInfo deviceConnectionInfo;
    private final MessagingClient<CommandConsumerFactory> commandConsumerFactories;
    private final Tracer tracer;
    /** Identifiers of tenants for which a command consumer still needs to be (re-)created. */
    private final Deque<String> tenantsToEnable = new LinkedList<>();
    /** The Vert.x context used for processing the tenant queue and for the event loop check. */
    private Context context;

    /**
     * Creates a new service instance.
     *
     * @param config The service configuration.
     * @param registrationClient The device registration client.
     * @param tenantClient The tenant client.
     * @param deviceConnectionInfo The store holding device connection information.
     * @param commandConsumerFactories The factories used for creating command consumers.
     * @param tracer The OpenTracing tracer.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public CommandRouterServiceImpl(
            CommandRouterServiceConfigProperties config,
            DeviceRegistrationClient registrationClient,
            TenantClient tenantClient,
            DeviceConnectionInfo deviceConnectionInfo,
            MessagingClient<CommandConsumerFactory> commandConsumerFactories,
            Tracer tracer) {
        this.config = Objects.requireNonNull(config);
        this.registrationClient = Objects.requireNonNull(registrationClient);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.deviceConnectionInfo = Objects.requireNonNull(deviceConnectionInfo);
        this.commandConsumerFactories = Objects.requireNonNull(commandConsumerFactories);
        this.tracer = Objects.requireNonNull(tracer);
    }

    /**
     * Sets the Vert.x context to run on instead of the context that {@link #start()} is invoked on.
     *
     * @param context The context.
     * @throws NullPointerException if context is {@code null}.
     */
    void setContext(Context context) {
        this.context = Objects.requireNonNull(context);
    }

    /**
     * Starts the wrapped clients.
     * <p>
     * If no context has been set explicitly, the current Vert.x context is used. The outcome
     * of starting the individual clients is not awaited; the returned future only reflects
     * the precondition checks done here.
     *
     * @return A succeeded future, or a failed future if there is no Vert.x context or no
     *         command consumer factory implementation is available.
     */
    public Future<Void> start() {
        if (this.context == null) {
            this.context = Vertx.currentContext();
            if (this.context == null) {
                return Future.failedFuture(new IllegalStateException("Service must be started in a Vert.x context"));
            }
        }
        if (!this.commandConsumerFactories.containsImplementations()) {
            return Future.failedFuture("no command consumer factories set");
        }
        // the results of the start invocations are deliberately not tracked here
        this.registrationClient.start();
        this.tenantClient.start();
        if (this.deviceConnectionInfo instanceof Lifecycle) {
            ((Lifecycle) this.deviceConnectionInfo).start();
        }
        this.commandConsumerFactories.start();
        return Future.succeededFuture();
    }

    /**
     * Stops the wrapped clients.
     *
     * @return A future that succeeds once all clients have been stopped and fails if
     *         stopping any of them has failed.
     */
    public Future<Void> stop() {
        LOG.info("stopping command router");
        // CompositeFuture.all accepts a list of raw futures only
        @SuppressWarnings("rawtypes")
        final List<Future> results = new ArrayList<>();
        results.add(this.registrationClient.stop());
        results.add(this.tenantClient.stop());
        if (this.deviceConnectionInfo instanceof Lifecycle) {
            results.add(((Lifecycle) this.deviceConnectionInfo).stop());
        }
        results.add(this.commandConsumerFactories.stop());
        return CompositeFuture.all(results)
                .onFailure(t -> LOG.info("error while stopping command router", t))
                .map(ok -> {
                    LOG.info("successfully stopped command router");
                    return (Void) null;
                });
    }

    /**
     * Stores the gateway that has last acted on behalf of a device.
     *
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The device identifier.
     * @param gatewayId The gateway identifier.
     * @param span The active tracing span.
     * @return A future containing a result with status 204 on success. A failure reported by the
     *         device connection info store is passed on as a failed future.
     */
    public Future<CommandRouterResult> setLastKnownGatewayForDevice(String tenantId, String deviceId, String gatewayId, Span span) {
        return this.deviceConnectionInfo.setLastKnownGatewayForDevice(tenantId, deviceId, gatewayId, span)
                .map(ok -> CommandRouterResult.from(STATUS_NO_CONTENT));
    }

    /**
     * Creates a command consumer for the tenant and registers the adapter instance that handles
     * commands for the device.
     *
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The device identifier.
     * @param adapterInstanceId The identifier of the adapter instance.
     * @param lifespan The lifespan of the registration; {@code null}, negative or too large values
     *                 are replaced with a duration of -1 seconds.
     * @param span The active tracing span.
     * @return A succeeded future containing a result with status 204 on success, or with the
     *         status code extracted from the error otherwise.
     */
    public Future<CommandRouterResult> registerCommandConsumer(String tenantId, String deviceId, String adapterInstanceId, Duration lifespan, Span span) {
        return this.tenantClient.get(tenantId, span.context())
                .compose(tenantObject -> this.commandConsumerFactories.getClient(tenantObject)
                        .createCommandConsumer(tenantId, span.context()))
                .compose(v -> this.deviceConnectionInfo
                        .setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, this.getSanitizedLifespan(lifespan), span)
                        .onFailure(thr -> LOG.info("error setting command handling adapter instance [tenant: {}, device: {}]",
                                tenantId, deviceId, thr)))
                .map(v -> CommandRouterResult.from(STATUS_NO_CONTENT))
                .otherwise(t -> CommandRouterResult.from(ServiceInvocationException.extractStatusCode(t)));
    }

    /**
     * Replaces an unusable lifespan with a duration of -1 seconds.
     *
     * @param lifespan The lifespan to check, may be {@code null}.
     * @return The given lifespan if it is non-null, non-negative and not larger than
     *         {@link #MAX_LIFESPAN_SECONDS}, otherwise a duration of -1 seconds.
     */
    private Duration getSanitizedLifespan(Duration lifespan) {
        if (lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > MAX_LIFESPAN_SECONDS) {
            return Duration.ofSeconds(-1L);
        }
        return lifespan;
    }

    /**
     * Removes the registration of the adapter instance that handles commands for the device.
     *
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The device identifier.
     * @param adapterInstanceId The identifier of the adapter instance.
     * @param span The active tracing span.
     * @return A succeeded future containing a result with status 204 on success, or with the
     *         status code extracted from the error otherwise.
     */
    public Future<CommandRouterResult> unregisterCommandConsumer(String tenantId, String deviceId, String adapterInstanceId, Span span) {
        return this.deviceConnectionInfo.removeCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, span)
                .onFailure(thr -> {
                    // a 412 outcome is passed on to the caller without being logged
                    if (ServiceInvocationException.extractStatusCode(thr) != STATUS_PRECONDITION_FAILED) {
                        LOG.info("error removing command handling adapter instance [tenant: {}, device: {}]",
                                tenantId, deviceId, thr);
                    }
                })
                .map(v -> CommandRouterResult.from(STATUS_NO_CONTENT))
                .otherwise(t -> CommandRouterResult.from(ServiceInvocationException.extractStatusCode(t)));
    }

    /**
     * Queues the given tenants for (re-)creation of their command consumers.
     * <p>
     * Processing of the queue is started only if the queue was empty before the given
     * identifiers were added. The returned future does not reflect the outcome of the
     * processing.
     *
     * @param tenantIds The identifiers of the tenants to enable command routing for.
     * @param span The active tracing span.
     * @return A succeeded future containing a result with status 204.
     * @throws NullPointerException if tenantIds is {@code null}.
     */
    public Future<CommandRouterResult> enableCommandRouting(List<String> tenantIds, Span span) {
        Objects.requireNonNull(tenantIds);
        final boolean isProcessingRequired = this.tenantsToEnable.isEmpty();
        this.tenantsToEnable.addAll(tenantIds);
        if (isProcessingRequired) {
            final Span processingSpan = this.tracer.buildSpan("re-enable command routing for tenants")
                    .addReference("follows_from", span.context())
                    .start();
            this.processTenantQueue(new ConcurrentHashSet<>(), processingSpan);
        }
        return Future.succeededFuture(CommandRouterResult.from(STATUS_NO_CONTENT));
    }

    /**
     * Takes the next tenant from the queue and triggers creation of its command consumer
     * on the service's context, then continues with the next queue entry.
     * <p>
     * The next entry is processed without waiting for the outcome of the current one. The
     * parent span is finished as soon as the queue is found to be empty.
     * <p>
     * NOTE(review): a tenant that is re-queued after the queue has already been drained stays
     * in the queue, and {@link #enableCommandRouting(List, Span)} only starts processing if the
     * queue is empty - confirm whether such a left-over entry needs dedicated handling.
     *
     * @param processedTenants The tenants for which a consumer has already been created during
     *                         this processing run.
     * @param parentSpan The span tracking the overall processing.
     */
    private void processTenantQueue(Set<String> processedTenants, Span parentSpan) {
        final String tenantId = this.tenantsToEnable.poll();
        if (tenantId == null) {
            parentSpan.finish();
            return;
        }
        this.context.runOnContext(go -> {
            if (processedTenants.contains(tenantId)) {
                parentSpan.log(Map.of(
                        "message", "skipping tenant, already processed ...",
                        TracingHelper.TAG_TENANT_ID.getKey(), tenantId));
            } else {
                this.createCommandConsumerForTenant(tenantId, processedTenants, parentSpan);
            }
            this.processTenantQueue(processedTenants, parentSpan);
        });
    }

    /**
     * Looks up the tenant and creates a command consumer for it, tracked in a dedicated child span.
     * <p>
     * On success the tenant is added to the processed tenants. If the attempt fails with a
     * {@code ServerErrorException}, the tenant is put back into the queue.
     *
     * @param tenantId The tenant to create the consumer for.
     * @param processedTenants The set to add the tenant to once its consumer has been created.
     * @param parentSpan The span tracking the overall processing.
     */
    private void createCommandConsumerForTenant(String tenantId, Set<String> processedTenants, Span parentSpan) {
        final Span span = this.tracer.buildSpan("re-enable command routing for tenant")
                .addReference("child_of", parentSpan.context())
                .withTag(TracingHelper.TAG_TENANT_ID, tenantId)
                .start();
        this.tenantClient.get(tenantId, span.context())
                .map(tenantObject -> this.commandConsumerFactories.getClient(tenantObject))
                // compose (not map) so that the outcome of the consumer creation itself
                // determines which of the handlers below gets invoked
                .compose(factory -> factory.createCommandConsumer(tenantId, span.context()))
                .onSuccess(ok -> {
                    span.log("successfully created command consumer");
                    processedTenants.add(tenantId);
                })
                .onFailure(t -> {
                    TracingHelper.logError(span, "failed to create command consumer", t);
                    if (t instanceof ServerErrorException) {
                        span.log("marking tenant for later re-try to create command consumer");
                        this.tenantsToEnable.add(tenantId);
                    }
                })
                .onComplete(r -> span.finish());
    }

    /**
     * Registers the readiness checks of the wrapped clients.
     * <p>
     * The registration client, tenant client and device connection info are only considered
     * if they are {@code ServiceClient} instances.
     *
     * @param handler The handler to register the checks with.
     */
    public void registerReadinessChecks(HealthCheckHandler handler) {
        if (this.registrationClient instanceof ServiceClient) {
            ((ServiceClient) this.registrationClient).registerReadinessChecks(handler);
        }
        if (this.tenantClient instanceof ServiceClient) {
            ((ServiceClient) this.tenantClient).registerReadinessChecks(handler);
        }
        if (this.deviceConnectionInfo instanceof ServiceClient) {
            ((ServiceClient) this.deviceConnectionInfo).registerReadinessChecks(handler);
        }
        this.commandConsumerFactories.registerReadinessChecks(handler);
    }

    /**
     * Registers the event loop check and the liveness checks of the wrapped clients.
     * <p>
     * The registration client, tenant client and device connection info are only considered
     * if they are {@code ServiceClient} instances.
     *
     * @param handler The handler to register the checks with.
     */
    public void registerLivenessChecks(HealthCheckHandler handler) {
        this.registerEventLoopBlockedCheck(handler);
        if (this.registrationClient instanceof ServiceClient) {
            ((ServiceClient) this.registrationClient).registerLivenessChecks(handler);
        }
        if (this.tenantClient instanceof ServiceClient) {
            ((ServiceClient) this.tenantClient).registerLivenessChecks(handler);
        }
        if (this.deviceConnectionInfo instanceof ServiceClient) {
            ((ServiceClient) this.deviceConnectionInfo).registerLivenessChecks(handler);
        }
        this.commandConsumerFactories.registerLivenessChecks(handler);
    }

    /**
     * Registers a check named {@code event-loop-blocked-check} using the configured timeout.
     * <p>
     * If the check is invoked on a context other than the service's context, the check is
     * completed from within an action scheduled on the service's context, i.e. it only
     * completes once that context gets to run the action. Otherwise it is completed right away.
     *
     * @param handler The handler to register the check with.
     */
    protected void registerEventLoopBlockedCheck(HealthCheckHandler handler) {
        handler.register("event-loop-blocked-check", this.config.getEventLoopBlockedCheckTimeout(), procedure -> {
            final Context currentContext = Vertx.currentContext();
            if (currentContext != this.context) {
                this.context.runOnContext(action -> procedure.tryComplete(Status.OK()));
            } else {
                LOG.debug("Command router - HealthCheck Server context match. Assume protocol adapter is alive.");
                procedure.tryComplete(Status.OK());
            }
        });
    }
}

