/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.telemetry.pubsub;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.pubsub.AbstractPubSubBasedMessageSender;
import org.eclipse.hono.client.pubsub.PubSubPublisherFactory;
import org.eclipse.hono.client.telemetry.EventSender;
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.client.util.DownstreamMessageProperties;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;

/**
 * A sender that forwards telemetry and event messages downstream using the
 * Pub/Sub publishing facilities inherited from {@link AbstractPubSubBasedMessageSender}.
 * <p>
 * Every outgoing message is enriched with device, tenant, project and QoS
 * properties (see {@code addDefaults}). Publishers belonging to a tenant are
 * closed when a notification about the tenant's deletion is received.
 */
public final class PubSubBasedDownstreamSender
        extends AbstractPubSubBasedMessageSender
        implements TelemetrySender, EventSender {

    private static final String PUBSUB_PROPERTY_PROJECT_ID = "projectId";
    private static final String PUBSUB_PROPERTY_TENANT_ID = "tenantId";
    private static final String PUBSUB_PROPERTY_DEVICE_ID = "deviceId";
    private static final String PUBSUB_PROPERTY_DEVICE_REGISTRY_ID = "deviceRegistryId";
    private static final String PROPERTY_QOS = "qos";
    private static final String PROPERTY_CONTENT_TYPE = "content-type";

    private static final String EVENT_ENDPOINT = "event";
    private static final String TELEMETRY_ENDPOINT = "telemetry";

    /** Whether tenant/device level defaults are merged into the message properties. */
    private final boolean isDefaultsEnabled;

    /**
     * Creates a new sender and registers a consumer for tenant change notifications.
     * <p>
     * When a notification with a {@code DELETE} life cycle change arrives and the
     * factory holds a publisher for the given topic and the notification's tenant,
     * that publisher is closed.
     *
     * @param vertx The vert.x instance on which the notification consumer is registered.
     * @param publisherFactory The factory used to look up and close publishers (also passed to the superclass).
     * @param topic The topic passed to the superclass and used to look up publishers.
     * @param projectId The project identifier passed to the superclass.
     * @param includeDefaults {@code true} if defaults defined on tenant and device level
     *                        should be handed to the downstream message properties.
     * @param tracer The tracer passed to the superclass.
     * @throws NullPointerException if vertx is {@code null}. The superclass constructor may
     *                              reject further {@code null} arguments (not visible here).
     */
    public PubSubBasedDownstreamSender(Vertx vertx, PubSubPublisherFactory publisherFactory, String topic, String projectId, boolean includeDefaults, Tracer tracer) {
        super(publisherFactory, topic, projectId, tracer);
        Objects.requireNonNull(vertx, "vertx must not be null");
        this.isDefaultsEnabled = includeDefaults;
        // No raw cast on the notification type: the lambda parameter's type is
        // inferred from the type argument of TenantChangeNotification.TYPE.
        NotificationEventBusSupport.registerConsumer(vertx, TenantChangeNotification.TYPE, notification -> {
            if (LifecycleChange.DELETE.equals(notification.getChange())) {
                publisherFactory.getPublisher(topic, notification.getTenantId())
                        .ifPresent(publisher -> publisherFactory.closePublisher(topic, notification.getTenantId()));
            }
        });
    }

    /**
     * Sends an event downstream.
     * <p>
     * The message is always sent with {@link QoS#AT_LEAST_ONCE}. A span named
     * {@code forward event} is started with a {@code child_of} reference to the
     * given context and finished once the send operation completes.
     *
     * @param tenant The tenant the device belongs to.
     * @param device The registration assertion of the device the data originates from.
     * @param contentType The content type of the payload, may be {@code null}.
     * @param payload The payload to send.
     * @param properties Additional message properties, may be {@code null}.
     * @param context The span context to link the new span to.
     * @return The future returned by the send operation.
     * @throws NullPointerException if tenant or device are {@code null}.
     */
    public Future<Void> sendEvent(TenantObject tenant, RegistrationAssertion device, String contentType, Buffer payload, Map<String, Object> properties, SpanContext context) {
        Objects.requireNonNull(tenant, "tenant must not be null");
        Objects.requireNonNull(device, "device must not be null");

        final String tenantId = tenant.getTenantId();
        final String deviceId = device.getDeviceId();
        if (this.log.isTraceEnabled()) {
            this.log.trace("sending event data [tenantId: {}, deviceId: {}, contentType: {}, properties: {}]", new Object[]{tenantId, deviceId, contentType, properties});
        }

        final Map<String, Object> propsWithDefaults = this.addDefaults(EVENT_ENDPOINT, tenant, device, QoS.AT_LEAST_ONCE, contentType, properties);
        final Span currentSpan = this.startSpan("forward event", tenantId, deviceId, "child_of", context);
        return this.sendAndWaitForOutcome(EVENT_ENDPOINT, tenantId, deviceId, payload, propsWithDefaults, currentSpan)
                .onComplete(ar -> currentSpan.finish());
    }

    /**
     * Sends telemetry data downstream.
     * <p>
     * A span named {@code forward telemetry} is started and finished once the send
     * operation completes. For {@link QoS#AT_MOST_ONCE} the span uses a
     * {@code follows_from} reference and the returned future is already succeeded,
     * i.e. the caller does not wait for (or learn about) the outcome of the send
     * operation. For any other QoS the span uses a {@code child_of} reference and
     * the future of the send operation is returned.
     *
     * @param tenant The tenant the device belongs to.
     * @param device The registration assertion of the device the data originates from.
     * @param qos The delivery semantics to use.
     * @param contentType The content type of the payload, may be {@code null}.
     * @param payload The payload to send.
     * @param properties Additional message properties, may be {@code null}.
     * @param context The span context to link the new span to.
     * @return A succeeded future for {@code AT_MOST_ONCE}, otherwise the future of the send operation.
     * @throws NullPointerException if tenant, device or qos are {@code null}.
     */
    public Future<Void> sendTelemetry(TenantObject tenant, RegistrationAssertion device, QoS qos, String contentType, Buffer payload, Map<String, Object> properties, SpanContext context) {
        Objects.requireNonNull(tenant, "tenant must not be null");
        Objects.requireNonNull(device, "device must not be null");
        Objects.requireNonNull(qos, "qos must not be null");

        final String tenantId = tenant.getTenantId();
        final String deviceId = device.getDeviceId();
        if (this.log.isTraceEnabled()) {
            this.log.trace("sending telemetry data [tenantId: {}, deviceId: {}, qos: {}, contentType: {}, properties: {}]", new Object[]{tenantId, deviceId, qos, contentType, properties});
        }

        final boolean fireAndForget = qos == QoS.AT_MOST_ONCE;
        final Map<String, Object> propsWithDefaults = this.addDefaults(TELEMETRY_ENDPOINT, tenant, device, qos, contentType, properties);
        final Span currentSpan = this.startSpan("forward telemetry", tenantId, deviceId, fireAndForget ? "follows_from" : "child_of", context);
        // The message is sent in both cases; only the future handed back to the caller differs.
        final Future<Void> outcome = this.sendAndWaitForOutcome(TELEMETRY_ENDPOINT, tenantId, deviceId, payload, propsWithDefaults, currentSpan)
                .onComplete(ar -> currentSpan.finish());
        if (fireAndForget) {
            return Future.succeededFuture();
        }
        return outcome;
    }

    /**
     * Builds the property map for an outgoing message.
     * <p>
     * Starts from a copy of the given properties (the caller's map is never modified),
     * then sets the device id, the QoS ordinal, the tenant id, the device registry id
     * (set to the tenant id), the project id and, if given, the content type. The
     * result is passed through {@link DownstreamMessageProperties} together with the
     * tenant and device defaults (only if defaults are enabled) and the tenant's
     * resource limits.
     *
     * @param topicEndpoint The endpoint name the message is sent to.
     * @param tenant The tenant the device belongs to.
     * @param device The registration assertion of the device.
     * @param qos The delivery semantics; its ordinal is written to the {@code qos} property.
     * @param contentType The content type, may be {@code null}.
     * @param properties The properties provided by the caller, may be {@code null}.
     * @return The map produced by {@code DownstreamMessageProperties.asMap()}.
     * @throws NullPointerException if topicEndpoint, tenant, device or qos are {@code null}.
     */
    private Map<String, Object> addDefaults(String topicEndpoint, TenantObject tenant, RegistrationAssertion device, QoS qos, String contentType, Map<String, Object> properties) {
        Objects.requireNonNull(topicEndpoint, "topicEndpoint must not be null");
        Objects.requireNonNull(tenant, "tenant must not be null");
        Objects.requireNonNull(device, "device must not be null");
        Objects.requireNonNull(qos, "qos must not be null");

        final Map<String, Object> messageProperties = properties == null
                ? new HashMap<>()
                : new HashMap<>(properties);
        messageProperties.put(PUBSUB_PROPERTY_DEVICE_ID, device.getDeviceId());
        // NOTE(review): the numeric ordinal is what has always been written here;
        // kept as is because consumers presumably depend on that representation.
        messageProperties.put(PROPERTY_QOS, qos.ordinal());
        messageProperties.put(PUBSUB_PROPERTY_TENANT_ID, tenant.getTenantId());
        messageProperties.put(PUBSUB_PROPERTY_DEVICE_REGISTRY_ID, tenant.getTenantId());
        messageProperties.put(PUBSUB_PROPERTY_PROJECT_ID, this.projectId);
        if (contentType != null) {
            messageProperties.put(PROPERTY_CONTENT_TYPE, contentType);
        }

        return new DownstreamMessageProperties(
                topicEndpoint,
                this.isDefaultsEnabled ? tenant.getDefaults().getMap() : null,
                this.isDefaultsEnabled ? device.getDefaults() : null,
                messageProperties,
                tenant.getResourceLimits()).asMap();
    }
}

