/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.HonoProtonHelper;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandHandlerWrapper;
import org.eclipse.hono.client.command.CommandHandlers;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.command.amqp.ProtonBasedCommand;
import org.eclipse.hono.client.command.amqp.ProtonBasedCommandContext;
import org.eclipse.hono.tracing.TracingHelper;

public class ProtonBasedInternalCommandConsumer
extends AbstractServiceClient
implements InternalCommandConsumer {
    private static final int RECREATE_CONSUMER_DELAY = 20;
    private final String adapterInstanceId;
    private final CommandHandlers commandHandlers;
    private final AtomicBoolean recreatingConsumer = new AtomicBoolean(false);
    private final AtomicBoolean tryAgainRecreatingConsumer = new AtomicBoolean(false);
    private final Tracer tracer;
    private ProtonReceiver adapterSpecificConsumer;

    public ProtonBasedInternalCommandConsumer(HonoConnection connection, String adapterInstanceId, CommandHandlers commandHandlers) {
        super(connection, SendMessageSampler.Factory.noop());
        this.commandHandlers = Objects.requireNonNull(commandHandlers);
        this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
        this.tracer = connection.getTracer();
    }

    public Future<Void> start() {
        return super.start().onComplete(v -> {
            this.connection.addReconnectListener(c -> this.recreateConsumer());
            this.recreateConsumer();
        });
    }

    protected void onDisconnect() {
        this.adapterSpecificConsumer = null;
    }

    private Future<ProtonReceiver> createAdapterSpecificConsumer() {
        this.log.trace("creating new adapter instance command consumer");
        String adapterInstanceConsumerAddress = "command_internal/" + this.adapterInstanceId;
        return this.connection.createReceiver(adapterInstanceConsumerAddress, ProtonQoS.AT_LEAST_ONCE, this::handleCommandMessage, this.connection.getConfig().getInitialCredits(), false, sourceAddress -> {
            this.log.debug("command receiver link closed remotely");
            this.invokeRecreateConsumerWithDelay();
        }).map(receiver -> {
            this.log.debug("successfully created adapter specific command consumer");
            this.adapterSpecificConsumer = receiver;
            return receiver;
        }).recover(t -> {
            this.log.error("failed to create adapter specific command consumer", t);
            return Future.failedFuture((Throwable)t);
        });
    }

    void handleCommandMessage(ProtonDelivery delivery, Message msg) {
        ProtonBasedCommand command;
        try {
            command = ProtonBasedCommand.fromRoutedCommandMessage(msg);
        }
        catch (IllegalArgumentException e) {
            this.log.debug("address of command message is invalid: {}", (Object)msg.getAddress());
            Rejected rejected = new Rejected();
            rejected.setError(new ErrorCondition(AmqpUtils.AMQP_BAD_REQUEST, "invalid command target address"));
            delivery.disposition((DeliveryState)rejected, true);
            return;
        }
        CommandHandlerWrapper commandHandler = this.commandHandlers.getCommandHandler(command.getTenant(), command.getGatewayOrDeviceId());
        if (commandHandler != null && commandHandler.getGatewayId() != null) {
            command.setGatewayId(commandHandler.getGatewayId());
        }
        SpanContext spanContext = AmqpUtils.extractSpanContext((Tracer)this.tracer, (Message)msg);
        SpanContext followsFromSpanContext = commandHandler != null ? commandHandler.getConsumerCreationSpanContext() : null;
        Span currentSpan = CommandContext.createSpan((Tracer)this.tracer, (Command)command, (SpanContext)spanContext, (SpanContext)followsFromSpanContext, (String)((Object)((Object)this)).getClass().getSimpleName());
        TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(currentSpan, this.adapterInstanceId);
        ProtonBasedCommandContext commandContext = new ProtonBasedCommandContext(command, delivery, currentSpan);
        if (commandHandler != null) {
            this.log.trace("using [{}] for received command [{}]", (Object)commandHandler, (Object)command);
            commandHandler.handleCommand((CommandContext)commandContext);
        } else {
            this.log.info("no command handler found for command [{}]", (Object)command);
            commandContext.release((Throwable)new NoConsumerException("no command handler found for command"));
        }
    }

    private void recreateConsumer() {
        if (this.recreatingConsumer.compareAndSet(false, true)) {
            this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(res -> {
                if (!HonoProtonHelper.isLinkOpenAndConnected((ProtonLink)this.adapterSpecificConsumer)) {
                    this.log.debug("recreate adapter specific command consumer link");
                    return this.createAdapterSpecificConsumer();
                }
                return Future.succeededFuture();
            }).onComplete(ar -> {
                this.recreatingConsumer.set(false);
                if (this.tryAgainRecreatingConsumer.compareAndSet(true, false) || ar.failed()) {
                    if (ar.succeeded()) {
                        this.recreateConsumer();
                    } else {
                        this.invokeRecreateConsumerWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumer");
            this.tryAgainRecreatingConsumer.set(true);
        }
    }

    private void invokeRecreateConsumerWithDelay() {
        this.connection.getVertx().setTimer(20L, tid -> this.recreateConsumer());
    }
}

