/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.kafka;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.InternalCommandSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;

public class KafkaBasedInternalCommandSender
extends AbstractKafkaBasedMessageSender
implements InternalCommandSender {
    public KafkaBasedInternalCommandSender(KafkaProducerFactory<String, Buffer> producerFactory, MessagingKafkaProducerConfigProperties producerConfig, Tracer tracer) {
        super(producerFactory, "internal-cmd-sender", producerConfig, tracer);
    }

    public Future<Void> sendCommand(CommandContext commandContext, String adapterInstanceId) {
        Objects.requireNonNull(commandContext);
        Objects.requireNonNull(adapterInstanceId);
        Command command = commandContext.getCommand();
        if (!(command instanceof KafkaBasedCommand)) {
            commandContext.release();
            this.log.error("command is not an instance of KafkaBasedCommand");
            throw new IllegalArgumentException("command is not an instance of KafkaBasedCommand");
        }
        String topicName = KafkaBasedInternalCommandSender.getInternalCommandTopic(adapterInstanceId);
        Span currentSpan = this.startChildSpan("delegate Command request", topicName, command.getTenant(), command.getDeviceId(), commandContext.getTracingContext());
        return this.sendAndWaitForOutcome(topicName, command.getTenant(), command.getDeviceId(), command.getPayload(), KafkaBasedInternalCommandSender.getHeaders((KafkaBasedCommand)command), currentSpan).onSuccess(v -> commandContext.accept()).onFailure(thr -> commandContext.release((Throwable)new ServerErrorException(command.getTenant(), 503, "failed to publish command message on internal command topic", thr))).onComplete(ar -> currentSpan.finish());
    }

    private static String getInternalCommandTopic(String adapterInstanceId) {
        return new HonoTopic(HonoTopic.Type.COMMAND_INTERNAL, adapterInstanceId).toString();
    }

    private static List<KafkaHeader> getHeaders(KafkaBasedCommand command) {
        ArrayList<KafkaHeader> headers = new ArrayList<KafkaHeader>(command.getRecord().headers());
        headers.add(KafkaRecordHelper.createTenantIdHeader((String)command.getTenant()));
        Optional.ofNullable(command.getGatewayId()).ifPresent(id -> headers.add(KafkaRecordHelper.createViaHeader((String)id)));
        headers.add(KafkaRecordHelper.createOriginalPartitionHeader((int)command.getRecord().partition()));
        headers.add(KafkaRecordHelper.createOriginalOffsetHeader((long)command.getRecord().offset()));
        return headers;
    }
}

