/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.amqp;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AdapterDisabledException;
import org.eclipse.hono.adapter.AuthorizationException;
import org.eclipse.hono.adapter.ProtocolAdapterProperties;
import org.eclipse.hono.adapter.TelemetryExecutionContext;
import org.eclipse.hono.adapter.amqp.AmqpAdapterMetrics;
import org.eclipse.hono.adapter.amqp.AmqpAdapterProperties;
import org.eclipse.hono.adapter.amqp.AmqpAdapterSaslAuthenticatorFactory;
import org.eclipse.hono.adapter.amqp.AmqpContext;
import org.eclipse.hono.adapter.amqp.SaslExternalAuthHandler;
import org.eclipse.hono.adapter.amqp.SaslPlainAuthHandler;
import org.eclipse.hono.adapter.amqp.SaslResponseContext;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
import org.eclipse.hono.adapter.auth.device.DeviceCredentialsAuthProvider;
import org.eclipse.hono.adapter.auth.device.PreCredentialsValidationHandler;
import org.eclipse.hono.adapter.auth.device.usernamepassword.UsernamePasswordAuthProvider;
import org.eclipse.hono.adapter.auth.device.usernamepassword.UsernamePasswordCredentials;
import org.eclipse.hono.adapter.auth.device.x509.SubjectDnCredentials;
import org.eclipse.hono.adapter.auth.device.x509.TenantServiceBasedX509Authentication;
import org.eclipse.hono.adapter.auth.device.x509.X509AuthProvider;
import org.eclipse.hono.adapter.auth.device.x509.X509Authentication;
import org.eclipse.hono.adapter.limiting.ConnectionLimitManager;
import org.eclipse.hono.adapter.limiting.ConnectionLimitStrategy;
import org.eclipse.hono.adapter.limiting.DefaultConnectionLimitManager;
import org.eclipse.hono.adapter.limiting.MemoryBasedConnectionLimitStrategy;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoProtonHelper;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.Commands;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.notification.deviceregistry.AllDevicesOfTenantDeletedNotification;
import org.eclipse.hono.notification.deviceregistry.DeviceChangeNotification;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;

public final class VertxBasedAmqpProtocolAdapter
extends AbstractProtocolAdapterBase<AmqpAdapterProperties> {
    // Key for the per-connection command subscriptions (its consumers are outside this excerpt).
    private static final String KEY_COMMAND_SUBSCRIPTIONS_MAP = "commandSubscriptions";
    // Figures fed into the memory based connection limit strategy by createConnectionLimitManager().
    private static final int MINIMAL_MEMORY_JVM = 100_000_000;
    private static final int MINIMAL_MEMORY_SUBSTRATE = 35_000_000;
    private static final int MEMORY_PER_CONNECTION = 20_000;
    // Holds the promise passed to the first doStop() invocation; a non-null value means "stop requested".
    private final AtomicReference<Promise<Void>> stopResultPromiseRef = new AtomicReference<>();
    // Open connections of authenticated devices, used for closing connections on registry notifications.
    private final Map<ProtonConnection, DeviceUser> authenticatedDeviceConnections = new HashMap<>();
    private ProtonServer secureServer;
    private ProtonServer insecureServer;
    private ProtonSaslAuthenticatorFactory authenticatorFactory;
    // Defaults to the no-op implementation until setMetrics() is invoked.
    private AmqpAdapterMetrics metrics = AmqpAdapterMetrics.NOOP;

    /**
     * Gets the type name of this adapter.
     * <p>
     * Within this class the value is also used as the container name of accepted connections
     * and as the component tag of tracing spans.
     *
     * @return The constant {@code hono-amqp}.
     */
    public String getTypeName() {
        return "hono-amqp";
    }

    /**
     * Sets the object to use for reporting metrics.
     * <p>
     * A {@code null} argument resets the adapter to the no-op implementation instead of
     * storing {@code null}, because the adapter dereferences the metrics object on every
     * connection attempt without a null check.
     *
     * @param metrics The metrics to use or {@code null} to disable reporting.
     */
    public void setMetrics(AmqpAdapterMetrics metrics) {
        if (metrics == null) {
            // keep the field usable: all call sites invoke this.metrics unconditionally
            this.metrics = AmqpAdapterMetrics.NOOP;
            return;
        }
        this.log.info("reporting metrics using [{}]", metrics.getClass().getName());
        this.metrics = metrics;
    }

    /**
     * Starts the adapter.
     * <p>
     * Registers the registry notification consumers, makes sure a connection limit manager
     * is set, checks the port configuration, lazily creates the SASL authenticator factory
     * (only if none has been set and authentication is required) and finally binds the
     * secure and insecure servers.
     *
     * @param startPromise The promise to complete with the outcome of the start up process.
     */
    protected void doStart(Promise<Void> startPromise) {
        this.registerDeviceAndTenantChangeNotificationConsumers();
        if (this.getConnectionLimitManager() == null) {
            this.setConnectionLimitManager(this.createConnectionLimitManager());
        }
        this.checkPortConfiguration()
            .compose(portsChecked -> {
                final boolean factoryNeeded = this.authenticatorFactory == null
                        && this.getConfig().isAuthenticationRequired();
                if (factoryNeeded) {
                    final SaslPlainAuthHandler plainAuthHandler = new SaslPlainAuthHandler(
                            new UsernamePasswordAuthProvider(this.getCredentialsClient(), this.tracer),
                            this::handleBeforeCredentialsValidation);
                    final SaslExternalAuthHandler externalAuthHandler = new SaslExternalAuthHandler(
                            new TenantServiceBasedX509Authentication(this.getTenantClient(), this.tracer),
                            new X509AuthProvider(this.getCredentialsClient(), this.tracer),
                            this::handleBeforeCredentialsValidation);
                    this.authenticatorFactory = new AmqpAdapterSaslAuthenticatorFactory(
                            this.metrics,
                            () -> this.tracer.buildSpan("open connection")
                                    .ignoreActiveSpan()
                                    .withTag(Tags.SPAN_KIND.getKey(), "server")
                                    .withTag(Tags.COMPONENT.getKey(), this.getTypeName())
                                    .start(),
                            plainAuthHandler,
                            externalAuthHandler);
                }
                return Future.succeededFuture();
            })
            .compose(factoryReady -> CompositeFuture.all(this.bindSecureServer(), this.bindInsecureServer()))
            .map(serversBound -> (Void) null)
            .onComplete(startPromise);
    }

    /**
     * Registers consumers for registry notifications which close the connections of devices
     * that have been deleted or disabled, or whose tenant has been deleted or disabled.
     */
    private void registerDeviceAndTenantChangeNotificationConsumers() {
        NotificationEventBusSupport.registerConsumer(this.vertx, DeviceChangeNotification.TYPE, notification -> {
            final Predicate<DeviceUser> isNotifiedDevice = connectedDevice ->
                    connectedDevice.getTenantId().equals(notification.getTenantId())
                            && connectedDevice.getDeviceId().equals(notification.getDeviceId());
            if (LifecycleChange.DELETE.equals(notification.getChange())) {
                this.closeDeviceConnections(isNotifiedDevice, "device deleted");
            } else if (LifecycleChange.UPDATE.equals(notification.getChange()) && !notification.isDeviceEnabled()) {
                this.closeDeviceConnections(isNotifiedDevice, "device disabled");
            }
        });
        NotificationEventBusSupport.registerConsumer(
                this.vertx,
                AllDevicesOfTenantDeletedNotification.TYPE,
                notification -> this.closeDeviceConnections(
                        connectedDevice -> connectedDevice.getTenantId().equals(notification.getTenantId()),
                        "all devices of tenant deleted"));
        NotificationEventBusSupport.registerConsumer(this.vertx, TenantChangeNotification.TYPE, notification -> {
            final Predicate<DeviceUser> belongsToNotifiedTenant = connectedDevice ->
                    connectedDevice.getTenantId().equals(notification.getTenantId());
            if (LifecycleChange.DELETE.equals(notification.getChange())) {
                this.closeDeviceConnections(belongsToNotifiedTenant, "tenant deleted");
            } else if (LifecycleChange.UPDATE.equals(notification.getChange()) && !notification.isTenantEnabled()) {
                this.closeDeviceConnections(belongsToNotifiedTenant, "tenant disabled");
            }
        });
    }

    /**
     * Closes the connections of all authenticated devices matching a predicate.
     * No <em>disconnected</em> event is requested for the closed connections.
     *
     * @param deviceMatchPredicate Selects the devices whose connections are to be closed.
     * @param reason The reason to log for closing the connections.
     */
    private void closeDeviceConnections(Predicate<DeviceUser> deviceMatchPredicate, String reason) {
        // take a snapshot first: closing a connection removes its entry from the map being scanned
        final List<ProtonConnection> matchingConnections = this.authenticatedDeviceConnections.entrySet()
                .stream()
                .filter(entry -> deviceMatchPredicate.test(entry.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        for (final ProtonConnection connection : matchingConnections) {
            this.closeDeviceConnection(connection, reason, false);
        }
    }

    /**
     * Hook invoked by the SASL auth handlers before a device's credentials get validated.
     * <p>
     * Looks up the tenant of the credentials, tags the tracing span of the SASL exchange and
     * stores the trace sampling priority determined for the tenant/auth-id in the attachments
     * of the connection.
     *
     * @param credentials The credentials presented by the device.
     * @param executionContext The context of the SASL exchange.
     * @return A succeeded future if the tenant could be retrieved, otherwise a future failed
     *         with the (mapped) error of the tenant lookup.
     */
    private Future<Void> handleBeforeCredentialsValidation(DeviceCredentials credentials, SaslResponseContext executionContext) {
        final String tenantId = credentials.getTenantId();
        final Span saslSpan = executionContext.getTracingSpan();
        final String authId = credentials.getAuthId();
        final Future<TenantObject> tenantLookup = this.getTenantConfiguration(tenantId, saslSpan.context())
                .recover(error -> Future.failedFuture(CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException(error)));
        return tenantLookup.compose(tenant -> {
            TracingHelper.setDeviceTags(saslSpan, tenantId, null, authId);
            final OptionalInt samplingPriority = TenantTraceSamplingHelper.applyTraceSamplingPriority(tenant, authId, saslSpan);
            executionContext.getProtonConnection()
                    .attachments()
                    .set("TRACE_SAMPLING_PRIORITY", OptionalInt.class, samplingPriority);
            return Future.succeededFuture();
        });
    }

    /**
     * Creates the default connection limit manager which is based on the memory available
     * to the adapter.
     *
     * @return The manager, using the adapter's connection count metric as its input.
     */
    private ConnectionLimitManager createConnectionLimitManager() {
        final long minimalMemory = this.getConfig().isSubstrateVm() ? MINIMAL_MEMORY_SUBSTRATE : MINIMAL_MEMORY_JVM;
        // widen before adding so that the sum is computed using long arithmetic
        final long memoryPerConnection = (long) MEMORY_PER_CONNECTION + this.getConfig().getMaxSessionWindowSize();
        final int gcHeapPercentage = this.getConfig().getGcHeapPercentage();
        final ConnectionLimitStrategy strategy = MemoryBasedConnectionLimitStrategy.forParams(
                minimalMemory,
                memoryPerConnection,
                gcHeapPercentage);
        return new DefaultConnectionLimitManager(
                strategy,
                () -> this.metrics.getNumberOfConnections(),
                this.getConfig());
    }

    /**
     * Stops the adapter by closing the secure and insecure servers.
     * <p>
     * Only the first invocation triggers the shut down. Subsequent invocations are completed
     * with the outcome of the first one.
     *
     * @param stopPromise The promise to complete with the outcome of the shut down.
     */
    protected void doStop(Promise<Void> stopPromise) {
        final boolean firstInvocation = this.stopResultPromiseRef.compareAndSet(null, stopPromise);
        if (firstInvocation) {
            CompositeFuture.all(this.stopSecureServer(), this.stopInsecureServer())
                .map(serversClosed -> (Void) null)
                .onComplete(outcome -> this.log.info("AMQP server(s) closed"))
                .onComplete(stopPromise);
        } else {
            this.stopResultPromiseRef.get().future().onComplete(stopPromise);
            this.log.trace("stop already called");
        }
    }

    /**
     * Checks if shut down of this adapter has been requested already.
     *
     * @return {@code true} if {@code doStop} has been invoked at least once.
     */
    private boolean stopCalled() {
        return Objects.nonNull(this.stopResultPromiseRef.get());
    }

    /**
     * Closes the insecure server, if one has been created.
     *
     * @return A future indicating the outcome of closing the server. The future is succeeded
     *         right away if there is no insecure server.
     */
    private Future<Void> stopInsecureServer() {
        if (this.insecureServer == null) {
            return Future.succeededFuture();
        }
        final Promise<Void> closeTracker = Promise.promise();
        this.log.info("closing insecure AMQP server ...");
        this.insecureServer.close(closeTracker);
        return closeTracker.future();
    }

    /**
     * Closes the secure server, if one has been created.
     *
     * @return A future indicating the outcome of closing the server. The future is succeeded
     *         right away if there is no secure server.
     */
    private Future<Void> stopSecureServer() {
        if (this.secureServer == null) {
            return Future.succeededFuture();
        }
        final Promise<Void> closeTracker = Promise.promise();
        this.log.info("closing secure AMQP server ...");
        this.secureServer.close(closeTracker);
        return closeTracker.future();
    }

    /**
     * Binds the insecure server to its configured address and port, if the insecure port
     * is enabled.
     * <p>
     * The heartbeat interval is set to half of the configured idle timeout. A failure to
     * bind is logged at error level, the same way as for the secure port.
     *
     * @return A future indicating the outcome of binding the server. The future is succeeded
     *         right away if the insecure port is not enabled.
     */
    private Future<Void> bindInsecureServer() {
        if (!this.isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final ProtonServerOptions options = new ProtonServerOptions()
                .setHost(this.getConfig().getInsecurePortBindAddress())
                .setPort(this.determineInsecurePort())
                .setMaxFrameSize(this.getConfig().getMaxFrameSize())
                // send heartbeats twice per idle timeout period
                .setHeartbeat(this.getConfig().getIdleTimeout() >> 1);
        final Promise<Void> result = Promise.promise();
        this.insecureServer = this.createServer(this.insecureServer, options);
        this.insecureServer.connectHandler(this::onConnectRequest).listen(ar -> {
            if (ar.succeeded()) {
                this.log.info("insecure AMQP server listening on [{}:{}]",
                        this.getConfig().getInsecurePortBindAddress(), this.getActualInsecurePort());
                result.complete();
            } else {
                // previously the failure was propagated silently, unlike for the secure port
                this.log.error("cannot bind to insecure port", ar.cause());
                result.fail(ar.cause());
            }
        });
        return result.future();
    }

    /**
     * Binds the TLS secured server to its configured address and port, if the secure port
     * is enabled.
     * <p>
     * The heartbeat interval is set to half of the configured idle timeout.
     *
     * @return A future indicating the outcome of binding the server. The future is succeeded
     *         right away if the secure port is not enabled.
     */
    private Future<Void> bindSecureServer() {
        if (!this.isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final ProtonServerOptions serverOptions = new ProtonServerOptions()
                .setHost(this.getConfig().getBindAddress())
                .setPort(this.determineSecurePort())
                .setMaxFrameSize(this.getConfig().getMaxFrameSize())
                .setHeartbeat(this.getConfig().getIdleTimeout() >> 1);
        this.addTlsKeyCertOptions(serverOptions);
        this.addTlsTrustOptions(serverOptions);
        final Promise<Void> bindTracker = Promise.promise();
        this.secureServer = this.createServer(this.secureServer, serverOptions);
        this.secureServer.connectHandler(this::onConnectRequest).listen(bindAttempt -> {
            if (bindAttempt.failed()) {
                this.log.error("cannot bind to secure port", bindAttempt.cause());
                bindTracker.fail(bindAttempt.cause());
            } else {
                this.log.info("secure AMQP server listening on {}:{}",
                        this.getConfig().getBindAddress(), this.getActualPort());
                bindTracker.complete();
            }
        });
        return bindTracker.future();
    }

    /**
     * Provides the server to bind, re-using a given instance if available.
     * <p>
     * The SASL authenticator factory is set on the server if the adapter requires devices
     * to authenticate and is reset to {@code null} otherwise.
     *
     * @param server An existing server to use or {@code null} if a new one should be created.
     * @param options The options to use for a newly created server.
     * @return The (re-)configured server.
     */
    private ProtonServer createServer(ProtonServer server, ProtonServerOptions options) {
        final ProtonServer serverToUse = Optional.ofNullable(server)
                .orElseGet(() -> ProtonServer.create(this.vertx, options));
        final ProtonSaslAuthenticatorFactory factoryToUse = this.getConfig().isAuthenticationRequired()
                ? this.authenticatorFactory
                : null;
        serverToUse.saslAuthenticatorFactory(factoryToUse);
        return serverToUse;
    }

    /**
     * Registers the handlers for the life cycle events of a newly accepted connection.
     * <p>
     * The connection counters are incremented as soon as the device's <em>open</em> frame
     * arrives, regardless of whether the frame contains an error.
     *
     * @param con The connection that a device has established.
     */
    void onConnectRequest(ProtonConnection con) {
        con.disconnectHandler(lost -> {
            this.log.debug("lost connection to device [container: {}]", con.getRemoteContainer());
            this.onConnectionLoss(con);
        });
        con.closeHandler(closeAttempt -> {
            this.handleRemoteConnectionClose(con, closeAttempt);
            this.onConnectionLoss(con);
        });
        con.sessionOpenHandler(newSession -> {
            HonoProtonHelper.setDefaultCloseHandler(newSession);
            this.handleSessionOpen(con, newSession);
        });
        con.receiverOpenHandler(newReceiver -> {
            HonoProtonHelper.setDefaultCloseHandler(newReceiver);
            newReceiver.setMaxMessageSize(UnsignedLong.valueOf(this.getConfig().getMaxPayloadSize()));
            this.handleRemoteReceiverOpen(con, newReceiver);
        });
        con.senderOpenHandler(newSender -> this.handleRemoteSenderOpenForCommands(con, newSender));
        con.openHandler(openAttempt -> {
            final DeviceUser device = getAuthenticatedDevice(con);
            if (device != null) {
                this.metrics.incrementConnections(device.getTenantId());
            } else {
                this.metrics.incrementUnauthenticatedConnections();
            }
            if (openAttempt.succeeded()) {
                this.processRemoteOpen(openAttempt.result());
            } else {
                this.log.debug("ignoring device's open frame containing error", openAttempt.cause());
            }
        });
    }

    /**
     * Actively closes the connection to a device and cleans up the state kept for it.
     * <p>
     * The connection's close and disconnect handlers are replaced with no-ops before the
     * connection is closed so that the clean up is not triggered a second time.
     *
     * @param con The connection to close.
     * @param reason The reason for closing the connection (may be {@code null}).
     * @param sendDisconnectedEvent {@code true} if a <em>disconnected</em> event should be sent.
     */
    private void closeDeviceConnection(ProtonConnection con, String reason, boolean sendDisconnectedEvent) {
        final DeviceUser device = getAuthenticatedDevice(con);
        final Span closeSpan = this.newSpan("close device connection", device, getTraceSamplingPriority(con));
        final Map<String, String> spanLogItems = new HashMap<>(2);
        spanLogItems.put("event", "closing device connection");
        if (reason != null) {
            spanLogItems.put("reason", reason);
        }
        closeSpan.log(spanLogItems);
        this.log.debug("closing device connection [container: {}, {}]; reason: {}",
                con.getRemoteContainer(), device, reason);
        con.closeHandler(ignored -> {});
        con.disconnectHandler(ignored -> {});
        con.close();
        con.disconnect();
        this.handleConnectionLossInternal(con, closeSpan, sendDisconnectedEvent, true)
            .onComplete(cleanedUp -> closeSpan.finish());
    }

    /**
     * Cleans up after a connection to a device has been lost or closed by the device.
     * <p>
     * The command consumers of the connection are left open if the adapter is being shut down.
     *
     * @param con The connection that has been lost.
     */
    void onConnectionLoss(ProtonConnection con) {
        final boolean shuttingDown = this.stopCalled();
        final String operationName = shuttingDown
                ? "close device connection (server shutdown)"
                : "handle closing of connection";
        final DeviceUser device = getAuthenticatedDevice(con);
        final Span lossSpan = this.newSpan(operationName, device, getTraceSamplingPriority(con));
        this.handleConnectionLossInternal(con, lossSpan, true, !shuttingDown)
            .onComplete(cleanedUp -> lossSpan.finish());
    }

    /**
     * Releases the resources associated with a connection that is no longer in use.
     * <p>
     * Removes the connection from the authenticated connections, optionally closes the
     * command consumers of the connection and decrements the connection counters.
     *
     * @param con The connection.
     * @param span The span to mark as erroneous if closing a command consumer fails.
     * @param sendDisconnectedEvent {@code true} if a <em>disconnected</em> event should be sent.
     * @param closeCommandConsumers {@code true} if the connection's command consumers should be closed.
     * @return A future that completes once all command consumers have been dealt with. It is
     *         failed if closing any of them failed.
     */
    private Future<Void> handleConnectionLossInternal(ProtonConnection con, Span span, boolean sendDisconnectedEvent, boolean closeCommandConsumers) {
        this.authenticatedDeviceConnections.remove(con);
        // NOTE(review): raw type kept as in the original; tighten once the element type
        // expected by CompositeFuture.join in the Vert.x version in use is confirmed
        @SuppressWarnings("rawtypes")
        final List consumerCloseResults;
        if (closeCommandConsumers) {
            consumerCloseResults = getCommandSubscriptions(con).stream()
                    .map(subscription -> this.closeCommandConsumer(
                            subscription.getConsumer(),
                            subscription.getAddress(),
                            sendDisconnectedEvent,
                            span))
                    .collect(Collectors.toList());
        } else {
            consumerCloseResults = Collections.emptyList();
        }
        this.decrementConnectionCount(con, span.context(), sendDisconnectedEvent);
        return CompositeFuture.join(consumerCloseResults)
            .recover(error -> {
                Tags.ERROR.set(span, Boolean.TRUE);
                return Future.failedFuture(error);
            })
            .mapEmpty();
    }

    /**
     * Processes the <em>open</em> frame received from a device.
     * <p>
     * The connection is opened only if the adapter's connection limit is not exceeded, the
     * authorization and resource limit checks succeed and the <em>connected</em> event has
     * been sent. Otherwise the connection is closed with an error condition derived from the
     * failure. The outcome is reported to the metrics in both cases.
     *
     * @param con The connection to the device.
     */
    private void processRemoteOpen(ProtonConnection con) {
        final Span attachedSpan = con.attachments().get("CURRENT_SPAN", Span.class);
        final Span span;
        if (attachedSpan != null) {
            span = attachedSpan;
        } else {
            span = this.tracer.buildSpan("open connection")
                    .ignoreActiveSpan()
                    .withTag(Tags.SPAN_KIND.getKey(), "server")
                    .withTag(Tags.COMPONENT.getKey(), this.getTypeName())
                    .start();
        }
        final DeviceUser device = getAuthenticatedDevice(con);
        final boolean authenticated = device != null;
        TracingHelper.TAG_AUTHENTICATED.set(span, authenticated);
        if (authenticated) {
            TracingHelper.setDeviceTags(span, device.getTenantId(), device.getDeviceId());
        }
        final String tenantId = authenticated ? device.getTenantId() : null;
        final String cipherSuite = con.attachments().get("TLS_CIPHER_SUITE", String.class);

        this.checkConnectionLimitForAdapter()
            .compose(limitOk -> this.checkAuthorizationAndResourceLimits(device, con, span))
            .compose(checksOk -> {
                final String remoteId = Optional.ofNullable(con.getRemoteContainer()).orElse("unknown");
                return this.sendConnectedEvent(remoteId, device, span.context());
            })
            .map(eventSent -> {
                con.setContainer(this.getTypeName());
                con.setOfferedCapabilities(new Symbol[] { AmqpUtils.CAP_ANONYMOUS_RELAY });
                con.open();
                this.log.debug("connection with device [container: {}] established", con.getRemoteContainer());
                span.log("connection established");
                if (authenticated) {
                    this.authenticatedDeviceConnections.put(con, device);
                }
                this.metrics.reportConnectionAttempt(
                        MetricsTags.ConnectionAttemptOutcome.SUCCEEDED,
                        tenantId,
                        cipherSuite);
                return null;
            })
            .otherwise(error -> {
                con.setCondition(getErrorCondition(error));
                con.close();
                TracingHelper.logError(span, error);
                this.metrics.reportConnectionAttempt(
                        AbstractProtocolAdapterBase.getOutcome(error),
                        tenantId,
                        cipherSuite);
                return null;
            })
            .onComplete(done -> span.finish());
    }

    /**
     * Checks if a device is allowed to connect to this adapter.
     * <p>
     * If the adapter does not require authentication, the check succeeds right away.
     * Otherwise an anonymous device is rejected with status 401, whereas for an
     * authenticated device the registration, the adapter's enablement for the tenant and
     * the tenant's connection limit get verified.
     *
     * @param authenticatedDevice The authenticated device or {@code null} if the device is anonymous.
     * @param con The connection to the device.
     * @param span The span to log to and to use as parent for the checks.
     * @return A future indicating the outcome of the checks.
     */
    private Future<Void> checkAuthorizationAndResourceLimits(DeviceUser authenticatedDevice, ProtonConnection con, Span span) {
        if (!this.getConfig().isAuthenticationRequired()) {
            this.log.trace("received connection request from anonymous device [container: {}]", con.getRemoteContainer());
            return Future.succeededFuture();
        }
        if (authenticatedDevice == null) {
            return Future.failedFuture(new ClientErrorException(401, "anonymous devices not supported"));
        }
        this.log.trace("received connection request from {}", authenticatedDevice);
        return CompositeFuture.all(
                    this.checkDeviceRegistration(authenticatedDevice, span.context()),
                    this.getTenantConfiguration(authenticatedDevice.getTenantId(), span.context())
                        .compose(tenant -> CompositeFuture.all(
                                this.isAdapterEnabled(tenant),
                                this.checkConnectionLimit(tenant, span.context()))))
            .map(allChecksPassed -> {
                this.log.debug("{} is registered and enabled", authenticatedDevice);
                span.log(String.format("device [%s] is registered and enabled", authenticatedDevice));
                return (Void) null;
            });
    }

    /**
     * Sets the server to use for accepting connections on the insecure port.
     *
     * @param server The server, which must not be listening yet.
     * @throws NullPointerException if server is {@code null}.
     * @throws IllegalStateException if the server reports an actual port greater than 0.
     */
    void setInsecureAmqpServer(ProtonServer server) {
        final ProtonServer candidate = Objects.requireNonNull(server);
        final boolean alreadyListening = candidate.actualPort() > 0;
        if (alreadyListening) {
            throw new IllegalStateException("AMQP Server should not be running");
        }
        this.insecureServer = candidate;
    }

    /**
     * Sets the factory to use for creating the SASL authenticators of the servers.
     *
     * @param authFactory The factory.
     * @throws NullPointerException if the factory is {@code null}.
     */
    void setSaslAuthenticatorFactory(ProtonSaslAuthenticatorFactory authFactory) {
        Objects.requireNonNull(authFactory, "authFactory must not be null");
        this.authenticatorFactory = authFactory;
    }

    /**
     * Opens a session requested by a device, using the configured maximum session window
     * size as the session's incoming capacity.
     *
     * @param conn The connection that the session belongs to.
     * @param session The session to open.
     */
    private void handleSessionOpen(ProtonConnection conn, ProtonSession session) {
        final int sessionWindowSize = this.getConfig().getMaxSessionWindowSize();
        this.log.debug("opening new session with client [container: {}, session window size: {}]",
                conn.getRemoteContainer(), sessionWindowSize);
        session.setIncomingCapacity(sessionWindowSize);
        session.open();
    }

    /**
     * Handles a device's request to close its connection by closing and disconnecting the
     * local end as well.
     * <p>
     * The disconnect handler is removed first so that the disconnect does not trigger the
     * connection loss handling.
     *
     * @param con The connection being closed.
     * @param res The outcome of the remote close, failed if the device indicated an error.
     */
    private void handleRemoteConnectionClose(ProtonConnection con, AsyncResult<ProtonConnection> res) {
        final String remoteContainer = con.getRemoteContainer();
        if (res.failed()) {
            this.log.debug("client [container: {}] closed connection with error", remoteContainer, res.cause());
        } else {
            this.log.debug("client [container: {}] closed connection", remoteContainer);
        }
        con.disconnectHandler(null);
        con.close();
        con.disconnect();
    }

    /**
     * Decrements the connection counter matching the connection's authentication status
     * and optionally sends a <em>disconnected</em> event for the device.
     *
     * @param con The connection that is no longer in use.
     * @param context The span context to use for sending the event.
     * @param sendDisconnectedEvent {@code true} if a <em>disconnected</em> event should be sent.
     */
    private void decrementConnectionCount(ProtonConnection con, SpanContext context, boolean sendDisconnectedEvent) {
        final DeviceUser authenticatedDevice = getAuthenticatedDevice(con);
        if (authenticatedDevice != null) {
            this.metrics.decrementConnections(authenticatedDevice.getTenantId());
        } else {
            this.metrics.decrementUnauthenticatedConnections();
        }
        if (!sendDisconnectedEvent) {
            return;
        }
        final String remoteId = Optional.ofNullable(con.getRemoteContainer()).orElse("unknown");
        this.sendDisconnectedEvent(remoteId, authenticatedDevice, context);
    }

    /**
     * Handles a device's request to establish a link for uploading messages to the adapter.
     * <p>
     * Only the anonymous terminus is supported: a link with a non-empty target address is
     * closed with a 400 error. Otherwise the link is opened with a prefetch of 30 and
     * manual settlement, and every received message is passed on to
     * {@link #onMessageReceived(AmqpContext)}.
     * <p>
     * A message without an address no longer makes the message handler fail with a
     * {@code NullPointerException} while logging to the span. The message is handed to the
     * regular processing instead. If the handler fails synchronously nevertheless, the
     * message's span is finished before the delivery gets released.
     *
     * @param conn The connection over which the link is being established.
     * @param receiver The link to receive the device's messages on.
     */
    void handleRemoteReceiverOpen(ProtonConnection conn, ProtonReceiver receiver) {
        final DeviceUser authenticatedDevice = getAuthenticatedDevice(conn);
        final OptionalInt traceSamplingPriority = getTraceSamplingPriority(conn);
        final Span span = this.newSpan("attach device sender link", authenticatedDevice, traceSamplingPriority);
        span.log(Map.of("snd-settle-mode", receiver.getRemoteQoS()));
        final String remoteTargetAddress = Optional.ofNullable(receiver.getRemoteTarget())
                .map(Target::getAddress)
                .orElse(null);
        if (!Strings.isNullOrEmpty(remoteTargetAddress)) {
            this.log.debug("client provided target address [{}] in open frame, closing link [container: {}, {}]",
                    remoteTargetAddress, conn.getRemoteContainer(), authenticatedDevice);
            span.log(Map.of("target address", remoteTargetAddress));
            final ClientErrorException ex = new ClientErrorException(400, "container supports anonymous terminus only");
            this.closeLinkWithError(receiver, ex, span);
        } else {
            receiver.setTarget(receiver.getRemoteTarget());
            receiver.setSource(receiver.getRemoteSource());
            receiver.setQoS(receiver.getRemoteQoS());
            receiver.setPrefetch(30);
            // dispositions are sent explicitly once a message has been processed
            receiver.setAutoAccept(false);
            receiver.maxMessageSizeExceededHandler(recv -> {
                final Span errorSpan = this.newSpan("upload message", authenticatedDevice, traceSamplingPriority);
                this.log.debug("incoming message size exceeds configured maximum of {} bytes; link will be detached [container: {}, {}]",
                        this.getConfig().getMaxPayloadSize(), conn.getRemoteContainer(), authenticatedDevice);
                TracingHelper.logError(errorSpan, String.format("incoming message size exceeds configured maximum of %s bytes", this.getConfig().getMaxPayloadSize()));
                errorSpan.log("device sender link will be detached");
                errorSpan.finish();
            });
            HonoProtonHelper.setCloseHandler(receiver, remoteDetach -> this.onLinkDetach(receiver));
            HonoProtonHelper.setDetachHandler(receiver, remoteDetach -> this.onLinkDetach(receiver));
            receiver.handler((delivery, message) -> {
                try {
                    final SpanContext spanContext = AmqpUtils.extractSpanContext(this.tracer, message);
                    final Span msgSpan = this.newSpan("upload message", authenticatedDevice, traceSamplingPriority, spanContext);
                    try {
                        HonoProtonHelper.onReceivedMessageDeliveryUpdatedFromRemote(delivery, d -> {
                            this.log.debug("got unexpected disposition update for message received from device [remote state: {}, container: {}, {}]",
                                    delivery.getRemoteState(), conn.getRemoteContainer(), authenticatedDevice);
                            msgSpan.log("got unexpected disposition from device [remote state: " + delivery.getRemoteState() + "]");
                        });
                        // Map.of() rejects null values but the address is set by the device and
                        // may be missing, so only log it if present
                        final Map<String, Object> logItems = new HashMap<>(2);
                        Optional.ofNullable(message.getAddress())
                            .ifPresent(address -> logItems.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), address));
                        logItems.put("settled", delivery.remotelySettled());
                        msgSpan.log(logItems);
                        final AmqpContext ctx = AmqpContext.fromMessage(delivery, message, msgSpan, authenticatedDevice);
                        ctx.setTimer(this.metrics.startTimer());
                        final Future<Void> spanPreparationFuture = authenticatedDevice == null
                                ? this.applyTraceSamplingPriorityForAddressTenant(ctx.getAddress(), msgSpan)
                                : Future.succeededFuture();
                        spanPreparationFuture
                            .compose(ar -> this.onMessageReceived(ctx)
                                    .onSuccess(ok -> msgSpan.finish())
                                    .onFailure(error -> this.closeConnectionOnTerminalError(error, conn, ctx, msgSpan)));
                    } catch (final Exception ex) {
                        // asynchronous processing has not taken over, so nobody else finishes the span
                        TracingHelper.logError(msgSpan, ex);
                        msgSpan.finish();
                        throw ex;
                    }
                } catch (final Exception ex) {
                    this.log.warn("error handling message [container: {}, {}]",
                            conn.getRemoteContainer(), authenticatedDevice, ex);
                    if (!conn.isDisconnected()) {
                        ProtonHelper.released(delivery, true);
                    }
                }
            });
            receiver.open();
            this.log.debug("established link for receiving messages from device [container: {}, {}]",
                    conn.getRemoteContainer(), authenticatedDevice);
            span.log("link established");
        }
        span.finish();
    }

    /**
     * Applies the trace sampling priority configured for the tenant of a message address
     * to a span.
     * <p>
     * This is a best effort operation: the returned future succeeds even if the tenant
     * cannot be retrieved.
     *
     * @param address The address of the message (may be {@code null}).
     * @param span The span to apply the priority to.
     * @return A succeeded future.
     * @throws NullPointerException if span is {@code null}.
     */
    private Future<Void> applyTraceSamplingPriorityForAddressTenant(ResourceIdentifier address, Span span) {
        Objects.requireNonNull(span);
        final String tenantId = address == null ? null : address.getTenantId();
        if (tenantId == null) {
            // nothing to look up
            return Future.succeededFuture();
        }
        return this.getTenantConfiguration(tenantId, span.context())
            .map(tenant -> {
                TracingHelper.setDeviceTags(span, tenant.getTenantId(), null);
                TenantTraceSamplingHelper.applyTraceSamplingPriority(tenant, null, span);
                return (Void) null;
            })
            .recover(lookupError -> Future.succeededFuture());
    }

    /**
     * Processes a message that has been uploaded by a device.
     * <p>
     * The delivery is settled according to the outcome: <em>accepted</em> if the message has
     * been processed successfully, <em>rejected</em> if processing failed with a
     * {@code ClientErrorException} and <em>released</em> for any other failure.
     *
     * @param ctx The context in which the message has been received.
     * @return A future indicating the outcome of processing the message. A failure is
     *         logged to the context's span before it is passed on.
     */
    Future<Void> onMessageReceived(AmqpContext ctx) {
        this.log.trace("processing message [address: {}, qos: {}]", ctx.getAddress(), ctx.getRequestedQos());
        final Span uploadSpan = ctx.getTracingSpan();
        return this.validateEndpoint(ctx)
            .compose(endpointCtx -> this.validateAddress(endpointCtx.getAddress(), endpointCtx.getAuthenticatedDevice()))
            .compose(targetAddress -> this.uploadMessage(ctx, targetAddress, uploadSpan))
            .map(uploadResult -> {
                ProtonHelper.accepted(ctx.delivery(), true);
                return uploadResult;
            })
            .recover(error -> {
                final boolean causedByClient = error instanceof ClientErrorException;
                if (!causedByClient) {
                    ProtonHelper.released(ctx.delivery(), true);
                } else {
                    AmqpUtils.rejected(ctx.delivery(), getErrorCondition(error));
                }
                this.log.debug("failed to process message from device", error);
                TracingHelper.logError(uploadSpan, error);
                return Future.failedFuture(error);
            });
    }

    /**
     * Handles a device's request to open a link for receiving commands.
     * <p>
     * The sender's remote source address is parsed and validated. If it refers to the
     * command endpoint, the command sender link is opened; any other endpoint results in
     * a 404 client error. On failure the link is closed with an error condition: the
     * failure itself if it is a {@code ServiceInvocationException}, otherwise a 400
     * "Invalid source address" client error. The tracing span is finished in any case.
     *
     * @param connection The connection that the link is being opened on.
     * @param sender The link that the device wants to receive commands on.
     */
    void handleRemoteSenderOpenForCommands(ProtonConnection connection, ProtonSender sender) {
        final DeviceUser authenticatedDevice = getAuthenticatedDevice(connection);
        final OptionalInt traceSamplingPriority = getTraceSamplingPriority(connection);
        final Span span = this.newSpan("attach device command receiver link", (Device) authenticatedDevice, traceSamplingPriority);
        getResourceIdentifier(sender.getRemoteSource())
                .compose(address -> this.validateAddress(address, (Device) authenticatedDevice))
                .compose(validAddress -> {
                    if (!CommandConstants.isCommandEndpoint(validAddress.getEndpoint())) {
                        return Future.failedFuture(new ClientErrorException(404, "no such node"));
                    }
                    return this.openCommandSenderLink(connection, sender, validAddress, (Device) authenticatedDevice, span, traceSamplingPriority);
                })
                .map(consumer -> {
                    span.log("link established");
                    return consumer;
                })
                .recover(cause -> {
                    // only service invocation errors are reported to the device as-is
                    final Throwable reason = cause instanceof ServiceInvocationException
                            ? cause
                            : new ClientErrorException(400, "Invalid source address");
                    this.closeLinkWithError(sender, reason, span);
                    return Future.failedFuture(cause);
                })
                .onComplete(done -> span.finish());
    }

    /**
     * Closes the connection to a device if message processing failed with a terminal error,
     * and finishes the given span.
     * <p>
     * If the context has no address, the span is finished right away. Otherwise the span
     * is finished once the terminal error check has completed, regardless of its outcome.
     *
     * @param error The error that message processing failed with.
     * @param conn The connection to close if the error is terminal.
     * @param ctx The context of the message that failed to be processed.
     * @param span The span to log to and to finish.
     */
    private void closeConnectionOnTerminalError(Throwable error, ProtonConnection conn, AmqpContext ctx, Span span) {
        final ResourceIdentifier address = ctx.getAddress();
        if (address == null) {
            span.finish();
            return;
        }
        this.isTerminalError(error, address.getResourceId(), (Device) ctx.getAuthenticatedDevice(), span.context())
                .onSuccess(terminal -> {
                    if (!terminal) {
                        return;
                    }
                    span.log("closing connection to device");
                    conn.close();
                    conn.disconnect();
                })
                .onComplete(done -> span.finish());
    }

    /**
     * Creates and starts a new span without a parent span context.
     *
     * @param operationName The name of the operation that the span represents.
     * @param authenticatedDevice The authenticated device or {@code null}.
     * @param traceSamplingPriority The sampling priority to set on the span, if present.
     * @return The started span.
     */
    private Span newSpan(String operationName, Device authenticatedDevice, OptionalInt traceSamplingPriority) {
        final SpanContext noParentContext = null;
        return this.newSpan(operationName, authenticatedDevice, traceSamplingPriority, noParentContext);
    }

    /**
     * Creates and starts a new server-kind span as a child of the given context.
     * <p>
     * The span is tagged with whether the device is authenticated. For an authenticated
     * device its tenant and device identifiers are added as tags as well.
     *
     * @param operationName The name of the operation that the span represents.
     * @param authenticatedDevice The authenticated device or {@code null}.
     * @param traceSamplingPriority The sampling priority to set on the span, if present.
     * @param context The parent span context (may be {@code null}).
     * @return The started span.
     */
    private Span newSpan(String operationName, Device authenticatedDevice, OptionalInt traceSamplingPriority, SpanContext context) {
        final boolean authenticated = authenticatedDevice != null;
        final Span span = TracingHelper.buildChildSpan(this.tracer, context, operationName, this.getTypeName())
                .withTag(Tags.SPAN_KIND.getKey(), "server")
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticated)
                .start();
        if (authenticated) {
            TracingHelper.setDeviceTags(span, authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
        }
        if (traceSamplingPriority.isPresent()) {
            TracingHelper.setTraceSamplingPriority(span, traceSamplingPriority.getAsInt());
        }
        return span;
    }

    /**
     * Creates a command consumer for a device and opens the link for sending commands to it.
     * <p>
     * Once the consumer exists, the sender link is configured (source, target,
     * at-least-once QoS), a handler is registered for both link close and link detach,
     * the link is opened and the subscription is registered with the connection.
     * The close/detach handler removes the subscription, closes the link and closes the
     * command consumer, all traced in a span of its own.
     *
     * @param connection The connection that the link belongs to.
     * @param sender The link to send commands on.
     * @param address The address that the device subscribes to.
     * @param authenticatedDevice The authenticated device or {@code null}.
     * @param span The span to track consumer creation in.
     * @param traceSamplingPriority The sampling priority to use for the detach span.
     * @return A future holding the created consumer. A failure that is not a
     *         {@code ServiceInvocationException} is replaced by a 503 server error.
     */
    private Future<ProtocolAdapterCommandConsumer> openCommandSenderLink(ProtonConnection connection, ProtonSender sender, ResourceIdentifier address, Device authenticatedDevice, Span span, OptionalInt traceSamplingPriority) {
        return this.createCommandConsumer(sender, address, authenticatedDevice, span)
                .map(consumer -> {
                    sender.setSource(sender.getRemoteSource());
                    sender.setTarget(sender.getRemoteTarget());
                    sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
                    final Handler<ProtonSender> onDetach = detachedLink -> {
                        final Span detachSpan = this.newSpan("detach device command receiver link", authenticatedDevice, traceSamplingPriority);
                        removeCommandSubscription(connection, address.toString());
                        this.onLinkDetach(sender);
                        this.closeCommandConsumer(consumer, address, true, detachSpan)
                                .onComplete(closed -> detachSpan.finish());
                    };
                    // the same clean-up applies no matter whether the peer closes or detaches the link
                    HonoProtonHelper.setCloseHandler(sender, onDetach);
                    HonoProtonHelper.setDetachHandler(sender, onDetach);
                    sender.open();
                    this.log.debug("established link [address: {}] for sending commands to device", address);
                    registerCommandSubscription(connection, new CommandSubscription(consumer, address));
                    return consumer;
                })
                .recover(cause -> {
                    final Throwable failure = cause instanceof ServiceInvocationException
                            ? cause
                            : new ServerErrorException(address.getTenantId(), 503, "cannot create command consumer");
                    return Future.failedFuture(failure);
                });
    }

    /**
     * Closes a command consumer.
     * <p>
     * Any error that occurs while closing is logged to the span. Only an error with
     * status code 412 is propagated to the caller; all other errors are ignored.
     *
     * @param consumer The consumer to close.
     * @param address The address that the consumer was created for (used for logging).
     * @param sendDisconnectedEvent Passed on to the consumer's {@code close} method.
     * @param span The span to log errors to and to take the tracing context from.
     * @return A future indicating the outcome of closing the consumer.
     */
    private Future<Void> closeCommandConsumer(ProtocolAdapterCommandConsumer consumer, ResourceIdentifier address, boolean sendDisconnectedEvent, Span span) {
        final String tenantId = address.getTenantId();
        final String deviceId = address.getResourceId();
        return consumer.close(sendDisconnectedEvent, span.context())
                .recover(error -> {
                    TracingHelper.logError(span, error);
                    final boolean preconditionFailed = ServiceInvocationException.extractStatusCode(error) == 412;
                    if (!preconditionFailed) {
                        // ignore everything but "precondition failed"
                        return Future.succeededFuture();
                    }
                    this.log.debug("command consumer wasn't active anymore [tenant: {}, device-id: {}]", tenantId, deviceId);
                    span.log("command consumer wasn't active anymore");
                    return Future.failedFuture(error);
                })
                .mapEmpty();
    }

    /**
     * Creates a command consumer for the device identified by the given source address.
     * <p>
     * The handler registered with the consumer performs these steps for each command:
     * it starts a metrics timer, retrieves the tenant configuration, verifies that the
     * command is valid, that the sender link is open and that the message limit check
     * passes, and finally passes the command to {@code onCommandReceived}. If any of the
     * checks fails, the command is rejected (client error) or released (any other error)
     * and the failure is reported to the metrics.
     *
     * @param sender The link to send commands to the device on.
     * @param sourceAddress The address containing the tenant and device to create the consumer for.
     * @param authenticatedDevice The authenticated device or {@code null}.
     * @param span The span whose context is used for tracing the creation.
     * @return A future holding the created consumer.
     */
    private Future<ProtocolAdapterCommandConsumer> createCommandConsumer(ProtonSender sender, ResourceIdentifier sourceAddress, Device authenticatedDevice, Span span) {
        Function<CommandContext, Future> commandHandler = commandContext -> {
            // attach the timer sample to the context so that it is available when the outcome is reported
            Timer.Sample timer = this.metrics.startTimer();
            VertxBasedAmqpProtocolAdapter.addMicrometerSample((CommandContext)commandContext, (Timer.Sample)timer);
            Tags.COMPONENT.set(commandContext.getTracingSpan(), this.getTypeName());
            Command command = commandContext.getCommand();
            Future tenantTracker = this.getTenantConfiguration(sourceAddress.getTenantId(), commandContext.getTracingContext());
            return tenantTracker.compose(tenantObject -> {
                if (!command.isValid()) {
                    return Future.failedFuture((Throwable)new ClientErrorException(400, "malformed command message"));
                }
                if (!HonoProtonHelper.isLinkOpenAndConnected((ProtonLink)sender)) {
                    return Future.failedFuture((Throwable)new ServerErrorException(503, "sender link is not open"));
                }
                return this.checkMessageLimit((TenantObject)tenantObject, command.getPayloadSize(), commandContext.getTracingContext());
            }).compose(success -> {
                // the authenticated device differs from the addressed device
                // (presumably a gateway acting on behalf of the device - confirm against callers)
                if (authenticatedDevice != null && !authenticatedDevice.getDeviceId().equals(sourceAddress.getResourceId())) {
                    return this.getRegistrationAssertion(authenticatedDevice.getTenantId(), sourceAddress.getResourceId(), authenticatedDevice, commandContext.getTracingContext());
                }
                return Future.succeededFuture();
            }).onFailure(failure -> {
                // this handler is registered before the final compose() below, so it only
                // sees failures of the checks above, not failures of onCommandReceived
                if (failure instanceof ClientErrorException) {
                    commandContext.reject(failure);
                } else {
                    commandContext.release(failure);
                }
                // tenantTracker.result() is null here if the tenant lookup itself failed
                this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, sourceAddress.getTenantId(), (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.from((Throwable)failure), command.getPayloadSize(), timer);
            }).compose(success -> this.onCommandReceived((TenantObject)tenantTracker.result(), sender, (CommandContext)commandContext));
        };
        // for an authenticated device, a registration assertion is requested before the consumer is created
        Future tokenTracker = Optional.ofNullable(authenticatedDevice).map(v -> this.getRegistrationAssertion(authenticatedDevice.getTenantId(), sourceAddress.getResourceId(), authenticatedDevice, span.context())).orElseGet(Future::succeededFuture);
        if (authenticatedDevice != null && !authenticatedDevice.getDeviceId().equals(sourceAddress.getResourceId())) {
            // the authenticated device's identifier is passed to the factory as an additional argument
            return tokenTracker.compose(v -> this.getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(), sourceAddress.getResourceId(), authenticatedDevice.getDeviceId(), true, commandHandler, null, span.context()));
        }
        return tokenTracker.compose(v -> this.getCommandConsumerFactory().createCommandConsumer(sourceAddress.getTenantId(), sourceAddress.getResourceId(), true, commandHandler, null, span.context()));
    }

    /**
     * Sends a command to a device over the given sender link.
     * <p>
     * If the link has no credit, the command is released, reported as undeliverable and
     * a failed future is returned. Otherwise an AMQP message is created from the command
     * and sent to the device. The command context is completed according to the
     * disposition that the device settles the message with. If a send timeout greater
     * than zero is configured, the command is released when no delivery update has
     * arrived in time. An {@code AtomicBoolean} guarantees that only one of the two
     * (timeout or delivery update) completes the command context and reports the outcome.
     * <p>
     * The log entry written when the device leaves the message unsettled contains the
     * remote delivery state as an object; the flags of a <em>modified</em> outcome are
     * treated as {@code false} when the device omits them.
     *
     * @param tenantObject The tenant that the command's target device belongs to.
     * @param sender The link to send the command on.
     * @param commandContext The context of the command to send.
     * @return A failed future if no credit is available, otherwise a future that has
     *         succeeded as soon as the message has been handed to the link. It does
     *         <em>not</em> reflect the device's delivery update.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    Future<Void> onCommandReceived(TenantObject tenantObject, ProtonSender sender, CommandContext commandContext) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(sender);
        Objects.requireNonNull(commandContext);
        final Command command = commandContext.getCommand();
        final AtomicBoolean isCommandSettled = new AtomicBoolean(false);
        if (sender.sendQueueFull()) {
            this.log.debug("cannot send command to device: no credit available [{}]", (Object)command);
            final ServerErrorException exception = new ServerErrorException(503, "no credit available for sending command to device");
            commandContext.release((Throwable)exception);
            this.reportSentCommand(tenantObject, commandContext, MetricsTags.ProcessingOutcome.UNDELIVERABLE);
            return Future.failedFuture((Throwable)exception);
        }

        final Message msg = ProtonHelper.message();
        msg.setAddress(String.format("%s/%s/%s", "command", command.getTenant(), command.getDeviceId()));
        msg.setCorrelationId((Object)command.getCorrelationId());
        msg.setSubject(command.getName());
        AmqpUtils.setPayload((Message)msg, (String)command.getContentType(), (Buffer)command.getPayload());
        if (command.isTargetedAtGateway()) {
            AmqpUtils.addDeviceId((Message)msg, (String)command.getDeviceId());
        }
        if (!command.isOneWay()) {
            msg.setReplyTo(String.format("%s/%s/%s", "command_response", command.getTenant(), Commands.getDeviceFacingReplyToId((String)command.getReplyToId(), (String)command.getDeviceId(), (MessagingType)command.getMessagingType())));
        }

        final long sendTimeout = ((AmqpAdapterProperties)((Object)this.getConfig())).getSendMessageToDeviceTimeout();
        final Long timerId;
        if (sendTimeout < 1L) {
            // no timeout configured
            timerId = null;
        } else {
            timerId = this.vertx.setTimer(sendTimeout, tid -> {
                if (this.log.isDebugEnabled()) {
                    final String linkOrConnectionClosedInfo = HonoProtonHelper.isLinkOpenAndConnected((ProtonLink)sender) ? "" : " (link or connection already closed)";
                    this.log.debug("waiting for delivery update timed out after {}ms{} [{}]", new Object[]{sendTimeout, linkOrConnectionClosedInfo, command});
                }
                if (isCommandSettled.compareAndSet(false, true)) {
                    commandContext.release((Throwable)new ServerErrorException(503, "timeout waiting for delivery update from device"));
                    this.reportSentCommand(tenantObject, commandContext, MetricsTags.ProcessingOutcome.UNDELIVERABLE);
                } else if (this.log.isTraceEnabled()) {
                    this.log.trace("command is already settled and downstream application was already notified [{}]", (Object)command);
                }
            });
        }

        sender.send(msg, delivery -> {
            if (timerId != null) {
                this.vertx.cancelTimer(timerId.longValue());
            }
            if (!isCommandSettled.compareAndSet(false, true)) {
                // the timeout has fired already and has completed the command context
                this.log.trace("command is already settled and downstream application was already notified [{}]", (Object)command);
                return;
            }
            final DeliveryState remoteState = delivery.getRemoteState();
            // NOTE(review): stays null if the device settles with a state other than the
            // four outcomes handled below; the command context is not completed in that case
            MetricsTags.ProcessingOutcome outcome = null;
            if (delivery.remotelySettled()) {
                if (remoteState instanceof Accepted) {
                    outcome = MetricsTags.ProcessingOutcome.FORWARDED;
                    commandContext.accept();
                } else if (remoteState instanceof Rejected) {
                    final Rejected rejected = (Rejected)remoteState;
                    outcome = MetricsTags.ProcessingOutcome.UNPROCESSABLE;
                    final String cause = Optional.ofNullable(rejected.getError()).map(ErrorCondition::getDescription).orElse(null);
                    commandContext.reject(cause);
                } else if (remoteState instanceof Released) {
                    outcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                    commandContext.release();
                } else if (remoteState instanceof Modified) {
                    final Modified modified = (Modified)remoteState;
                    // both fields are optional in the modified outcome: absent means false
                    final boolean deliveryFailed = Boolean.TRUE.equals(modified.getDeliveryFailed());
                    final boolean undeliverableHere = Boolean.TRUE.equals(modified.getUndeliverableHere());
                    outcome = undeliverableHere ? MetricsTags.ProcessingOutcome.UNPROCESSABLE : MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                    commandContext.modify(deliveryFailed, undeliverableHere);
                }
            } else {
                this.log.debug("device did not settle command message [remote state: {}, {}]", (Object)remoteState, (Object)command);
                // the remote state is not a String, so the map needs to accept arbitrary values
                final Map<String, Object> logItems = new HashMap<String, Object>(2);
                logItems.put("event", "device did not settle command");
                logItems.put("remote state", remoteState);
                commandContext.getTracingSpan().log(logItems);
                commandContext.release((Throwable)new ServerErrorException(503, "device did not settle command"));
                outcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
            }
            this.reportSentCommand(tenantObject, commandContext, outcome);
        });

        final Map<String, Object> items = new HashMap<String, Object>(4);
        items.put("event", "command sent to device");
        if (sender.getRemoteTarget() != null) {
            items.put(Tags.MESSAGE_BUS_DESTINATION.getKey(), sender.getRemoteTarget().getAddress());
        }
        items.put(TracingHelper.TAG_QOS.getKey(), sender.getQoS().name());
        items.put(TracingHelper.TAG_CREDIT.getKey(), sender.getCredit());
        commandContext.getTracingSpan().log(items);
        return Future.succeededFuture();
    }

    /**
     * Reports the outcome of sending a command to a device to the metrics.
     * <p>
     * The direction is derived from whether the command is one-way; the timer sample is
     * the one attached to the command context.
     *
     * @param tenantObject The tenant that the command's target device belongs to.
     * @param commandContext The context of the command that has been processed.
     * @param outcome The outcome of processing the command.
     */
    private void reportSentCommand(TenantObject tenantObject, CommandContext commandContext, MetricsTags.ProcessingOutcome outcome) {
        final Command command = commandContext.getCommand();
        final MetricsTags.Direction direction = command.isOneWay()
                ? MetricsTags.Direction.ONE_WAY
                : MetricsTags.Direction.REQUEST;
        this.metrics.reportCommand(
                direction,
                command.getTenant(),
                tenantObject,
                outcome,
                command.getPayloadSize(),
                VertxBasedAmqpProtocolAdapter.getMicrometerSample(commandContext));
    }

    /**
     * Closes a link with the error condition derived from the given error.
     *
     * @param <T> The type of link.
     * @param link The link to close.
     * @param t The error to derive the error condition from.
     * @param span The span to log the error to, or {@code null} to skip tracing.
     */
    private <T extends ProtonLink<T>> void closeLinkWithError(ProtonLink<T> link, Throwable t, Span span) {
        final ErrorCondition condition = getErrorCondition(t);
        this.log.debug("closing link with error condition [symbol: {}, description: {}]", condition.getCondition(), condition.getDescription());
        link.setCondition(condition);
        link.close();
        if (span == null) {
            return;
        }
        TracingHelper.logError(span, t);
    }

    /**
     * Forwards a message from a device according to the endpoint it has been sent to.
     * <p>
     * Telemetry and event messages are uploaded only if the payload matches the indicated
     * content type, otherwise the returned future fails with a 400 client error.
     * Command responses are uploaded as such. Any other endpoint yields a 404 client error.
     *
     * @param context The context of the message to upload.
     * @param resource The validated address of the message.
     * @param currentSpan The span to track the upload in.
     * @return A future indicating the outcome of the upload.
     */
    private Future<Void> uploadMessage(AmqpContext context, ResourceIdentifier resource, Span currentSpan) {
        switch (context.getEndpoint()) {
            case TELEMETRY, EVENT -> {
                final boolean payloadMatches = VertxBasedAmqpProtocolAdapter.isPayloadOfIndicatedType(context.getMessagePayload(), context.getMessageContentType());
                final Future<Void> contentTypeCheck = payloadMatches
                        ? Future.succeededFuture()
                        : Future.failedFuture(new ClientErrorException(400, "content type [%s] does not match payload".formatted(context.getMessageContentType())));
                return contentTypeCheck.compose(ok -> this.doUploadMessage(context, resource, currentSpan));
            }
            case COMMAND_RESPONSE -> {
                return this.doUploadCommandResponseMessage(context, resource, currentSpan);
            }
            default -> {
                return Future.failedFuture(new ClientErrorException(404, "unknown endpoint"));
            }
        }
    }

    /**
     * Forwards a telemetry or event message downstream.
     * <p>
     * A registration assertion for the device and the tenant configuration are retrieved.
     * The tenant is then checked for the adapter being enabled and for the message limit.
     * Once all of this has succeeded, the message is sent via the telemetry or the event
     * sender, depending on the context's endpoint. The outcome is reported to the metrics
     * in both the success and the failure case.
     *
     * @param context The context of the message to forward.
     * @param resource The address containing the tenant and device that the message originates from.
     * @param currentSpan The span to track the processing in.
     * @return A future indicating the outcome of forwarding the message.
     */
    private Future<Void> doUploadMessage(AmqpContext context, ResourceIdentifier resource, Span currentSpan) {
        this.log.trace("forwarding {} message", context.getEndpoint().getCanonicalName());
        final Future<RegistrationAssertion> tokenFuture = this.getRegistrationAssertion(
                resource.getTenantId(),
                resource.getResourceId(),
                (Device) context.getAuthenticatedDevice(),
                currentSpan.context());
        final Future<TenantObject> tenantTracker = this.getTenantConfiguration(resource.getTenantId(), currentSpan.context());
        final Future<TenantObject> tenantValidationTracker = tenantTracker.compose(tenant -> CompositeFuture
                .all(this.isAdapterEnabled(tenant), this.checkMessageLimit(tenant, context.getPayloadSize(), currentSpan.context()))
                .map(checksPassed -> tenant));

        return CompositeFuture.all(tenantValidationTracker, tokenFuture)
                .compose(ok -> {
                    final Map<String, Object> props = this.getDownstreamMessageProperties((TelemetryExecutionContext) context);
                    final TenantObject tenant = tenantValidationTracker.result();
                    final RegistrationAssertion assertion = tokenFuture.result();
                    if (context.getEndpoint() != MetricsTags.EndpointType.TELEMETRY) {
                        return this.getEventSender(tenant).sendEvent(
                                tenant,
                                assertion,
                                context.getMessageContentType(),
                                context.getMessagePayload(),
                                props,
                                currentSpan.context());
                    }
                    return this.getTelemetrySender(tenant).sendTelemetry(
                            tenant,
                            assertion,
                            context.getRequestedQos(),
                            context.getMessageContentType(),
                            context.getMessagePayload(),
                            props,
                            currentSpan.context());
                })
                .recover(cause -> {
                    this.log.debug("cannot process {} message from device [tenant: {}, device-id: {}]",
                            context.getEndpoint().getCanonicalName(), resource.getTenantId(), resource.getResourceId(), cause);
                    final MetricsTags.QoS qos = context.isRemotelySettled() ? MetricsTags.QoS.AT_MOST_ONCE : MetricsTags.QoS.AT_LEAST_ONCE;
                    // the tenant may be unknown (null) if retrieving its configuration has failed
                    this.metrics.reportTelemetry(
                            context.getEndpoint(),
                            resource.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.from(cause),
                            qos,
                            context.getPayloadSize(),
                            context.getTimer());
                    return Future.failedFuture(cause);
                })
                .map(sent -> {
                    final MetricsTags.QoS qos = context.isRemotelySettled() ? MetricsTags.QoS.AT_MOST_ONCE : MetricsTags.QoS.AT_LEAST_ONCE;
                    this.metrics.reportTelemetry(
                            context.getEndpoint(),
                            resource.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.FORWARDED,
                            qos,
                            context.getPayloadSize(),
                            context.getTimer());
                    return sent;
                });
    }

    /**
     * Creates a command response from an AMQP message.
     * <p>
     * The correlation identifier is only used if it is a {@code String}; any other type
     * is passed on as {@code null}.
     *
     * @param message The message sent by the device.
     * @return The result of {@code CommandResponse.fromAddressAndCorrelationId}.
     */
    private CommandResponse getCommandResponse(Message message) {
        final String address = message.getAddress();
        final Object rawCorrelationId = message.getCorrelationId();
        final String correlationId = rawCorrelationId instanceof String ? (String) rawCorrelationId : null;
        final Buffer payload = AmqpUtils.getPayload(message);
        final String contentType = message.getContentType();
        final Integer status = AmqpUtils.getStatus(message);
        return CommandResponse.fromAddressAndCorrelationId(address, correlationId, payload, contentType, status);
    }

    /**
     * Forwards a command response message from a device downstream.
     * <p>
     * The message is converted to a command response; if that yields {@code null}, the
     * returned future fails with a 400 client error. Otherwise a registration assertion
     * is retrieved, the tenant is checked for the adapter being enabled and for the
     * message limit, and the response is sent. The outcome is reported to the metrics in
     * both the success and the failure case.
     *
     * @param context The context of the message to forward.
     * @param resource The address containing the tenant and device that the response originates from.
     * @param currentSpan The span to track the processing in.
     * @return A future indicating the outcome of forwarding the response.
     */
    private Future<Void> doUploadCommandResponseMessage(AmqpContext context, ResourceIdentifier resource, Span currentSpan) {
        final CommandResponse parsedResponse = this.getCommandResponse(context.getMessage());
        final Future<CommandResponse> responseTracker;
        if (parsedResponse != null) {
            responseTracker = Future.succeededFuture(parsedResponse);
        } else {
            TracingHelper.logError(currentSpan, String.format("invalid message (correlationId: %s, address: %s, status: %s)", context.getMessage().getCorrelationId(), context.getMessage().getAddress(), AmqpUtils.getStatus(context.getMessage())));
            responseTracker = Future.failedFuture(new ClientErrorException(400, "malformed command response message"));
        }
        final Future<TenantObject> tenantTracker = this.getTenantConfiguration(resource.getTenantId(), currentSpan.context());

        return CompositeFuture.all(tenantTracker, responseTracker)
                .compose(ok -> {
                    final CommandResponse commandResponse = responseTracker.result();
                    this.log.trace("sending command response [device-id: {}, status: {}, correlation-id: {}, reply-to: {}]",
                            resource.getResourceId(), commandResponse.getStatus(), commandResponse.getCorrelationId(), commandResponse.getReplyToId());
                    final Map<String, Object> items = new HashMap<>(3);
                    items.put("event", "sending command response");
                    items.put(TracingHelper.TAG_CORRELATION_ID.getKey(), commandResponse.getCorrelationId());
                    items.put("status", commandResponse.getStatus());
                    currentSpan.log(items);

                    final Future<RegistrationAssertion> tokenFuture = this.getRegistrationAssertion(
                            resource.getTenantId(),
                            resource.getResourceId(),
                            (Device) context.getAuthenticatedDevice(),
                            currentSpan.context());
                    final Future<TenantObject> tenantValidationTracker = CompositeFuture
                            .all(this.isAdapterEnabled(tenantTracker.result()),
                                    this.checkMessageLimit(tenantTracker.result(), context.getPayloadSize(), currentSpan.context()))
                            .map(checksPassed -> tenantTracker.result());
                    return CompositeFuture.all(tenantValidationTracker, tokenFuture)
                            .compose(validated -> this.sendCommandResponse(tenantTracker.result(), tokenFuture.result(), commandResponse, currentSpan.context()));
                })
                .map(delivery -> {
                    this.log.trace("forwarded command response from device [tenant: {}, device-id: {}]", resource.getTenantId(), resource.getResourceId());
                    this.metrics.reportCommand(
                            MetricsTags.Direction.RESPONSE,
                            resource.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.FORWARDED,
                            context.getPayloadSize(),
                            context.getTimer());
                    return delivery;
                })
                .recover(cause -> {
                    this.log.debug("cannot process command response from device [tenant: {}, device-id: {}]",
                            resource.getTenantId(), resource.getResourceId(), cause);
                    // the tenant may be unknown (null) if retrieving its configuration has failed
                    this.metrics.reportCommand(
                            MetricsTags.Direction.RESPONSE,
                            resource.getTenantId(),
                            tenantTracker.result(),
                            MetricsTags.ProcessingOutcome.from(cause),
                            context.getPayloadSize(),
                            context.getTimer());
                    return Future.failedFuture(cause);
                });
    }

    /**
     * Closes a link after logging its name.
     *
     * @param <T> The type of link.
     * @param link The link to close.
     */
    private <T extends ProtonLink<T>> void onLinkDetach(ProtonLink<T> link) {
        final String linkName = link.getName();
        this.log.debug("closing link [{}]", linkName);
        link.close();
    }

    /**
     * Checks that a message has been sent to an endpoint that this adapter supports.
     * <p>
     * Telemetry and command response messages are always accepted. Event messages are
     * accepted only if they have not been settled by the device already. A message
     * without an address or for any other endpoint is refused.
     *
     * @param ctx The context of the message to check.
     * @return A future holding the given context if the endpoint is supported. Otherwise
     *         the future fails with a {@code ClientErrorException} having status 404
     *         (no address, unsupported endpoint) or 400 (pre-settled event).
     */
    Future<AmqpContext> validateEndpoint(AmqpContext ctx) {
        if (ctx.getAddress() == null) {
            return Future.failedFuture(new ClientErrorException(404));
        }
        switch (ctx.getEndpoint()) {
            case TELEMETRY, COMMAND_RESPONSE -> {
                return Future.succeededFuture(ctx);
            }
            case EVENT -> {
                if (ctx.isRemotelySettled()) {
                    return Future.failedFuture(new ClientErrorException(400, "event endpoint accepts unsettled messages only"));
                }
                return Future.succeededFuture(ctx);
            }
            default -> {
                this.log.debug("device wants to send message for unsupported address [{}]", ctx.getAddress());
                return Future.failedFuture(new ClientErrorException(404, "unsupported endpoint"));
            }
        }
    }

    /**
     * Parses the address of a link's source into a resource identifier.
     *
     * @param source The source to take the address from (may be {@code null}).
     * @return A future holding the parsed identifier, or a future failed with a 404
     *         {@code ClientErrorException} if the source is {@code null} or its address
     *         is not a valid resource identifier.
     */
    private static Future<ResourceIdentifier> getResourceIdentifier(Source source) {
        if (source != null) {
            final String address = source.getAddress();
            if (ResourceIdentifier.isValid(address)) {
                return Future.succeededFuture(ResourceIdentifier.fromString(address));
            }
        }
        return Future.failedFuture(new ClientErrorException(404, "no such node"));
    }

    /**
     * Maps an error to the AMQP error condition to report to a device.
     * <p>
     * The condition's symbol is determined as follows:
     * <ul>
     * <li>{@code AuthorizationException}, {@code AdapterDisabledException} or error code
     * 403: <em>unauthorized-access</em></li>
     * <li>error code 400: the adapter's <em>bad request</em> symbol</li>
     * <li>error code 404: <em>not-found</em></li>
     * <li>error code 429: <em>resource-limit-exceeded</em></li>
     * <li>anything else: <em>precondition-failed</em></li>
     * </ul>
     * Error codes are only considered for {@code ServiceInvocationException}s.
     *
     * @param t The error to map.
     * @return The error condition, carrying the error message intended for external clients.
     */
    private static ErrorCondition getErrorCondition(Throwable t) {
        final String errorMessage = ServiceInvocationException.getErrorMessageForExternalClient(t);
        final Symbol condition;
        if (t instanceof AuthorizationException || t instanceof AdapterDisabledException) {
            condition = AmqpError.UNAUTHORIZED_ACCESS;
        } else if (t instanceof ServiceInvocationException) {
            condition = switch (((ServiceInvocationException) t).getErrorCode()) {
                case 400 -> AmqpUtils.AMQP_BAD_REQUEST;
                case 403 -> AmqpError.UNAUTHORIZED_ACCESS;
                case 404 -> AmqpError.NOT_FOUND;
                case 429 -> AmqpError.RESOURCE_LIMIT_EXCEEDED;
                default -> AmqpError.PRECONDITION_FAILED;
            };
        } else {
            condition = AmqpError.PRECONDITION_FAILED;
        }
        return ProtonHelper.condition(condition, errorMessage);
    }

    /**
     * Adds a command subscription to the subscriptions kept in a connection's attachments.
     * <p>
     * The subscriptions are held in a map keyed by the string form of the subscription
     * address. The map is created on first use.
     *
     * @param con The connection to register the subscription with.
     * @param commandSubscription The subscription to register.
     */
    private static void registerCommandSubscription(ProtonConnection con, CommandSubscription commandSubscription) {
        @SuppressWarnings("unchecked")
        Map<String, CommandSubscription> subscriptions = con.attachments().get(KEY_COMMAND_SUBSCRIPTIONS_MAP, Map.class);
        if (subscriptions == null) {
            subscriptions = new HashMap<>();
        }
        subscriptions.put(commandSubscription.getAddress().toString(), commandSubscription);
        con.attachments().set(KEY_COMMAND_SUBSCRIPTIONS_MAP, Map.class, subscriptions);
    }

    /**
     * Gets the command subscriptions kept in a connection's attachments.
     *
     * @param con The connection to get the subscriptions for.
     * @return The values view of the subscriptions map, or an empty list if no
     *         subscription map has been attached to the connection.
     */
    private static Collection<CommandSubscription> getCommandSubscriptions(ProtonConnection con) {
        @SuppressWarnings("unchecked")
        final Map<String, CommandSubscription> subscriptions = con.attachments().get(KEY_COMMAND_SUBSCRIPTIONS_MAP, Map.class);
        if (subscriptions == null) {
            return Collections.emptyList();
        }
        return subscriptions.values();
    }

    /**
     * Removes a command subscription from the subscriptions kept in a connection's attachments.
     *
     * @param con The connection to remove the subscription from.
     * @param subscriptionAddress The string form of the subscription's address.
     * @return {@code true} if a subscription had been registered for the address.
     */
    private static boolean removeCommandSubscription(ProtonConnection con, String subscriptionAddress) {
        final Map<?, ?> subscriptions = con.attachments().get(KEY_COMMAND_SUBSCRIPTIONS_MAP, Map.class);
        if (subscriptions == null) {
            return false;
        }
        return subscriptions.remove(subscriptionAddress) != null;
    }

    /**
     * Gets the device stored in a connection's attachments under key {@code CLIENT_DEVICE}.
     *
     * @param con The connection to get the device for.
     * @return The attached device, or {@code null} if none is attached.
     */
    private static DeviceUser getAuthenticatedDevice(ProtonConnection con) {
        final DeviceUser attachedDevice = con.attachments().get("CLIENT_DEVICE", DeviceUser.class);
        return attachedDevice;
    }

    /**
     * Gets the trace sampling priority stored in a connection's attachments under key
     * {@code TRACE_SAMPLING_PRIORITY}.
     *
     * @param con The connection to get the priority for.
     * @return The attached priority, or an empty {@code OptionalInt} if none is attached.
     */
    private static OptionalInt getTraceSamplingPriority(ProtonConnection con) {
        final OptionalInt attachedPriority = con.attachments().get("TRACE_SAMPLING_PRIORITY", OptionalInt.class);
        return attachedPriority != null ? attachedPriority : OptionalInt.empty();
    }

    /**
     * Checks whether the adapter's connection limit has been exceeded.
     * <p>
     * The check passes if no connection limit manager is set.
     *
     * @return A succeeded future if the limit is not exceeded, otherwise a future failed
     *         with an {@code AdapterConnectionsExceededException}.
     */
    private Future<Void> checkConnectionLimitForAdapter() {
        final boolean limitExceeded = this.getConnectionLimitManager() != null
                && this.getConnectionLimitManager().isLimitExceeded();
        if (!limitExceeded) {
            return Future.succeededFuture();
        }
        return Future.failedFuture(new AdapterConnectionsExceededException(null, "connection limit for the adapter exceeded", null));
    }

    /**
     * Gets the default value for the adapter's port.
     *
     * @return 5671, the IANA registered port for AMQP over TLS (<em>amqps</em>).
     */
    public int getPortDefaultValue() {
        final int defaultAmqpsPort = 5671;
        return defaultAmqpsPort;
    }

    /**
     * Gets the default value for the adapter's insecure port.
     *
     * @return 5672, the IANA registered port for plain AMQP.
     */
    public int getInsecurePortDefaultValue() {
        final int defaultAmqpPort = 5672;
        return defaultAmqpPort;
    }

    /**
     * Gets the port that the secure server is actually using.
     *
     * @return The secure server's actual port, or -1 if no secure server is set.
     */
    protected int getActualPort() {
        if (this.secureServer == null) {
            return -1;
        }
        return this.secureServer.actualPort();
    }

    /**
     * Gets the port that the insecure server is actually using.
     *
     * @return The insecure server's actual port, or -1 if no insecure server is set.
     */
    protected int getActualInsecurePort() {
        if (this.insecureServer == null) {
            return -1;
        }
        return this.insecureServer.actualPort();
    }

    /**
     * An immutable pairing of a command consumer with the address that the
     * corresponding subscription has been created for.
     */
    private static class CommandSubscription {
        private final ResourceIdentifier address;
        private final ProtocolAdapterCommandConsumer commandConsumer;

        /**
         * Creates a new subscription.
         *
         * @param consumer The consumer receiving the commands.
         * @param subscriptionAddress The address of the subscription.
         */
        CommandSubscription(ProtocolAdapterCommandConsumer consumer, ResourceIdentifier subscriptionAddress) {
            this.address = subscriptionAddress;
            this.commandConsumer = consumer;
        }

        /**
         * Gets the consumer receiving the commands.
         *
         * @return The consumer passed to the constructor.
         */
        public final ProtocolAdapterCommandConsumer getConsumer() {
            return this.commandConsumer;
        }

        /**
         * Gets the address of the subscription.
         *
         * @return The address passed to the constructor.
         */
        public final ResourceIdentifier getAddress() {
            return this.address;
        }
    }
}

