/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.notification.kafka;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationTopicHelper;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBasedNotificationReceiver
extends HonoKafkaConsumer<JsonObject>
implements NotificationReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedNotificationReceiver.class);
    private static final String NAME = "notification";
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType = new HashMap<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>>();

    public KafkaBasedNotificationReceiver(Vertx vertx, NotificationKafkaConsumerConfigProperties consumerConfig) {
        super(vertx, Set.of(), (Pattern)null, consumerConfig.getConsumerConfig(NAME));
        if (!consumerConfig.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        this.setPollTimeout(Duration.ofMillis(consumerConfig.getPollTimeout()));
        this.setConsumerCreationRetriesTimeout(KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        this.setRecordHandler(this::handleRecord);
    }

    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> consumer) {
        if (!this.lifecycleStatus.isStopped()) {
            throw new IllegalStateException("consumers cannot be added when receiver is already started");
        }
        this.addTopic(NotificationTopicHelper.getTopicName(notificationType));
        this.handlerPerType.put(notificationType.getClazz(), consumer);
    }

    private void handleRecord(KafkaConsumerRecord<String, JsonObject> record) {
        AbstractNotification notification;
        Handler<? extends AbstractNotification> handler;
        JsonObject json = (JsonObject)record.value();
        if (LOG.isTraceEnabled()) {
            LOG.trace("received notification:{}{}", (Object)System.lineSeparator(), (Object)json.encodePrettily());
        }
        if ((handler = this.handlerPerType.get((notification = (AbstractNotification)json.mapTo(AbstractNotification.class)).getClass())) != null) {
            handler.handle((Object)notification);
        }
    }

    public Future<Void> stop() {
        return super.stop().onComplete(v -> this.handlerPerType.clear()).mapEmpty();
    }
}

