/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.kafka;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandResponseSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedInternalCommandSender;
import org.eclipse.hono.client.impl.CommandConsumer;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.AbstractMappingAndDelegatingCommandHandler;
import org.eclipse.hono.commandrouter.impl.kafka.KafkaCommandProcessingQueue;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.TenantObject;

public class KafkaBasedMappingAndDelegatingCommandHandler
extends AbstractMappingAndDelegatingCommandHandler {
    private static final Duration PROCESSING_TIMEOUT = Duration.ofSeconds(8L);
    private final Vertx vertx;
    private final KafkaBasedCommandResponseSender kafkaBasedCommandResponseSender;
    private final Tracer tracer;
    private final KafkaCommandProcessingQueue commandQueue;

    public KafkaBasedMappingAndDelegatingCommandHandler(Vertx vertx, TenantClient tenantClient, KafkaCommandProcessingQueue commandQueue, CommandTargetMapper commandTargetMapper, KafkaBasedInternalCommandSender internalCommandSender, KafkaBasedCommandResponseSender kafkaBasedCommandResponseSender, CommandRouterMetrics metrics, Tracer tracer) {
        super(tenantClient, commandTargetMapper, (InternalCommandSender)internalCommandSender, metrics);
        this.vertx = Objects.requireNonNull(vertx);
        this.commandQueue = Objects.requireNonNull(commandQueue);
        this.kafkaBasedCommandResponseSender = Objects.requireNonNull(kafkaBasedCommandResponseSender);
        this.tracer = Objects.requireNonNull(tracer);
    }

    @Override
    protected final MessagingType getMessagingType() {
        return MessagingType.kafka;
    }

    @Override
    public Future<Void> stop() {
        this.commandQueue.setCurrentlyHandledPartitions(List.of());
        return super.stop();
    }

    public Future<Void> mapAndDelegateIncomingCommandMessage(KafkaConsumerRecord<String, Buffer> consumerRecord) {
        KafkaBasedCommand command;
        Objects.requireNonNull(consumerRecord);
        Timer.Sample timer = this.getMetrics().startTimer();
        try {
            command = KafkaBasedCommand.from(consumerRecord);
        }
        catch (IllegalArgumentException exception) {
            this.log.debug("command record is invalid", (Throwable)exception);
            return Future.failedFuture((String)"command record is invalid");
        }
        SpanContext spanContext = KafkaTracingHelper.extractSpanContext((Tracer)this.tracer, consumerRecord);
        Span currentSpan = CommandConsumer.createSpan((String)"map and delegate command", (String)command.getTenant(), (String)command.getDeviceId(), null, (Tracer)this.tracer, (SpanContext)spanContext);
        KafkaTracingHelper.setRecordTags((Span)currentSpan, consumerRecord);
        KafkaBasedCommandContext commandContext = new KafkaBasedCommandContext(command, (CommandResponseSender)this.kafkaBasedCommandResponseSender, currentSpan);
        command.logToSpan(currentSpan);
        if (!command.isValid()) {
            this.log.debug("received invalid command record [{}]", (Object)command);
            return this.tenantClient.get(command.getTenant(), currentSpan.context()).compose(tenantConfig -> {
                commandContext.put("tenant-config", tenantConfig);
                return Future.failedFuture((String)"command is invalid");
            }).onComplete(ar -> {
                commandContext.reject("malformed command message");
                this.reportInvalidCommand((CommandContext)commandContext, timer);
            }).mapEmpty();
        }
        this.log.trace("received valid command record [{}]", (Object)command);
        this.commandQueue.add(commandContext);
        Promise resultPromise = Promise.promise();
        long timerId = this.vertx.setTimer(PROCESSING_TIMEOUT.toMillis(), tid -> {
            if (this.commandQueue.remove(commandContext) || !commandContext.isCompleted()) {
                this.log.info("command processing timed out after {}s [{}]", (Object)PROCESSING_TIMEOUT.toSeconds(), (Object)commandContext.getCommand());
                TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)String.format("command processing timed out after %ds", PROCESSING_TIMEOUT.toSeconds()));
                ServerErrorException error = new ServerErrorException(503, "command processing timed out");
                commandContext.release((Throwable)error);
                resultPromise.tryFail((Throwable)error);
            }
        });
        this.mapAndDelegateIncomingCommand((CommandContext)commandContext, timer).onComplete(ar -> {
            this.vertx.cancelTimer(timerId);
            if (ar.failed()) {
                this.commandQueue.remove(commandContext);
            }
            Futures.tryHandleResult((Promise)resultPromise, (AsyncResult)ar);
        });
        return resultPromise.future();
    }

    @Override
    protected Future<Void> sendCommand(CommandContext commandContext, String targetAdapterInstanceId, TenantObject tenantObject, Timer.Sample timer) {
        KafkaBasedCommandContext cmdContext = (KafkaBasedCommandContext)commandContext;
        return this.commandQueue.applySendCommandAction(cmdContext, () -> super.sendCommand(commandContext, targetAdapterInstanceId, tenantObject, timer));
    }
}

