/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.notification.amqp;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.notification.amqp.NotificationAddressHelper;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;

public class ProtonBasedNotificationReceiver
extends AbstractServiceClient
implements NotificationReceiver {
    private static final int RECREATE_CONSUMERS_DELAY_MILLIS = 20;
    private final CachingClientFactory<ProtonReceiver> receiverFactory;
    private final AtomicBoolean recreatingConsumers = new AtomicBoolean(false);
    private final AtomicBoolean tryAgainRecreatingConsumers = new AtomicBoolean(false);
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType = new HashMap<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>>();
    private final Set<String> addresses = new HashSet<String>();
    private final AtomicBoolean startCalled = new AtomicBoolean();
    private final AtomicBoolean stopCalled = new AtomicBoolean();

    public ProtonBasedNotificationReceiver(HonoConnection connection) {
        super(connection, SendMessageSampler.Factory.noop());
        this.receiverFactory = new CachingClientFactory(connection.getVertx(), c -> true);
    }

    public Future<Void> start() {
        if (!this.startCalled.compareAndSet(false, true)) {
            return Future.succeededFuture();
        }
        return this.connectOnStart().onComplete(v -> {
            if (this.addresses.isEmpty()) {
                this.log.warn("no notification consumers registered - nothing to do");
            } else {
                this.connection.addReconnectListener(c -> this.recreateConsumers());
                this.recreateConsumers();
            }
        });
    }

    public Future<Void> stop() {
        if (!this.stopCalled.compareAndSet(false, true)) {
            return Future.succeededFuture();
        }
        this.addresses.clear();
        this.handlerPerType.clear();
        return this.disconnectOnStop();
    }

    protected void onDisconnect() {
        this.receiverFactory.onDisconnect();
    }

    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> consumer) {
        if (this.startCalled.get()) {
            throw new IllegalStateException("consumers cannot be added when consumer is already started");
        }
        String address = NotificationAddressHelper.getAddress(notificationType);
        this.addresses.add(address);
        this.handlerPerType.put(notificationType.getClazz(), consumer);
        this.log.debug("registered notification receiver [type: {}; address: {}]", (Object)notificationType.getClazz().getSimpleName(), (Object)address);
    }

    private void recreateConsumers() {
        if (this.recreatingConsumers.compareAndSet(false, true)) {
            this.log.debug("recreate notification consumer links");
            this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(res -> {
                ArrayList consumerCreationFutures = new ArrayList();
                this.addresses.forEach(address -> consumerCreationFutures.add(this.createNotificationConsumerIfNeeded((String)address)));
                return Future.join(consumerCreationFutures);
            }).onComplete(ar -> {
                this.recreatingConsumers.set(false);
                if (this.tryAgainRecreatingConsumers.compareAndSet(true, false) || ar.failed()) {
                    if (ar.succeeded()) {
                        this.recreateConsumers();
                    } else {
                        this.invokeRecreateConsumersWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumers");
            this.tryAgainRecreatingConsumers.set(true);
        }
    }

    private void invokeRecreateConsumersWithDelay() {
        this.connection.getVertx().setTimer(20L, tid -> this.recreateConsumers());
    }

    private Future<ProtonReceiver> createNotificationConsumerIfNeeded(String address) {
        return this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(v -> this.connection.executeOnContext(result -> this.receiverFactory.getOrCreateClient(address, () -> this.createProtonReceiver(address), (Handler)result)));
    }

    private Future<ProtonReceiver> createProtonReceiver(String address) {
        this.log.debug("creating new notification receiver link [address: {}]", (Object)address);
        return this.connection.createReceiver(address, ProtonQoS.AT_LEAST_ONCE, this.getProtonMessageHandler(address), sourceAddress -> {
            this.log.debug("notification receiver link [address: {}] closed remotely", (Object)address);
            this.receiverFactory.removeClient(address);
            this.invokeRecreateConsumersWithDelay();
        }).onSuccess(receiver -> this.log.debug("successfully created notification receiver link [address: {}]", (Object)address)).onFailure(t -> this.log.debug("failed to create notification receiver link [address: {}]", (Object)address, t));
    }

    private ProtonMessageHandler getProtonMessageHandler(String address) {
        return (delivery, message) -> {
            Buffer payload = AmqpUtils.getPayload((Message)message);
            if (payload != null) {
                Handler<? extends AbstractNotification> handler;
                AbstractNotification notification;
                String expectedAddress;
                JsonObject json = payload.toJsonObject();
                if (this.log.isTraceEnabled()) {
                    this.log.trace("received notification:{}{}", (Object)System.lineSeparator(), (Object)json.encodePrettily());
                }
                if (!address.equals(expectedAddress = NotificationAddressHelper.getAddress((notification = (AbstractNotification)json.mapTo(AbstractNotification.class)).getType()))) {
                    this.log.warn("got notification of type [{}] on unexpected address [{}]; expected address is [{}]", new Object[]{notification.getClass(), address, expectedAddress});
                }
                if ((handler = this.handlerPerType.get(notification.getClass())) != null) {
                    handler.handle((Object)notification);
                }
            }
        };
    }
}

