/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.telemetry.kafka;

import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceLimits;
import org.eclipse.hono.util.TenantObject;

/**
 * Base class for senders that publish messages originating from a device to a Kafka topic.
 * <p>
 * Before a message is handed to the inherited sending logic, its header properties are
 * assembled from (optionally) the tenant's and device's configured defaults, the properties
 * passed in by the caller and a set of values derived from the send request itself
 * (device id, QoS, content type, creation time, TTL).
 */
public abstract class AbstractKafkaBasedDownstreamSender extends AbstractKafkaBasedMessageSender {

    private static final String HEADER_DEVICE_ID = "device_id";
    private static final String HEADER_QOS = "qos";
    private static final String HEADER_CONTENT_TYPE = "content-type";
    private static final String HEADER_TTD = "ttd";
    private static final String HEADER_TTL = "ttl";
    private static final String HEADER_CREATION_TIME = "creation-time";
    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
    /** Marker value used when the tenant does not define a maximum TTL. */
    private static final long NO_MAX_TTL = -1L;
    private static final long MILLIS_PER_SECOND = 1000L;

    private final boolean isDefaultsEnabled;

    /**
     * Creates a new sender.
     *
     * @param producerFactory The factory passed on to the super class.
     * @param producerName The producer name passed on to the super class.
     * @param config The producer configuration passed on to the super class.
     * @param includeDefaults {@code true} if the defaults registered for the tenant and the device
     *                        should be copied into the headers of each message being sent.
     * @param tracer The tracer passed on to the super class.
     */
    public AbstractKafkaBasedDownstreamSender(KafkaProducerFactory<String, Buffer> producerFactory, String producerName, KafkaProducerConfigProperties config, boolean includeDefaults, Tracer tracer) {
        super(producerFactory, producerName, config, tracer);
        this.isDefaultsEnabled = includeDefaults;
    }

    /**
     * Sends a message for a device to the given topic.
     * <p>
     * For {@code QoS.AT_LEAST_ONCE} the returned future is the one produced by the inherited
     * {@code sendAndWaitForOutcome} method. For any other QoS the message is handed to the
     * inherited {@code send} method, its result is ignored and a succeeded future is returned
     * immediately.
     *
     * @param topic The topic to send the message to.
     * @param tenant The tenant that the device belongs to.
     * @param device The registration assertion of the device that the message originates from.
     * @param qos The quality of service to use for sending the message.
     * @param contentType The content type of the payload or {@code null} if the content type
     *                    should be taken from the properties/defaults.
     * @param payload The payload to send.
     * @param properties Additional header properties or {@code null}.
     * @param spanOperationName The operation name passed on to the inherited sending methods.
     * @param context The span context passed on to the inherited sending methods.
     * @return A future indicating the outcome as described above.
     * @throws NullPointerException if topic, tenant, device or qos are {@code null}.
     */
    protected Future<Void> send(HonoTopic topic, TenantObject tenant, RegistrationAssertion device, QoS qos, String contentType, Buffer payload, Map<String, Object> properties, String spanOperationName, SpanContext context) {
        Objects.requireNonNull(topic, "topic must not be null");
        Objects.requireNonNull(tenant, "tenant must not be null");
        Objects.requireNonNull(device, "device must not be null");
        Objects.requireNonNull(qos, "qos must not be null");

        final String tenantId = tenant.getTenantId();
        final String deviceId = device.getDeviceId();
        this.log.trace("sending to Kafka [topic: {}, tenantId: {}, deviceId: {}, qos: {}, contentType: {}, properties: {}]",
                topic, tenantId, deviceId, qos, contentType, properties);

        final Map<String, Object> propsWithDefaults = this.addDefaults(tenant, device, qos, contentType, properties);
        if (QoS.AT_LEAST_ONCE.equals(qos)) {
            return this.sendAndWaitForOutcome(topic.toString(), tenantId, deviceId, payload, propsWithDefaults, spanOperationName, context);
        }
        // any other QoS: do not wait for the outcome of the send operation
        this.send(topic.toString(), tenantId, deviceId, payload, propsWithDefaults, spanOperationName, context);
        return Future.succeededFuture();
    }

    /**
     * Assembles the header properties of a message.
     * <p>
     * Later sources override earlier ones: tenant defaults, device defaults (both only if
     * defaults are enabled), the given properties, then the values derived from the request.
     *
     * @return A new mutable map containing the effective header properties.
     */
    private Map<String, Object> addDefaults(TenantObject tenant, RegistrationAssertion device, QoS qos, String contentType, Map<String, Object> properties) {
        final Map<String, Object> headerProperties = new HashMap<>();
        if (this.isDefaultsEnabled) {
            headerProperties.putAll(tenant.getDefaults().copy().getMap());
            headerProperties.putAll(device.getDefaults());
        }
        if (properties != null) {
            headerProperties.putAll(properties);
        }

        headerProperties.put(HEADER_DEVICE_ID, device.getDeviceId());
        // the numeric value is what ends up in the header, so it must stay the ordinal
        headerProperties.put(HEADER_QOS, qos.ordinal());

        if (contentType != null) {
            headerProperties.put(HEADER_CONTENT_TYPE, contentType);
        }
        headerProperties.putIfAbsent(HEADER_CONTENT_TYPE, DEFAULT_CONTENT_TYPE);

        // remember whether a TTD/TTL was requested before the TTL gets resolved below,
        // resolving may remove a TTL value that is not a number
        final boolean lifetimeRequested = headerProperties.containsKey(HEADER_TTD)
                || headerProperties.containsKey(HEADER_TTL);

        final long maxTtl = Optional.ofNullable(tenant.getResourceLimits())
                .map(ResourceLimits::getMaxTtl)
                .orElse(NO_MAX_TTL);
        headerProperties.compute(HEADER_TTL, (key, currentTtl) -> this.effectiveTtlMillis(currentTtl, maxTtl));

        // The check is repeated after resolving the TTL because the tenant's max TTL may have
        // introduced a TTL that was not present before; a TTL without a creation time
        // gives a consumer nothing to measure it against.
        final boolean lifetimeInEffect = lifetimeRequested || headerProperties.containsKey(HEADER_TTL);
        if (lifetimeInEffect && !headerProperties.containsKey(HEADER_CREATION_TIME)) {
            headerProperties.put(HEADER_CREATION_TIME, Instant.now().toEpochMilli());
        }
        return headerProperties;
    }

    /**
     * Determines the TTL header value (in milliseconds) to use for a message.
     *
     * @param currentTtl The TTL (in seconds) found in the header properties, may be {@code null}
     *                   or not a number.
     * @param maxTtl The tenant's max TTL (in seconds) or {@code -1} if none is defined.
     * @return The TTL in milliseconds or {@code null} if no TTL should be set at all.
     */
    private Long effectiveTtlMillis(Object currentTtl, long maxTtl) {
        final boolean maxTtlDefined = maxTtl != NO_MAX_TTL;
        final long ttlSeconds;
        if (currentTtl instanceof Number) {
            final long ttl = ((Number) currentTtl).longValue();
            if (maxTtlDefined && ttl > maxTtl) {
                this.log.debug("limiting TTL [{}s] to max TTL [{}s]", ttl, maxTtl);
                ttlSeconds = maxTtl;
            } else {
                this.log.trace("keeping message TTL [{}s, max TTL: {}s]", ttl, maxTtl);
                ttlSeconds = ttl;
            }
        } else if (maxTtlDefined) {
            this.log.debug("setting TTL to tenant's max TTL [{}s]", maxTtl);
            ttlSeconds = maxTtl;
        } else {
            // neither a usable TTL nor a max TTL: the header must not be present
            return null;
        }
        return secondsToMillis(ttlSeconds);
    }

    /**
     * Converts seconds to milliseconds, saturating at the bounds of {@code long}
     * instead of silently wrapping around.
     */
    private static long secondsToMillis(long seconds) {
        if (seconds > Long.MAX_VALUE / MILLIS_PER_SECOND) {
            return Long.MAX_VALUE;
        }
        if (seconds < Long.MIN_VALUE / MILLIS_PER_SECOND) {
            return Long.MIN_VALUE;
        }
        return seconds * MILLIS_PER_SECOND;
    }
}

