/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.amqp;

import io.opentracing.SpanContext;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.amqp.config.AddressHelper;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.InternalCommandSender;
import org.eclipse.hono.client.command.amqp.ProtonBasedInternalCommandSender;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.amqp.ProtonBasedCommandProcessingQueue;
import org.eclipse.hono.commandrouter.impl.amqp.ProtonBasedMappingAndDelegatingCommandHandler;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.notification.deviceregistry.AllDevicesOfTenantDeletedNotification;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;

public class ProtonBasedCommandConsumerFactoryImpl
extends AbstractServiceClient
implements CommandConsumerFactory {
    private static final int RECREATE_CONSUMERS_DELAY = 20;
    private final CachingClientFactory<CommandConsumer> mappingAndDelegatingCommandConsumerFactory;
    private final AtomicBoolean recreatingConsumers = new AtomicBoolean(false);
    private final AtomicBoolean tryAgainRecreatingConsumers = new AtomicBoolean(false);
    private final ProtonBasedCommandProcessingQueue commandQueue;
    private final ProtonBasedMappingAndDelegatingCommandHandler mappingAndDelegatingCommandHandler;
    private final Set<String> consumerLinkTenants = new HashSet<String>();

    public ProtonBasedCommandConsumerFactoryImpl(HonoConnection connection, TenantClient tenantClient, CommandTargetMapper commandTargetMapper, CommandRouterMetrics metrics, SendMessageSampler.Factory samplerFactory) {
        super(connection, samplerFactory);
        Objects.requireNonNull(tenantClient);
        Objects.requireNonNull(commandTargetMapper);
        Vertx vertx = connection.getVertx();
        ProtonBasedInternalCommandSender internalCommandSender = new ProtonBasedInternalCommandSender(connection);
        internalCommandSender.setSkipConnectDisconnectOnStartStop(true);
        this.commandQueue = new ProtonBasedCommandProcessingQueue(vertx);
        this.mappingAndDelegatingCommandHandler = new ProtonBasedMappingAndDelegatingCommandHandler(vertx, tenantClient, this.commandQueue, (InternalCommandSender)internalCommandSender, commandTargetMapper, metrics, connection.getTracer());
        this.mappingAndDelegatingCommandConsumerFactory = new CachingClientFactory(vertx, c -> true);
    }

    public Future<Void> start() {
        this.registerCloseConsumerLinkHandler();
        return Future.all((Future)this.connectOnStart(), this.mappingAndDelegatingCommandHandler.start()).map((Object)null).onSuccess(v -> {
            this.connection.addReconnectListener(c -> this.recreateConsumers());
            this.recreateConsumers();
        });
    }

    public Future<Void> stop() {
        return Future.join((Future)this.disconnectOnStop(), this.mappingAndDelegatingCommandHandler.stop()).map((Object)null);
    }

    private void registerCloseConsumerLinkHandler() {
        NotificationEventBusSupport.registerConsumer((Vertx)this.connection.getVertx(), (NotificationType)AllDevicesOfTenantDeletedNotification.TYPE, notification -> Optional.ofNullable((CommandConsumer)this.mappingAndDelegatingCommandConsumerFactory.getClient(notification.getTenantId())).ifPresent(CommandConsumer::close));
        NotificationEventBusSupport.registerConsumer((Vertx)this.connection.getVertx(), (NotificationType)TenantChangeNotification.TYPE, notification -> {
            if (LifecycleChange.DELETE.equals((Object)notification.getChange()) || LifecycleChange.UPDATE.equals((Object)notification.getChange()) && !notification.isTenantEnabled()) {
                Optional.ofNullable((CommandConsumer)this.mappingAndDelegatingCommandConsumerFactory.getClient(notification.getTenantId())).ifPresent(CommandConsumer::close);
            }
        });
    }

    protected void onDisconnect() {
        this.mappingAndDelegatingCommandConsumerFactory.onDisconnect();
        this.consumerLinkTenants.clear();
    }

    @Override
    public final Future<Void> createCommandConsumer(String tenantId, SpanContext context) {
        Objects.requireNonNull(tenantId);
        return this.connection.executeOnContext(result -> this.getOrCreateMappingAndDelegatingCommandConsumer(tenantId).map((Object)null).onComplete((Handler)result));
    }

    private Future<CommandConsumer> getOrCreateMappingAndDelegatingCommandConsumer(String tenantId) {
        Future messageConsumerFuture = this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(v -> this.connection.executeOnContext(result -> this.mappingAndDelegatingCommandConsumerFactory.getOrCreateClient(tenantId, () -> this.newMappingAndDelegatingCommandConsumer(tenantId), (Handler)result)));
        return messageConsumerFuture.recover(thr -> {
            this.log.debug("failed to create mappingAndDelegatingCommandConsumer for tenant {}", (Object)tenantId, thr);
            return Future.failedFuture((Throwable)thr);
        });
    }

    private Future<CommandConsumer> newMappingAndDelegatingCommandConsumer(final String tenantId) {
        this.log.trace("creating new MappingAndDelegatingCommandConsumer [tenant-id: {}]", (Object)tenantId);
        String address = AddressHelper.getTargetAddress((String)"command", (String)tenantId, null, (ClientConfigProperties)this.connection.getConfig());
        return this.connection.createReceiver(address, ProtonQoS.AT_LEAST_ONCE, (delivery, message) -> this.mappingAndDelegatingCommandHandler.mapAndDelegateIncomingCommandMessage(tenantId, delivery, message), this.connection.getConfig().getInitialCredits(), false, sourceAddress -> {
            this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed remotely", (Object)tenantId);
            this.mappingAndDelegatingCommandConsumerFactory.removeClient(tenantId);
            this.invokeRecreateConsumersWithDelay();
        }).map(receiver -> {
            this.log.debug("successfully created MappingAndDelegatingCommandConsumer [{}]", (Object)address);
            this.consumerLinkTenants.add(tenantId);
            return new CommandConsumer(){
                final /* synthetic */ ProtonReceiver val$receiver;
                {
                    this.val$receiver = protonReceiver;
                }

                @Override
                public Future<Void> close() {
                    ProtonBasedCommandConsumerFactoryImpl.this.log.debug("MappingAndDelegatingCommandConsumer consumer [tenant-id: {}] closed locally", (Object)tenantId);
                    ProtonBasedCommandConsumerFactoryImpl.this.mappingAndDelegatingCommandConsumerFactory.removeClient(tenantId);
                    ProtonBasedCommandConsumerFactoryImpl.this.consumerLinkTenants.remove(tenantId);
                    Promise result = Promise.promise();
                    ProtonBasedCommandConsumerFactoryImpl.this.connection.closeAndFree((ProtonLink)this.val$receiver, receiverClosed -> result.complete());
                    ProtonBasedCommandConsumerFactoryImpl.this.commandQueue.removeEntriesForTenant(tenantId);
                    return result.future();
                }
            };
        }).recover(t -> {
            this.log.debug("failed to create MappingAndDelegatingCommandConsumer [tenant-id: {}]", (Object)tenantId, t);
            return Future.failedFuture((Throwable)t);
        });
    }

    private void recreateConsumers() {
        if (this.recreatingConsumers.compareAndSet(false, true)) {
            this.log.debug("recreate command consumer links");
            this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(res -> {
                ArrayList consumerCreationFutures = new ArrayList();
                this.consumerLinkTenants.forEach(tenantId -> {
                    this.log.debug("recreate command consumer link for tenant {}", tenantId);
                    consumerCreationFutures.add(this.getOrCreateMappingAndDelegatingCommandConsumer((String)tenantId));
                });
                return Future.join(consumerCreationFutures);
            }).onComplete(ar -> {
                this.recreatingConsumers.set(false);
                if (this.tryAgainRecreatingConsumers.compareAndSet(true, false) || ar.failed()) {
                    if (ar.succeeded()) {
                        this.recreateConsumers();
                    } else {
                        this.invokeRecreateConsumersWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumers");
            this.tryAgainRecreatingConsumers.set(true);
        }
    }

    private void invokeRecreateConsumersWithDelay() {
        this.connection.getVertx().setTimer(20L, tid -> this.recreateConsumers());
    }

    private static interface CommandConsumer {
        public Future<Void> close();
    }
}

