/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.notification.kafka;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.Objects;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationTopicHelper;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link NotificationSender} that publishes notifications as Kafka records.
 * <p>
 * The Kafka producer is obtained from the given {@link KafkaProducerFactory} under the
 * name {@link #PRODUCER_NAME}, using the configuration passed to the constructor.
 * <p>
 * NOTE(review): the methods below presumably implement {@code NotificationSender}
 * (and whatever it extends); add {@code @Override} once confirmed against that interface.
 */
public class KafkaBasedNotificationSender
implements NotificationSender {
    /**
     * The name under which the producer is requested from (and closed on) the producer factory.
     */
    public static final String PRODUCER_NAME = "notification";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedNotificationSender.class);
    private final NotificationKafkaProducerConfigProperties config;
    private final KafkaProducerFactory<String, JsonObject> producerFactory;
    // set by stop(), reset by start(); publish() rejects notifications while it is true
    private boolean stopped = false;

    /**
     * Creates a new sender.
     *
     * @param producerFactory The factory used to obtain and close the Kafka producer.
     * @param config The producer configuration handed to the factory.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public KafkaBasedNotificationSender(KafkaProducerFactory<String, JsonObject> producerFactory, NotificationKafkaProducerConfigProperties config) {
        this.producerFactory = Objects.requireNonNull(producerFactory);
        this.config = Objects.requireNonNull(config);
    }

    /**
     * Publishes a notification as a Kafka record.
     *
     * @param notification The notification to publish.
     * @return A future that is completed once the record has been sent. It is failed with a
     *         {@link ServerErrorException} carrying status 503 if this sender has been stopped
     *         or sending the record failed, and with status 500 if the record could not be
     *         created from the notification.
     * @throws NullPointerException if notification is {@code null}.
     */
    public Future<Void> publish(AbstractNotification notification) {
        Objects.requireNonNull(notification);
        if (stopped) {
            return Future.failedFuture(new ServerErrorException(503, "sender already stopped"));
        }
        return createProducerRecord(notification)
                // chaining on the factory result keeps the producer's generic type instead of a raw local
                .compose(producerRecord -> producerFactory
                        .getOrCreateProducer(PRODUCER_NAME, (KafkaProducerConfigProperties) config)
                        .send(producerRecord)
                        .recover(t -> {
                            LOG.debug("error publishing notification [{}]", notification, t);
                            // any send failure is reported to the caller as "service unavailable"
                            return Future.failedFuture(new ServerErrorException(503, t));
                        }))
                .mapEmpty();
    }

    /**
     * Builds the Kafka record for a notification: the topic is derived from the notification's
     * type, the record key is the notification's key and the value is the notification mapped
     * to a JSON object.
     *
     * @param notification The notification to convert.
     * @return A succeeded future containing the record, or a future failed with a
     *         {@link ServerErrorException} carrying status 500 if a {@link RuntimeException}
     *         occurred while building it.
     */
    private Future<KafkaProducerRecord<String, JsonObject>> createProducerRecord(AbstractNotification notification) {
        try {
            final String topic = NotificationTopicHelper.getTopicName(notification.getType());
            final String key = notification.getKey();
            final JsonObject value = JsonObject.mapFrom(notification);
            return Future.succeededFuture(KafkaProducerRecord.create(topic, key, value));
        }
        catch (RuntimeException ex) {
            // the trailing exception argument is logged with its stack trace
            LOG.error("error creating producer record for notification [{}]", notification, ex);
            return Future.failedFuture(new ServerErrorException(500, ex));
        }
    }

    /**
     * Marks this sender as started and requests the producer from the factory, retrying for
     * {@link KafkaClientFactory#UNLIMITED_RETRIES_DURATION}.
     *
     * @return A future reflecting the outcome of the factory's producer creation, with its
     *         result discarded.
     */
    public Future<Void> start() {
        stopped = false;
        return producerFactory
                .getOrCreateProducerWithRetries(PRODUCER_NAME, (KafkaProducerConfigProperties) config, KafkaClientFactory.UNLIMITED_RETRIES_DURATION)
                .mapEmpty();
    }

    /**
     * Marks this sender as stopped, so that subsequent {@link #publish(AbstractNotification)}
     * calls fail, and asks the factory to close the producer.
     *
     * @return The future returned by the factory's close operation.
     */
    public Future<Void> stop() {
        stopped = true;
        return producerFactory.closeProducer(PRODUCER_NAME);
    }
}

