/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.amqp;

import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.commandrouter.CommandConsumerFactory;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.amqp.ProtonBasedMappingAndDelegatingCommandHandler;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.util.AddressHelper;

public class ProtonBasedCommandConsumerFactoryImpl
extends AbstractServiceClient
implements CommandConsumerFactory {
    private static final int RECREATE_CONSUMERS_DELAY = 20;
    private final CachingClientFactory<CommandConsumer> mappingAndDelegatingCommandConsumerFactory;
    private final AtomicBoolean recreatingConsumers = new AtomicBoolean(false);
    private final AtomicBoolean tryAgainRecreatingConsumers = new AtomicBoolean(false);
    private final ProtonBasedMappingAndDelegatingCommandHandler mappingAndDelegatingCommandHandler;
    private final Set<String> consumerLinkTenants = new HashSet<String>();

    public ProtonBasedCommandConsumerFactoryImpl(HonoConnection connection, TenantClient tenantClient, CommandTargetMapper commandTargetMapper, CommandRouterMetrics metrics, SendMessageSampler.Factory samplerFactory) {
        super(connection, samplerFactory);
        Objects.requireNonNull(tenantClient);
        Objects.requireNonNull(commandTargetMapper);
        this.mappingAndDelegatingCommandHandler = new ProtonBasedMappingAndDelegatingCommandHandler(tenantClient, connection, commandTargetMapper, metrics);
        this.mappingAndDelegatingCommandConsumerFactory = new CachingClientFactory(connection.getVertx(), c -> true);
    }

    public Future<Void> start() {
        return this.connection.connect().onSuccess(ok -> this.log.info("connection to {} endpoint has been established", (Object)this.connection.getConfig().getServerRole())).onFailure(t -> this.log.warn("failed to establish connection to {} endpoint", (Object)this.connection.getConfig().getServerRole(), t)).map(ok -> {
            this.connection.addReconnectListener(c -> this.recreateConsumers());
            this.recreateConsumers();
            return null;
        });
    }

    protected void onDisconnect() {
        this.mappingAndDelegatingCommandConsumerFactory.clearState();
        this.consumerLinkTenants.clear();
    }

    @Override
    public final Future<Void> createCommandConsumer(String tenantId, SpanContext context) {
        Objects.requireNonNull(tenantId);
        return this.connection.executeOnContext(result -> this.getOrCreateMappingAndDelegatingCommandConsumer(tenantId).map((Object)null).onComplete((Handler)result));
    }

    private Future<CommandConsumer> getOrCreateMappingAndDelegatingCommandConsumer(String tenantId) {
        Future messageConsumerFuture = this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(v -> this.connection.executeOnContext(result -> this.mappingAndDelegatingCommandConsumerFactory.getOrCreateClient(tenantId, () -> this.newMappingAndDelegatingCommandConsumer(tenantId), (Handler)result)));
        return messageConsumerFuture.recover(thr -> {
            this.log.debug("failed to create mappingAndDelegatingCommandConsumer for tenant {}", (Object)tenantId, thr);
            return Future.failedFuture((Throwable)thr);
        });
    }

    private Future<CommandConsumer> newMappingAndDelegatingCommandConsumer(final String tenantId) {
        this.log.trace("creating new MappingAndDelegatingCommandConsumer [tenant-id: {}]", (Object)tenantId);
        String address = AddressHelper.getTargetAddress((String)"command", (String)tenantId, null, (ClientConfigProperties)this.connection.getConfig());
        return this.connection.createReceiver(address, ProtonQoS.AT_LEAST_ONCE, (delivery, message) -> this.mappingAndDelegatingCommandHandler.mapAndDelegateIncomingCommandMessage(tenantId, delivery, message), this.connection.getConfig().getInitialCredits(), false, sourceAddress -> {
            this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed remotely", (Object)tenantId);
            this.mappingAndDelegatingCommandConsumerFactory.removeClient(tenantId);
            this.invokeRecreateConsumersWithDelay();
        }).map(receiver -> {
            this.log.debug("successfully created MappingAndDelegatingCommandConsumer [{}]", (Object)address);
            this.consumerLinkTenants.add(tenantId);
            return new CommandConsumer((ProtonReceiver)receiver){
                final /* synthetic */ ProtonReceiver val$receiver;
                {
                    this.val$receiver = protonReceiver;
                }

                public Future<Void> close(SpanContext spanContext) {
                    ProtonBasedCommandConsumerFactoryImpl.this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed locally", (Object)tenantId);
                    ProtonBasedCommandConsumerFactoryImpl.this.mappingAndDelegatingCommandConsumerFactory.removeClient(tenantId);
                    ProtonBasedCommandConsumerFactoryImpl.this.consumerLinkTenants.remove(tenantId);
                    Promise result = Promise.promise();
                    ProtonBasedCommandConsumerFactoryImpl.this.connection.closeAndFree((ProtonLink)this.val$receiver, receiverClosed -> result.complete());
                    return result.future();
                }
            };
        }).recover(t -> {
            this.log.debug("failed to create MappingAndDelegatingCommandConsumer [tenant-id: {}]", (Object)tenantId, t);
            return Future.failedFuture((Throwable)t);
        });
    }

    private void recreateConsumers() {
        if (this.recreatingConsumers.compareAndSet(false, true)) {
            this.log.debug("recreate command consumer links");
            this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(res -> {
                ArrayList consumerCreationFutures = new ArrayList();
                this.consumerLinkTenants.forEach(tenantId -> {
                    this.log.debug("recreate command consumer link for tenant {}", tenantId);
                    consumerCreationFutures.add(this.getOrCreateMappingAndDelegatingCommandConsumer((String)tenantId));
                });
                return CompositeFuture.join(consumerCreationFutures);
            }).onComplete(ar -> {
                this.recreatingConsumers.set(false);
                if (this.tryAgainRecreatingConsumers.compareAndSet(true, false) || ar.failed()) {
                    if (ar.succeeded()) {
                        this.recreateConsumers();
                    } else {
                        this.invokeRecreateConsumersWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumers");
            this.tryAgainRecreatingConsumers.set(true);
        }
    }

    private void invokeRecreateConsumersWithDelay() {
        this.connection.getVertx().setTimer(20L, tid -> this.recreateConsumers());
    }
}

