/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command;

import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycle;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.CommandRouterClient;
import org.eclipse.hono.client.command.CommandRoutingUtil;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.command.KubernetesContainerInfoProvider;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumerFactory;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ProtocolAdapterCommandConsumerFactory} that keeps the command handlers of a protocol
 * adapter in a local {@link CommandHandlers} registry and registers/unregisters the corresponding
 * command consumers with the Command Router service by means of a {@link CommandRouterClient}.
 * <p>
 * The {@link InternalCommandConsumer} instances that receive the commands are created during
 * {@link #start()} from the suppliers that have been registered via
 * {@link #registerInternalCommandConsumer(BiFunction)}.
 */
public class ProtocolAdapterCommandConsumerFactoryImpl
implements ProtocolAdapterCommandConsumerFactory,
ServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolAdapterCommandConsumerFactoryImpl.class);
    /** Source of the per-instance counter value that goes into the adapter instance identifier. */
    private static final AtomicInteger ADAPTER_INSTANCE_ID_COUNTER = new AtomicInteger();
    /**
     * The largest number of seconds that still fits into a {@code long} when expressed in
     * nanoseconds. Longer lifespans are treated as unlimited.
     */
    private static final long MAX_LIFESPAN_SECONDS = Long.MAX_VALUE / 1_000_000_000L;

    private final Vertx vertx;
    private final int adapterInstanceIdCounterValue;
    private final String adapterName;
    private final CommandHandlers commandHandlers = new CommandHandlers();
    private final CommandRouterClient commandRouterClient;
    private final List<InternalCommandConsumer> internalCommandConsumers = new ArrayList<>();
    private final AtomicBoolean stopCalled = new AtomicBoolean();
    private int maxTenantIdsPerRequest = 100;
    private KubernetesContainerInfoProvider kubernetesContainerInfoProvider = KubernetesContainerInfoProvider.getInstance();
    private final List<BiFunction<String, CommandHandlers, InternalCommandConsumer>> internalCommandConsumerSuppliers = new ArrayList<>();
    /** Handler passed to {@link #registerReadinessChecks(HealthCheckHandler)} before start; cleared during start. */
    private HealthCheckHandler readinessHandler;
    /** Set during {@link #start()}; {@code null} until the container id has been determined. */
    private String adapterInstanceId;
    /** Non-null once {@link #start()} has failed; evaluated by the readiness check. */
    private String startFailureMessage;

    /**
     * Creates a new factory.
     * <p>
     * If the given client is a {@link ConnectionLifecycle}, a reconnect listener is added that
     * re-enables command routing for all tenants that currently have a command handler.
     *
     * @param vertx The Vert.x instance used to obtain the context for determining the container id.
     * @param commandRouterClient The client for accessing the Command Router service.
     * @param adapterName The name of the protocol adapter, used in the adapter instance identifier.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public ProtocolAdapterCommandConsumerFactoryImpl(Vertx vertx, CommandRouterClient commandRouterClient, String adapterName) {
        this.vertx = Objects.requireNonNull(vertx);
        this.commandRouterClient = Objects.requireNonNull(commandRouterClient);
        this.adapterName = Objects.requireNonNull(adapterName);
        this.adapterInstanceIdCounterValue = ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement();
        if (commandRouterClient instanceof ConnectionLifecycle) {
            ((ConnectionLifecycle)commandRouterClient).addReconnectListener(con -> this.reenableCommandRouting());
        }
    }

    /**
     * Sets the maximum number of tenant identifiers sent in a single request when re-enabling
     * command routing. The default is 100.
     *
     * @param count The maximum number of tenant identifiers per request.
     * @throws IllegalArgumentException if count is less than 1 (such a value would otherwise make
     *                                  the chunking loop never terminate or fail on the first chunk).
     */
    void setMaxTenantIdsPerRequest(int count) {
        if (count < 1) {
            throw new IllegalArgumentException("count must be greater than zero");
        }
        this.maxTenantIdsPerRequest = count;
    }

    /**
     * Replaces the provider used for determining the Kubernetes container id.
     *
     * @param kubernetesContainerInfoProvider The provider to use.
     */
    void setKubernetesContainerInfoProvider(KubernetesContainerInfoProvider kubernetesContainerInfoProvider) {
        this.kubernetesContainerInfoProvider = kubernetesContainerInfoProvider;
    }

    /**
     * Asks the Command Router to enable command routing for the distinct tenants of all currently
     * registered command handlers, sending the tenant identifiers in chunks of at most
     * {@code maxTenantIdsPerRequest} entries.
     */
    private void reenableCommandRouting() {
        final List<String> tenantIds = this.commandHandlers.getCommandHandlers().stream()
                .map(CommandHandlerWrapper::getTenantId)
                .distinct()
                .collect(Collectors.toList());
        int from = 0;
        while (from < tenantIds.size()) {
            final int to = from + Math.min(this.maxTenantIdsPerRequest, tenantIds.size() - from);
            final List<String> chunk = tenantIds.subList(from, to);
            final int chunkSize = chunk.size();
            // the outcome is not needed here, but a failure should at least leave a trace in the logs
            this.commandRouterClient.enableCommandRouting(chunk, null)
                    .onFailure(thr -> LOG.info("error re-enabling command routing [number of tenants: {}]", chunkSize, thr));
            from = to;
        }
    }

    /**
     * Registers a supplier from which an internal command consumer is created during
     * {@link #start()}. The supplier is invoked with the adapter instance identifier and the
     * registry of command handlers.
     *
     * @param internalCommandConsumerSupplier The supplier to register.
     */
    public void registerInternalCommandConsumer(BiFunction<String, CommandHandlers, InternalCommandConsumer> internalCommandConsumerSupplier) {
        this.internalCommandConsumerSuppliers.add(internalCommandConsumerSupplier);
    }

    /**
     * Determines the adapter instance identifier, creates the internal command consumers from the
     * registered suppliers and starts them.
     *
     * @return A future that succeeds once all internal command consumers have been started. The
     *         future fails if no supplier has been registered, if the container id cannot be
     *         determined or if any consumer fails to start; the failure is recorded so that the
     *         readiness check reports it.
     */
    public Future<Void> start() {
        if (this.internalCommandConsumerSuppliers.isEmpty()) {
            this.startFailureMessage = "no command consumer registered";
            LOG.error("cannot start, {}", this.startFailureMessage);
            return Future.failedFuture(this.startFailureMessage);
        }
        return this.getK8sContainerId(1).compose(containerId -> {
            this.adapterInstanceId = CommandRoutingUtil.getNewAdapterInstanceId(this.adapterName, containerId, this.adapterInstanceIdCounterValue);
            for (final BiFunction<String, CommandHandlers, InternalCommandConsumer> supplier : this.internalCommandConsumerSuppliers) {
                final InternalCommandConsumer consumer = supplier.apply(this.adapterInstanceId, this.commandHandlers);
                LOG.info("created internal command consumer {}", consumer.getClass().getSimpleName());
                this.internalCommandConsumers.add(consumer);
                if (this.readinessHandler != null) {
                    consumer.registerReadinessChecks(this.readinessHandler);
                }
            }
            // suppliers and the readiness handler are only needed for creating the consumers
            this.internalCommandConsumerSuppliers.clear();
            this.readinessHandler = null;
            @SuppressWarnings("rawtypes")
            final List<Future> futures = this.internalCommandConsumers.stream()
                    .map(Lifecycle::start)
                    .collect(Collectors.toList());
            if (futures.isEmpty()) {
                return Future.failedFuture("no command consumer registered");
            }
            return CompositeFuture.all(futures).mapEmpty();
        }).recover(thr -> {
            // an exception without a message must still mark the start as failed for the readiness check
            this.startFailureMessage = Optional.ofNullable(thr.getMessage()).orElseGet(thr::toString);
            return Future.failedFuture(thr);
        }).mapEmpty();
    }

    /**
     * Obtains the container id from the {@link KubernetesContainerInfoProvider}, trying again on
     * failure until the lookup succeeds, fails with an {@link IllegalStateException} or
     * {@link #stop()} has been invoked.
     *
     * @param attempt The number of the current attempt (used for logging).
     * @return A future with the outcome of the lookup.
     */
    private Future<String> getK8sContainerId(int attempt) {
        final Context context = this.vertx.getOrCreateContext();
        return this.kubernetesContainerInfoProvider.getContainerId(context).recover(thr -> {
            if (thr instanceof IllegalStateException || this.stopCalled.get()) {
                return Future.failedFuture(thr);
            }
            LOG.info("attempt {} to get K8s container id failed, trying again...", attempt);
            final Promise<String> containerIdPromise = Promise.promise();
            // schedule the next attempt as a separate task on the context instead of recursing directly
            context.runOnContext(action -> this.getK8sContainerId(attempt + 1).onComplete(containerIdPromise));
            return containerIdPromise.future();
        });
    }

    /**
     * Stops all internal command consumers. Only the first invocation has an effect.
     *
     * @return A future that succeeds once all consumers have been stopped; an already succeeded
     *         future if this method has been invoked before.
     */
    public Future<Void> stop() {
        if (!this.stopCalled.compareAndSet(false, true)) {
            return Future.succeededFuture();
        }
        @SuppressWarnings("rawtypes")
        final List<Future> futures = this.internalCommandConsumers.stream()
                .map(Lifecycle::stop)
                .collect(Collectors.toList());
        return CompositeFuture.all(futures).mapEmpty();
    }

    /**
     * Registers the readiness checks of this factory and of its internal command consumers.
     * <p>
     * This method is expected to be invoked before {@link #start()}: the handler is then kept so
     * that consumers created during start can register their own checks, and a check named
     * {@code command-consumer-factory} is registered for this factory. If the consumers already
     * exist, only their checks are registered.
     *
     * @param readinessHandler The handler to register the checks with.
     */
    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        if (!this.internalCommandConsumers.isEmpty()) {
            LOG.warn("registerReadinessChecks expected to be called before start()");
            this.internalCommandConsumers.forEach(consumer -> consumer.registerReadinessChecks(readinessHandler));
            return;
        }
        this.readinessHandler = readinessHandler;
        readinessHandler.register("command-consumer-factory", 1000L, this::checkIfInternalCommandConsumersCreated);
    }

    /**
     * Readiness check procedure: reports KO while no internal command consumer exists or if
     * starting this factory has failed, OK otherwise.
     *
     * @param status The promise to complete with the check result.
     */
    private void checkIfInternalCommandConsumersCreated(Promise<Status> status) {
        final boolean startFailed = this.startFailureMessage != null;
        if (!startFailed && !this.internalCommandConsumers.isEmpty()) {
            status.tryComplete(Status.OK());
            return;
        }
        final JsonObject data = new JsonObject();
        if (startFailed) {
            LOG.error("failed to start command consumer factory: {}", this.startFailureMessage);
            data.put("status", "startup of command consumer factory failed, check logs for details");
        }
        status.tryComplete(Status.KO(data));
    }

    /**
     * Does nothing; this factory registers no liveness checks.
     *
     * @param livenessHandler The handler (ignored).
     */
    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
    }

    /**
     * Creates a command consumer for a device that is not connected via a gateway.
     *
     * @throws NullPointerException if tenantId, deviceId or commandHandler is {@code null}.
     */
    @Override
    public final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(String tenantId, String deviceId, boolean sendEvent, Function<CommandContext, Future<Void>> commandHandler, Duration lifespan, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(commandHandler);
        return this.doCreateCommandConsumer(tenantId, deviceId, null, sendEvent, commandHandler, lifespan, context);
    }

    /**
     * Creates a command consumer for a device that is connected via the given gateway.
     *
     * @throws NullPointerException if tenantId, deviceId, gatewayId or commandHandler is {@code null}.
     */
    @Override
    public final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(String tenantId, String deviceId, String gatewayId, boolean sendEvent, Function<CommandContext, Future<Void>> commandHandler, Duration lifespan, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(gatewayId);
        Objects.requireNonNull(commandHandler);
        return this.doCreateCommandConsumer(tenantId, deviceId, gatewayId, sendEvent, commandHandler, lifespan, context);
    }

    /**
     * Puts a handler for the device into the local registry and registers the command consumer
     * with the Command Router service.
     *
     * @param gatewayId The gateway identifier or {@code null} if the device is not connected via a gateway.
     * @param lifespan The lifespan of the consumer; {@code null}, negative or overly large values
     *                 are replaced with a duration of -1 seconds.
     * @return A future with a consumer whose {@code close} method unregisters the consumer again.
     *         The future fails if this factory has not been started or if the registration fails,
     *         in which case the handler put by this invocation is removed again.
     */
    private Future<ProtocolAdapterCommandConsumer> doCreateCommandConsumer(String tenantId, String deviceId, String gatewayId, boolean sendEvent, Function<CommandContext, Future<Void>> commandHandler, Duration lifespan, SpanContext context) {
        if (this.adapterInstanceId == null) {
            return Future.failedFuture("not started yet");
        }
        final boolean unlimited = lifespan == null || lifespan.isNegative() || lifespan.getSeconds() > MAX_LIFESPAN_SECONDS;
        final Duration sanitizedLifespan = unlimited ? Duration.ofSeconds(-1L) : lifespan;
        LOG.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", tenantId, deviceId, gatewayId);
        // the creation span context is only handed to the handler wrapper for consumers living at most 60 seconds
        final boolean shortLived = !sanitizedLifespan.isNegative() && sanitizedLifespan.toSeconds() <= 60L;
        final SpanContext consumerCreationContextToUse = shortLived ? context : null;
        final CommandHandlerWrapper commandHandlerWrapper = new CommandHandlerWrapper(tenantId, deviceId, gatewayId, commandHandler, Vertx.currentContext(), consumerCreationContextToUse);
        this.commandHandlers.putCommandHandler(commandHandlerWrapper);
        final Instant lifespanStart = Instant.now();
        return this.commandRouterClient.registerCommandConsumer(tenantId, deviceId, sendEvent, this.adapterInstanceId, sanitizedLifespan, context).onFailure(thr -> {
            LOG.info("error registering consumer with the command router service [tenant: {}, device: {}, sendEvent: {}]", tenantId, deviceId, sendEvent, thr);
            // remove only the handler put above; a handler that has replaced it in the meantime must stay
            this.commandHandlers.removeCommandHandler(commandHandlerWrapper);
        }).map(v -> new ProtocolAdapterCommandConsumer(){

            @Override
            public Future<Void> close(boolean sendEventOnClose, SpanContext spanContext) {
                return ProtocolAdapterCommandConsumerFactoryImpl.this.removeCommandConsumer(commandHandlerWrapper, sendEventOnClose, sanitizedLifespan, lifespanStart, spanContext);
            }
        });
    }

    /**
     * Removes the given handler from the local registry and unregisters the command consumer
     * with the Command Router service.
     *
     * @param commandHandlerWrapper The handler to remove.
     * @param sendEvent Passed on to the Command Router client.
     * @param lifespan The (sanitized) lifespan the consumer was registered with.
     * @param lifespanStart The instant at which the consumer was registered.
     * @param onCloseSpanContext The span context passed on to the Command Router client.
     * @return A future indicating the outcome. It fails with a {@link ClientErrorException} with
     *         status 412 if the handler is no longer the registered one or if the Command Router
     *         reports 412 although the lifespan has not elapsed yet; a 412 from the Command Router
     *         after the lifespan has elapsed is ignored.
     */
    private Future<Void> removeCommandConsumer(CommandHandlerWrapper commandHandlerWrapper, boolean sendEvent, Duration lifespan, Instant lifespanStart, SpanContext onCloseSpanContext) {
        if (this.adapterInstanceId == null) {
            return Future.failedFuture("not started yet");
        }
        final String tenantId = commandHandlerWrapper.getTenantId();
        final String deviceId = commandHandlerWrapper.getDeviceId();
        LOG.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId);
        if (!this.commandHandlers.removeCommandHandler(commandHandlerWrapper)) {
            LOG.debug("command consumer not removed - handler already replaced or removed [tenant: {}, device: {}]", tenantId, deviceId);
            return Future.failedFuture(new ClientErrorException(412, "local command handler already replaced or removed"));
        }
        return this.commandRouterClient.unregisterCommandConsumer(tenantId, deviceId, sendEvent, this.adapterInstanceId, onCloseSpanContext).recover(thr -> {
            if (ServiceInvocationException.extractStatusCode(thr) != 412) {
                LOG.info("error unregistering consumer with the command router service [tenant: {}, device: {}]", tenantId, deviceId, thr);
                return Future.failedFuture(thr);
            }
            final boolean entryMayHaveExpired = !lifespan.isNegative() && Instant.now().isAfter(lifespanStart.plus(lifespan));
            if (entryMayHaveExpired) {
                LOG.trace("ignoring 412 error when unregistering consumer with the command router service; entry may have already expired [tenant: {}, device: {}]", tenantId, deviceId);
                return Future.succeededFuture();
            }
            LOG.debug("consumer not unregistered - not matched or already removed [tenant: {}, device: {}]", tenantId, deviceId);
            return Future.failedFuture(new ClientErrorException(412, "no matching command consumer mapping found to be removed"));
        });
    }
}

