/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tag;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterServiceConfigProperties;
import org.eclipse.hono.deviceconnection.infinispan.client.DeviceConnectionInfo;
import org.eclipse.hono.service.HealthCheckProvider;
import org.eclipse.hono.service.commandrouter.CommandRouterResult;
import org.eclipse.hono.service.commandrouter.CommandRouterService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandRouterServiceImpl
implements CommandRouterService,
HealthCheckProvider,
Lifecycle {
    private static final Logger LOG = LoggerFactory.getLogger(CommandRouterServiceImpl.class);
    private final CommandRouterServiceConfigProperties config;
    private final DeviceRegistrationClient registrationClient;
    private final TenantClient tenantClient;
    private final DeviceConnectionInfo deviceConnectionInfo;
    private final MessagingClientProvider<CommandConsumerFactory> commandConsumerFactoryProvider;
    private final Tracer tracer;
    private final Deque<Pair<String, Integer>> tenantsToEnable = new LinkedList<Pair<String, Integer>>();
    private final Set<String> reenabledTenants = new HashSet<String>();
    private final Set<String> tenantsInProcess = new HashSet<String>();
    private final AtomicBoolean running = new AtomicBoolean();
    private Context context;

    public CommandRouterServiceImpl(CommandRouterServiceConfigProperties config, DeviceRegistrationClient registrationClient, TenantClient tenantClient, DeviceConnectionInfo deviceConnectionInfo, MessagingClientProvider<CommandConsumerFactory> commandConsumerFactoryProvider, Tracer tracer) {
        this.config = Objects.requireNonNull(config);
        this.registrationClient = Objects.requireNonNull(registrationClient);
        this.tenantClient = Objects.requireNonNull(tenantClient);
        this.deviceConnectionInfo = Objects.requireNonNull(deviceConnectionInfo);
        this.commandConsumerFactoryProvider = Objects.requireNonNull(commandConsumerFactoryProvider);
        this.tracer = Objects.requireNonNull(tracer);
    }

    void setContext(Context context) {
        this.context = Objects.requireNonNull(context);
    }

    public Future<Void> start() {
        if (this.context == null) {
            this.context = Vertx.currentContext();
            if (this.context == null) {
                return Future.failedFuture((Throwable)new IllegalStateException("Service must be started in a Vert.x context"));
            }
        }
        if (!this.commandConsumerFactoryProvider.containsImplementations()) {
            return Future.failedFuture((String)"no command consumer factory provider set");
        }
        if (this.running.compareAndSet(false, true)) {
            this.registrationClient.start();
            this.tenantClient.start();
            if (this.deviceConnectionInfo instanceof Lifecycle) {
                ((Lifecycle)this.deviceConnectionInfo).start();
            }
            this.commandConsumerFactoryProvider.start();
        }
        return Future.succeededFuture();
    }

    public Future<Void> stop() {
        LOG.info("stopping command router");
        if (this.running.compareAndSet(true, false)) {
            ArrayList<Future> results = new ArrayList<Future>();
            results.add(this.registrationClient.stop());
            results.add(this.tenantClient.stop());
            if (this.deviceConnectionInfo instanceof Lifecycle) {
                results.add(((Lifecycle)this.deviceConnectionInfo).stop());
            }
            results.add(this.commandConsumerFactoryProvider.stop());
            this.tenantsToEnable.clear();
            return CompositeFuture.all(results).onFailure(t -> LOG.info("error while stopping command router", t)).map(ok -> {
                LOG.info("successfully stopped command router");
                return null;
            });
        }
        return Future.succeededFuture();
    }

    public Future<CommandRouterResult> setLastKnownGatewayForDevice(String tenantId, String deviceId, String gatewayId, Span span) {
        return this.deviceConnectionInfo.setLastKnownGatewayForDevice(tenantId, deviceId, gatewayId, span).map(ok -> CommandRouterResult.from((int)204));
    }

    public Future<CommandRouterResult> setLastKnownGatewayForDevice(String tenantId, Map<String, String> deviceIdToGatewayIdMap, Span span) {
        return this.deviceConnectionInfo.setLastKnownGatewayForDevice(tenantId, deviceIdToGatewayIdMap, span).map(ok -> CommandRouterResult.from((int)204));
    }

    public Future<CommandRouterResult> registerCommandConsumer(String tenantId, String deviceId, String adapterInstanceId, Duration lifespan, Span span) {
        return this.tenantClient.get(tenantId, span.context()).compose(tenantObject -> {
            CommandConsumerFactory primaryFactory = (CommandConsumerFactory)this.commandConsumerFactoryProvider.getClient(tenantObject);
            Future<Void> primaryConsumerFuture = primaryFactory.createCommandConsumer(tenantId, span.context());
            if (primaryFactory.getMessagingType() == MessagingType.kafka && this.commandConsumerFactoryProvider.getClient(MessagingType.amqp) != null) {
                span.log("also creating secondary, AMQP-based consumer");
                Future<Void> amqpConsumerFuture = ((CommandConsumerFactory)this.commandConsumerFactoryProvider.getClient(MessagingType.amqp)).createCommandConsumer(tenantId, span.context());
                return CompositeFuture.join(primaryConsumerFuture, amqpConsumerFuture).map(v -> null).recover(thr -> {
                    if (amqpConsumerFuture.failed()) {
                        span.log("ignoring failure to create secondary, AMQP-based command consumer");
                    }
                    return primaryConsumerFuture;
                });
            }
            return primaryConsumerFuture;
        }).compose(v -> this.deviceConnectionInfo.setCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, this.getSanitizedLifespan(lifespan), span).recover(thr -> {
            LOG.info("error setting command handling adapter instance [tenant: {}, device: {}]", new Object[]{tenantId, deviceId, thr});
            return Future.failedFuture((Throwable)thr);
        })).map(v -> CommandRouterResult.from((int)204)).otherwise(t -> CommandRouterResult.from((int)ServiceInvocationException.extractStatusCode((Throwable)t)));
    }

    private Duration getSanitizedLifespan(Duration lifespan) {
        return lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > 9223372036L ? Duration.ofSeconds(-1L) : lifespan;
    }

    public Future<CommandRouterResult> unregisterCommandConsumer(String tenantId, String deviceId, String adapterInstanceId, Span span) {
        return this.deviceConnectionInfo.removeCommandHandlingAdapterInstance(tenantId, deviceId, adapterInstanceId, span).recover(thr -> {
            if (ServiceInvocationException.extractStatusCode((Throwable)thr) != 412) {
                LOG.info("error removing command handling adapter instance [tenant: {}, device: {}]", new Object[]{tenantId, deviceId, thr});
            }
            return Future.failedFuture((Throwable)thr);
        }).map(v -> CommandRouterResult.from((int)204)).otherwise(t -> CommandRouterResult.from((int)ServiceInvocationException.extractStatusCode((Throwable)t)));
    }

    public Future<CommandRouterResult> enableCommandRouting(List<String> tenantIds, Span span) {
        if (!this.running.get()) {
            return Future.succeededFuture((Object)CommandRouterResult.from((int)503));
        }
        Objects.requireNonNull(tenantIds);
        boolean isProcessingRequired = this.tenantsToEnable.isEmpty();
        tenantIds.stream().filter(s -> !this.reenabledTenants.contains(s)).filter(s -> !this.tenantsInProcess.contains(s)).filter(s -> this.tenantsToEnable.stream().allMatch(entry -> !((String)entry.one()).equals(s))).forEach(s -> this.tenantsToEnable.addLast((Pair<String, Integer>)Pair.of((Object)s, (Object)1)));
        if (isProcessingRequired) {
            LOG.debug("triggering re-enabling of command routing");
            this.processTenantQueue(span.context());
        }
        return Future.succeededFuture((Object)CommandRouterResult.from((int)204));
    }

    private void processTenantQueue(SpanContext tracingContext) {
        Pair<String, Integer> attempt = this.tenantsToEnable.pollFirst();
        if (attempt == null) {
            if (this.tenantsInProcess.isEmpty()) {
                this.reenabledTenants.clear();
                LOG.debug("finished re-enabling of command routing");
            }
        } else {
            this.tenantsInProcess.add((String)attempt.one());
            long delay = this.calculateDelayMillis((Integer)attempt.two());
            if (delay <= 0L) {
                this.context.runOnContext(go -> this.activateCommandRouting(attempt, tracingContext));
            } else {
                this.context.owner().setTimer(delay, tid -> this.activateCommandRouting(attempt, tracingContext));
            }
        }
    }

    private long calculateDelayMillis(int attemptNo) {
        if (attemptNo == 1) {
            return 0L;
        }
        if (attemptNo > 6) {
            return 10000L;
        }
        return (long)(1 << attemptNo) * 100L;
    }

    private void activateCommandRouting(Pair<String, Integer> attempt, SpanContext tracingContext) {
        if (!this.running.get()) {
            this.tenantsInProcess.remove(attempt.one());
            return;
        }
        Span span = this.tracer.buildSpan("re-enable command routing for tenant").addReference("follows_from", tracingContext).withTag((Tag)TracingHelper.TAG_TENANT_ID, (Object)((String)attempt.one())).start();
        HashMap<String, Object> logEntries = new HashMap<String, Object>(2);
        logEntries.put("attempt#", attempt.two());
        this.tenantClient.get((String)attempt.one(), span.context()).map(tenantObject -> (CommandConsumerFactory)this.commandConsumerFactoryProvider.getClient(tenantObject)).map(factory -> factory.createCommandConsumer((String)attempt.one(), span.context())).onSuccess(ok -> {
            logEntries.put("message", "successfully created command consumer");
            span.log((Map)logEntries);
            this.reenabledTenants.add((String)attempt.one());
        }).onFailure(t -> {
            logEntries.put("message", "failed to create command consumer");
            logEntries.put("error.object", t);
            TracingHelper.logError((Span)span, (Map)logEntries);
            if (t instanceof ServerErrorException) {
                LOG.info("failed to create command consumer [attempt#: {}]", attempt.two(), t);
                span.log("marking tenant for later re-try to create command consumer");
                this.tenantsToEnable.addLast((Pair<String, Integer>)Pair.of((Object)((String)attempt.one()), (Object)((Integer)attempt.two() + 1)));
            }
        }).onComplete(r -> {
            span.finish();
            this.tenantsInProcess.remove(attempt.one());
            this.processTenantQueue(tracingContext);
        });
    }

    public void registerReadinessChecks(HealthCheckHandler handler) {
        if (this.registrationClient instanceof ServiceClient) {
            ((ServiceClient)this.registrationClient).registerReadinessChecks(handler);
        }
        if (this.tenantClient instanceof ServiceClient) {
            ((ServiceClient)this.tenantClient).registerReadinessChecks(handler);
        }
        if (this.deviceConnectionInfo instanceof ServiceClient) {
            ((ServiceClient)this.deviceConnectionInfo).registerReadinessChecks(handler);
        }
        this.commandConsumerFactoryProvider.registerReadinessChecks(handler);
    }

    public void registerLivenessChecks(HealthCheckHandler handler) {
        this.registerEventLoopBlockedCheck(handler);
        if (this.registrationClient instanceof ServiceClient) {
            ((ServiceClient)this.registrationClient).registerLivenessChecks(handler);
        }
        if (this.tenantClient instanceof ServiceClient) {
            ((ServiceClient)this.tenantClient).registerLivenessChecks(handler);
        }
        if (this.deviceConnectionInfo instanceof ServiceClient) {
            ((ServiceClient)this.deviceConnectionInfo).registerLivenessChecks(handler);
        }
        this.commandConsumerFactoryProvider.registerLivenessChecks(handler);
    }

    protected void registerEventLoopBlockedCheck(HealthCheckHandler handler) {
        handler.register("event-loop-blocked-check", this.config.getEventLoopBlockedCheckTimeout(), procedure -> {
            Context currentContext = Vertx.currentContext();
            if (currentContext != this.context) {
                this.context.runOnContext(action -> procedure.tryComplete((Object)Status.OK()));
            } else {
                LOG.debug("Command router - HealthCheck Server context match. Assume protocol adapter is alive.");
                procedure.tryComplete((Object)Status.OK());
            }
        });
    }
}

