/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.notification.kafka;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationTopicHelper;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.notification.NotificationType;

/**
 * A {@link NotificationReceiver} that reads notifications from Kafka topics.
 * <p>
 * Handlers are registered per notification type via
 * {@link #registerConsumer(NotificationType, Handler)} <em>before</em> the receiver is started.
 * On {@link #start()} a {@link HonoKafkaConsumer} is created for the topics of all registered
 * types; each received record value is decoded from JSON into an {@link AbstractNotification}
 * and passed to the handler registered for the decoded object's class.
 */
public class KafkaBasedNotificationReceiver implements NotificationReceiver {

    /** Name passed to the consumer configuration when requesting the Kafka consumer properties. */
    private static final String NAME = "notification";

    private final Vertx vertx;
    private final NotificationKafkaConsumerConfigProperties consumerConfig;
    /** Registered handlers, keyed by the notification class they accept. */
    private final Map<Class<? extends AbstractNotification>, Handler<? extends AbstractNotification>> handlerPerType = new HashMap<>();
    /** Topics of all registered notification types; handed to the Kafka consumer on start. */
    private final Set<String> topics = new HashSet<>();
    /** Optional custom supplier of the underlying Kafka consumer; {@code null} if none was set. */
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;
    private HonoKafkaConsumer honoKafkaConsumer;
    private boolean started = false;

    /**
     * Creates a new receiver.
     *
     * @param vertx The Vert.x instance to use.
     * @param consumerConfig The Kafka consumer configuration.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if {@code consumerConfig.isConfigured()} returns {@code false}.
     */
    public KafkaBasedNotificationReceiver(Vertx vertx, NotificationKafkaConsumerConfigProperties consumerConfig) {
        Objects.requireNonNull(vertx);
        Objects.requireNonNull(consumerConfig);
        if (!consumerConfig.isConfigured()) {
            throw new IllegalArgumentException("No Kafka configuration found!");
        }
        this.vertx = vertx;
        this.consumerConfig = consumerConfig;
    }

    /**
     * Sets the supplier that the Kafka consumer created in {@link #start()} will be given
     * for obtaining the underlying Kafka {@link Consumer}.
     * <p>
     * Only takes effect if invoked before {@link #start()}.
     * NOTE(review): package-private, presumably intended for tests — confirm.
     *
     * @param kafkaConsumerSupplier The supplier to use.
     * @throws NullPointerException if the supplier is {@code null}.
     */
    void setKafkaConsumerFactory(Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier) {
        this.kafkaConsumerSupplier = Objects.requireNonNull(kafkaConsumerSupplier);
    }

    /**
     * Creates and starts the Kafka consumer for the topics of all registered notification types.
     * <p>
     * Every invocation creates a new consumer instance; a consumer created by an earlier
     * invocation is not stopped by this method. Once the returned future succeeds, no further
     * handlers can be registered until {@link #stop()} has completed.
     *
     * @return The future returned by the Kafka consumer's {@code start()} method.
     */
    public Future<Void> start() {
        this.honoKafkaConsumer = new HonoKafkaConsumer(
                this.vertx,
                this.topics,
                this.getRecordHandler(),
                this.consumerConfig.getConsumerConfig(NAME));
        this.honoKafkaConsumer.setPollTimeout(Duration.ofMillis(this.consumerConfig.getPollTimeout()));
        this.honoKafkaConsumer.setConsumerCreationRetriesTimeout(KafkaClientFactory.UNLIMITED_RETRIES_DURATION);
        if (this.kafkaConsumerSupplier != null) {
            this.honoKafkaConsumer.setKafkaConsumerSupplier(this.kafkaConsumerSupplier);
        }
        return this.honoKafkaConsumer.start().onSuccess(ok -> {
            this.started = true;
        });
    }

    /**
     * Registers the handler to invoke for notifications of the given type and adds the
     * type's topic to the set of topics to consume from.
     * <p>
     * A handler registered earlier for the same notification class is replaced.
     *
     * @param notificationType The type of notifications to handle.
     * @param consumer The handler to invoke with received notifications of that type.
     * @param <T> The class of the notifications to handle.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalStateException if the receiver has already been started.
     */
    public <T extends AbstractNotification> void registerConsumer(NotificationType<T> notificationType, Handler<T> consumer) {
        // fail fast: a null handler would otherwise subscribe the topic but silently drop its notifications
        Objects.requireNonNull(notificationType);
        Objects.requireNonNull(consumer);
        if (this.started) {
            throw new IllegalStateException("consumers cannot be added when receiver is already started.");
        }
        this.topics.add(NotificationTopicHelper.getTopicName(notificationType));
        this.handlerPerType.put(notificationType.getClazz(), consumer);
    }

    /**
     * Creates the handler for records received from Kafka.
     * <p>
     * The handler decodes the record value into an {@link AbstractNotification} and invokes
     * the handler registered for the exact class of the decoded object. Notifications for
     * which no handler is registered are ignored.
     *
     * @return The record handler.
     */
    private Handler<KafkaConsumerRecord<String, Buffer>> getRecordHandler() {
        return record -> {
            final AbstractNotification notification = Json.decodeValue(record.value(), AbstractNotification.class);
            // Safe: registerConsumer only stores a Handler<T> under the class obtained from the
            // matching NotificationType<T>, and the lookup key is the notification's own class.
            @SuppressWarnings("unchecked")
            final Handler<AbstractNotification> handler =
                    (Handler<AbstractNotification>) this.handlerPerType.get(notification.getClass());
            if (handler != null) {
                handler.handle(notification);
            }
        };
    }

    /**
     * Stops the Kafka consumer (if one has been created) and resets this receiver.
     * <p>
     * Regardless of whether stopping the consumer succeeds, all registered topics and handlers
     * are removed and the receiver is marked as not started, so that handlers can be
     * registered again.
     *
     * @return A future reflecting the outcome of stopping the Kafka consumer.
     */
    public Future<Void> stop() {
        return this.stopKafkaConsumer().onComplete(result -> {
            this.topics.clear();
            this.handlerPerType.clear();
            this.started = false;
        });
    }

    /**
     * Stops the Kafka consumer if {@link #start()} has created one.
     *
     * @return The future returned by the consumer's {@code stop()} method, or a succeeded
     *         future if there is no consumer.
     */
    private Future<Void> stopKafkaConsumer() {
        if (this.honoKafkaConsumer == null) {
            return Future.succeededFuture();
        }
        return this.honoKafkaConsumer.stop();
    }
}

