/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.notification.kafka;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.net.HttpURLConnection;
import java.util.Objects;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationTopicHelper;
import org.eclipse.hono.notification.AbstractNotification;
import org.eclipse.hono.notification.NotificationSender;

public class KafkaBasedNotificationSender
extends AbstractKafkaBasedMessageSender<JsonObject>
implements NotificationSender {
    static final String PRODUCER_NAME = "notification";

    public KafkaBasedNotificationSender(KafkaProducerFactory<String, JsonObject> producerFactory, NotificationKafkaProducerConfigProperties config) {
        super(producerFactory, PRODUCER_NAME, (KafkaProducerConfigProperties)config, (Tracer)NoopTracerFactory.create());
    }

    public Future<Void> publish(AbstractNotification notification) {
        Objects.requireNonNull(notification);
        if (!this.lifecycleStatus.isStarted()) {
            return Future.failedFuture((Throwable)new ServerErrorException(503, "sender not started"));
        }
        return this.createProducerRecord(notification).compose(record -> this.getOrCreateProducer().send(record).recover(t -> {
            this.log.debug("error publishing notification [{}]", (Object)notification, t);
            return Future.failedFuture((Throwable)new ServerErrorException(503, t));
        })).mapEmpty();
    }

    private Future<KafkaProducerRecord<String, JsonObject>> createProducerRecord(AbstractNotification notification) {
        try {
            String topic = NotificationTopicHelper.getTopicName(notification.getType());
            String key = notification.getKey();
            JsonObject value = JsonObject.mapFrom((Object)notification);
            return Future.succeededFuture((Object)KafkaProducerRecord.create((String)topic, (Object)key, (Object)value));
        }
        catch (RuntimeException ex) {
            this.log.error("error creating producer record for notification [{}]", (Object)notification, (Object)ex);
            return Future.failedFuture((Throwable)new ServerErrorException(500, (Throwable)ex));
        }
    }
}

