/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerHelper;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.util.DownstreamMessageProperties;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;

public class KafkaBasedCommandResponseSender
extends AbstractKafkaBasedMessageSender<Buffer>
implements CommandResponseSender {
    public KafkaBasedCommandResponseSender(Vertx vertx, KafkaProducerFactory<String, Buffer> producerFactory, MessagingKafkaProducerConfigProperties producerConfig, Tracer tracer) {
        super(producerFactory, "command_response", (KafkaProducerConfigProperties)producerConfig, tracer);
        NotificationEventBusSupport.registerConsumer((Vertx)vertx, (NotificationType)TenantChangeNotification.TYPE, notification -> {
            if (LifecycleChange.DELETE.equals((Object)notification.getChange())) {
                producerFactory.getProducer("command_response").ifPresent(producer -> this.removeTenantTopicBasedProducerMetrics((KafkaProducer<String, Buffer>)producer, notification.getTenantId()));
            }
        });
    }

    private void removeTenantTopicBasedProducerMetrics(KafkaProducer<String, Buffer> producer, String tenantId) {
        HonoTopic topic = new HonoTopic(HonoTopic.Type.COMMAND_RESPONSE, tenantId);
        KafkaProducerHelper.removeTopicMetrics(producer, Stream.of(topic.toString()));
    }

    public Future<Void> sendCommandResponse(TenantObject tenant, RegistrationAssertion device, CommandResponse response, SpanContext context) {
        Objects.requireNonNull(tenant);
        Objects.requireNonNull(device);
        Objects.requireNonNull(response);
        if (this.log.isTraceEnabled()) {
            this.log.trace("publish command response [{}]", (Object)response);
        }
        String topic = new HonoTopic(HonoTopic.Type.COMMAND_RESPONSE, response.getTenantId()).toString();
        Span span = this.startChildSpan("forward Command response", topic, response.getTenantId(), response.getDeviceId(), context);
        if (response.getMessagingType() != this.getMessagingType()) {
            span.log(String.format("using messaging type %s instead of type %s used for the original command", this.getMessagingType(), response.getMessagingType()));
        }
        return this.sendAndWaitForOutcome(topic, response.getTenantId(), response.getDeviceId(), response.getPayload(), this.getHeaders(response, tenant, device), span).onComplete(ar -> span.finish());
    }

    private Map<String, Object> getHeaders(CommandResponse response, TenantObject tenant, RegistrationAssertion device) {
        Map headers = new DownstreamMessageProperties("command_response", tenant.getDefaults().getMap(), device.getDefaults(), response.getAdditionalProperties(), tenant.getResourceLimits()).asMap();
        headers.put("correlation-id", response.getCorrelationId());
        headers.put("tenant_id", response.getTenantId());
        headers.put("device_id", response.getDeviceId());
        headers.put("status", response.getStatus());
        if (response.getContentType() != null) {
            headers.put("content-type", response.getContentType());
        } else if (response.getPayload() != null) {
            headers.put("content-type", "application/octet-stream");
        }
        return headers;
    }
}

