/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.notification.pubsub;

import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.util.LifecycleStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubSubBasedNotificationReceiver
implements NotificationReceiver {
    private static final String TOPIC_ENDPOINT = "notification";
    private static final Logger log = LoggerFactory.getLogger(PubSubBasedNotificationReceiver.class);
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType = new HashMap<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>>();
    private final PubSubSubscriberFactory factory;
    private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private final Set<String> notificationTypes = new HashSet<String>();
    private final MessageReceiver receiver;

    public PubSubBasedNotificationReceiver(PubSubSubscriberFactory factory) {
        this.factory = Objects.requireNonNull(factory);
        this.receiver = (pubsubMessage, ackReplyConsumer) -> {
            this.handleMessage(pubsubMessage);
            ackReplyConsumer.ack();
        };
    }

    protected PubSubBasedNotificationReceiver(PubSubSubscriberFactory factory, MessageReceiver receiver) {
        this.factory = Objects.requireNonNull(factory);
        this.receiver = Objects.requireNonNull(receiver);
    }

    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> consumer) {
        if (this.notificationTypes.contains(notificationType.getAddress())) {
            log.debug("Notification receiver {} is already registered", (Object)notificationType.getAddress());
            return;
        }
        this.notificationTypes.add(notificationType.getAddress());
        this.handlerPerType.put(notificationType.getClazz(), consumer);
        String subscriptionId = PubSubMessageHelper.getTopicName((String)TOPIC_ENDPOINT, (String)notificationType.getAddress());
        this.factory.getOrCreateSubscriber(subscriptionId, this.receiver).subscribe(true).onFailure(t -> {
            log.error("Error subscribing for notification {}", (Object)notificationType.getAddress(), t);
            throw new IllegalStateException("Error, can not subscribe for notification", (Throwable)t);
        });
    }

    public Future<Void> start() {
        if (this.lifecycleStatus.isStarting()) {
            log.debug("Pub/Sub based notification receiver already started");
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture((Throwable)new IllegalStateException("Consumer is already started/stopping"));
        }
        return Future.succeededFuture();
    }

    public Future<Void> stop() {
        return this.factory.closeAllSubscribers();
    }

    private void handleMessage(PubsubMessage message) {
        Buffer buffer = Buffer.buffer((byte[])PubSubMessageHelper.getPayload((PubsubMessage)message));
        try {
            AbstractNotification notification;
            Handler<? extends AbstractNotification> handler;
            JsonObject json = buffer.toJsonObject();
            if (log.isTraceEnabled()) {
                log.trace("received notification: {}{}", (Object)System.lineSeparator(), (Object)json.encodePrettily());
            }
            if ((handler = this.handlerPerType.get((notification = (AbstractNotification)json.mapTo(AbstractNotification.class)).getClass())) != null) {
                handler.handle((Object)notification);
            }
        }
        catch (RuntimeException e) {
            log.debug("Could not handle Pub/Sub message notification, buffer is empty");
        }
    }
}

