/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.mqtt;

import io.micrometer.core.instrument.Timer;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tag;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetServerOptions;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.net.ssl.SSLSession;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AuthorizationException;
import org.eclipse.hono.adapter.ProtocolAdapterProperties;
import org.eclipse.hono.adapter.TelemetryExecutionContext;
import org.eclipse.hono.adapter.auth.device.AuthHandler;
import org.eclipse.hono.adapter.auth.device.ChainAuthHandler;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
import org.eclipse.hono.adapter.auth.device.DeviceCredentialsAuthProvider;
import org.eclipse.hono.adapter.auth.device.SubjectDnCredentials;
import org.eclipse.hono.adapter.auth.device.TenantServiceBasedX509Authentication;
import org.eclipse.hono.adapter.auth.device.UsernamePasswordAuthProvider;
import org.eclipse.hono.adapter.auth.device.UsernamePasswordCredentials;
import org.eclipse.hono.adapter.auth.device.X509AuthProvider;
import org.eclipse.hono.adapter.auth.device.X509Authentication;
import org.eclipse.hono.adapter.limiting.ConnectionLimitManager;
import org.eclipse.hono.adapter.limiting.ConnectionLimitStrategy;
import org.eclipse.hono.adapter.limiting.DefaultConnectionLimitManager;
import org.eclipse.hono.adapter.limiting.MemoryBasedConnectionLimitStrategy;
import org.eclipse.hono.adapter.mqtt.AbstractSubscription;
import org.eclipse.hono.adapter.mqtt.CommandSubscription;
import org.eclipse.hono.adapter.mqtt.ConnectPacketAuthHandler;
import org.eclipse.hono.adapter.mqtt.ErrorSubscription;
import org.eclipse.hono.adapter.mqtt.MqttAdapterMetrics;
import org.eclipse.hono.adapter.mqtt.MqttConnectContext;
import org.eclipse.hono.adapter.mqtt.MqttContext;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.adapter.mqtt.PendingPubAcks;
import org.eclipse.hono.adapter.mqtt.PropertyBag;
import org.eclipse.hono.adapter.mqtt.Subscription;
import org.eclipse.hono.adapter.mqtt.X509AuthHandler;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationType;
import org.eclipse.hono.notification.deviceregistry.AllDevicesOfTenantDeletedNotification;
import org.eclipse.hono.notification.deviceregistry.DeviceChangeNotification;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.ExecutionContext;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;

public abstract class AbstractVertxBasedMqttProtocolAdapter<T extends MqttProtocolAdapterProperties>
extends AbstractProtocolAdapterBase<T> {
    // Minimal memory value handed to the memory based connection limit strategy when the
    // adapter is not running on a Substrate VM (presumably bytes - TODO confirm unit).
    protected static final int MINIMAL_MEMORY_JVM = 100000000;
    // Minimal memory value handed to the memory based connection limit strategy when the
    // configuration reports a Substrate VM (presumably bytes - TODO confirm unit).
    protected static final int MINIMAL_MEMORY_SUBSTRATE = 35000000;
    // Per-connection memory value handed to the memory based connection limit strategy.
    protected static final int MEMORY_PER_CONNECTION = 20000;
    // Added on top of the configured max payload size to obtain the max MQTT message size of a server.
    protected static final int MAX_MSG_SIZE_VARIABLE_HEADER_SIZE = 128;
    // Span log entry written right before a PUBACK is sent to the device.
    private static final String EVENT_SENDING_PUBACK = "sending PUBACK";
    // Default port of the insecure (plain) MQTT server.
    private static final int IANA_MQTT_PORT = 1883;
    // Default port of the secure MQTT server.
    private static final int IANA_SECURE_MQTT_PORT = 8883;
    private static final String LOG_FIELD_TOPIC_FILTER = "filter";
    // Holds the promise of the first doStop() invocation; later invocations are chained to it.
    private final AtomicReference<Promise<Void>> stopResultPromiseRef = new AtomicReference();
    // Metrics sink; starts out as the no-op implementation until setMetrics() is called.
    private MqttAdapterMetrics metrics = MqttAdapterMetrics.NOOP;
    // Secure MQTT server; either injected via setMqttSecureServer() or created while binding.
    private MqttServer server;
    // Insecure MQTT server; either injected via setMqttInsecureServer() or created while binding.
    private MqttServer insecureServer;
    // Handler used for authenticating devices; created lazily in doStart() if none has been set.
    private AuthHandler<MqttConnectContext> authHandler;
    // Endpoints of authenticated devices; scanned when device/tenant change notifications arrive.
    private final Set<MqttDeviceEndpoint> connectedAuthenticatedDeviceEndpoints = new HashSet<MqttDeviceEndpoint>();

    /**
     * Sets the handler to use for authenticating devices.
     *
     * @param authHandler The handler to use.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setAuthHandler(AuthHandler<MqttConnectContext> authHandler) {
        Objects.requireNonNull(authHandler);
        this.authHandler = authHandler;
    }

    /**
     * Gets the default port of the secure MQTT server.
     *
     * @return 8883.
     */
    public int getPortDefaultValue() {
        return IANA_SECURE_MQTT_PORT;
    }

    /**
     * Gets the default port of the insecure MQTT server.
     *
     * @return 1883.
     */
    public int getInsecurePortDefaultValue() {
        return IANA_MQTT_PORT;
    }

    /**
     * Gets the port reported by the secure MQTT server.
     *
     * @return The value of the secure server's {@code actualPort()} or -1 if no
     *         secure server has been set or created.
     */
    protected final int getActualPort() {
        if (this.server == null) {
            return -1;
        }
        return this.server.actualPort();
    }

    /**
     * Gets the port reported by the insecure MQTT server.
     *
     * @return The value of the insecure server's {@code actualPort()} or -1 if no
     *         insecure server has been set or created.
     */
    protected final int getActualInsecurePort() {
        if (this.insecureServer == null) {
            return -1;
        }
        return this.insecureServer.actualPort();
    }

    /**
     * Sets the metrics for this adapter.
     * <p>
     * Passing {@code null} resets the adapter to the no-op metrics implementation
     * instead of storing {@code null}. The adapter invokes methods on its metrics
     * object without any null check, so a stored {@code null} would only surface
     * later as a {@code NullPointerException} while handling a connection or message.
     *
     * @param metrics The metrics to report to, may be {@code null}.
     */
    public final void setMetrics(MqttAdapterMetrics metrics) {
        if (metrics == null) {
            this.metrics = MqttAdapterMetrics.NOOP;
            return;
        }
        this.log.info("reporting metrics using [{}]", metrics.getClass().getName());
        this.metrics = metrics;
    }

    /**
     * Gets the metrics object that this adapter reports to.
     *
     * @return The metrics currently in use.
     */
    protected final MqttAdapterMetrics getMetrics() {
        final MqttAdapterMetrics currentMetrics = this.metrics;
        return currentMetrics;
    }

    /**
     * Creates the authentication handler that is used if none has been set explicitly
     * by means of {@code setAuthHandler}.
     * <p>
     * The handler is a chain consisting of an X.509 based handler (backed by the tenant
     * and credentials clients) followed by a handler that uses the username/password
     * credentials from the CONNECT packet. {@code handleBeforeCredentialsValidation} is
     * passed to the chain as a hook (presumably invoked before credentials are
     * validated - see {@code ChainAuthHandler}).
     *
     * @return The newly created handler chain.
     */
    protected AuthHandler<MqttConnectContext> createAuthHandler() {
        return new ChainAuthHandler(this::handleBeforeCredentialsValidation).append((AuthHandler)new X509AuthHandler((X509Authentication)new TenantServiceBasedX509Authentication(this.getTenantClient(), this.tracer), (DeviceCredentialsAuthProvider<SubjectDnCredentials>)new X509AuthProvider(this.getCredentialsClient(), this.tracer))).append((AuthHandler)new ConnectPacketAuthHandler((DeviceCredentialsAuthProvider<UsernamePasswordCredentials>)new UsernamePasswordAuthProvider(this.getCredentialsClient(), this.tracer)));
    }

    /**
     * Performs tenant related checks for a set of device credentials.
     * <p>
     * Looks up the configuration of the tenant that the credentials belong to, tags the
     * tracing span with the tenant and auth-id, applies the tenant's trace sampling
     * priority (also stored in the given context) and finally verifies that this adapter
     * is enabled for the tenant.
     *
     * @param credentials The credentials provided by the device.
     * @param executionContext The context of the connection attempt; provides the tracing span.
     * @return A future that succeeds if all checks pass. A failure of the tenant lookup is
     *         mapped by {@code CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException}.
     */
    protected Future<Void> handleBeforeCredentialsValidation(DeviceCredentials credentials, MqttConnectContext executionContext) {
        String tenantId = credentials.getTenantId();
        Span span = executionContext.getTracingSpan();
        String authId = credentials.getAuthId();
        return this.getTenantConfiguration(tenantId, span.context()).recover(t -> Future.failedFuture((Throwable)CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException((Throwable)t))).map(tenantObject -> {
            // the device ID is not known at this stage, hence null
            TracingHelper.setDeviceTags((Span)span, (String)tenantId, null, (String)authId);
            OptionalInt traceSamplingPriority = TenantTraceSamplingHelper.applyTraceSamplingPriority((TenantObject)tenantObject, (String)authId, (Span)span);
            // remember the priority so that it can be used once the endpoint handlers get registered
            executionContext.setTraceSamplingPriority(traceSamplingPriority);
            return tenantObject;
        }).compose(arg_0 -> ((AbstractVertxBasedMqttProtocolAdapter)this).isAdapterEnabled(arg_0)).mapEmpty();
    }

    /**
     * Sets the MQTT server to use for handling secure connections.
     *
     * @param server The server instance; must not have been started yet.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalStateException if the server reports an actual port greater than 0.
     */
    public void setMqttSecureServer(MqttServer server) {
        final MqttServer candidate = Objects.requireNonNull(server);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalStateException("MQTT server must not be started already");
        }
        this.server = candidate;
    }

    /**
     * Sets the MQTT server to use for handling insecure connections.
     *
     * @param server The server instance; must not have been started yet.
     * @throws NullPointerException if the server is {@code null}.
     * @throws IllegalStateException if the server reports an actual port greater than 0.
     */
    public void setMqttInsecureServer(MqttServer server) {
        final MqttServer candidate = Objects.requireNonNull(server);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalStateException("MQTT server must not be started already");
        }
        this.insecureServer = candidate;
    }

    /**
     * Binds the secure MQTT server if the secure port is enabled.
     * <p>
     * The server options are built from the configured bind address, the determined
     * secure port and the configured max payload size (plus the variable header
     * allowance). TLS key/cert and trust options are added as well.
     *
     * @return A future that completes once the server is listening, or an already
     *         succeeded future if the secure port is not enabled.
     */
    private Future<Void> bindSecureMqttServer() {
        if (!this.isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final MqttServerOptions serverOptions = new MqttServerOptions();
        serverOptions
                .setHost(this.getConfig().getBindAddress())
                .setPort(this.determineSecurePort())
                .setMaxMessageSize(this.getConfig().getMaxPayloadSize() + MAX_MSG_SIZE_VARIABLE_HEADER_SIZE);
        this.addTlsKeyCertOptions(serverOptions);
        this.addTlsTrustOptions(serverOptions);
        return this.bindMqttServer(serverOptions, this.server).map(boundServer -> {
            // keep a reference to the server that has actually been bound
            this.server = boundServer;
            return (Void) null;
        });
    }

    /**
     * Binds the insecure MQTT server if the insecure port is enabled.
     * <p>
     * The server options are built from the configured insecure port bind address,
     * the determined insecure port and the configured max payload size (plus the
     * variable header allowance).
     *
     * @return A future that completes once the server is listening, or an already
     *         succeeded future if the insecure port is not enabled.
     */
    private Future<Void> bindInsecureMqttServer() {
        if (!this.isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final MqttServerOptions serverOptions = new MqttServerOptions();
        serverOptions
                .setHost(this.getConfig().getInsecurePortBindAddress())
                .setPort(this.determineInsecurePort())
                .setMaxMessageSize(this.getConfig().getMaxPayloadSize() + MAX_MSG_SIZE_VARIABLE_HEADER_SIZE);
        return this.bindMqttServer(serverOptions, this.insecureServer).map(boundServer -> {
            // keep a reference to the server that has actually been bound
            this.insecureServer = boundServer;
            return (Void) null;
        });
    }

    /**
     * Starts an MQTT server and registers this adapter's connection handler with it.
     * <p>
     * If no server instance is given, a new one is created from the given options.
     *
     * @param options The options to create a new server from. They are not applied to a
     *                server that has been passed in, but their host is still used in the
     *                log message as the configured bind address.
     * @param mqttServer The server to start or {@code null} if a new server should be created.
     * @return A future that succeeds with the listening server or fails with the
     *         cause reported by the server's {@code listen} operation.
     */
    private Future<MqttServer> bindMqttServer(MqttServerOptions options, MqttServer mqttServer) {
        final Promise<MqttServer> result = Promise.promise();
        final MqttServer createdMqttServer = mqttServer == null ? MqttServer.create(this.vertx, options) : mqttServer;
        createdMqttServer.endpointHandler(this::handleEndpointConnection).listen(done -> {
            if (done.succeeded()) {
                // log the host from the options of the server being bound; the secure bind
                // address would be wrong for the insecure server
                this.log.info("MQTT server running on {}:{}", options.getHost(), createdMqttServer.actualPort());
                result.complete(createdMqttServer);
            } else {
                this.log.error("error while starting up MQTT server", done.cause());
                result.fail(done.cause());
            }
        });
        return result.future();
    }

    /**
     * Starts up this adapter.
     * <p>
     * Registers the consumers for device/tenant change notifications, makes sure that a
     * connection limit manager is set, checks the port configuration, binds the secure
     * and insecure servers and finally creates the default authentication handler if
     * none has been set.
     *
     * @param startPromise The promise to complete with the outcome of the startup.
     */
    protected final void doStart(Promise<Void> startPromise) {
        this.registerDeviceAndTenantChangeNotificationConsumers();
        this.log.info("limiting size of inbound message payload to {} bytes", this.getConfig().getMaxPayloadSize());
        final boolean authenticationRequired = this.getConfig().isAuthenticationRequired();
        if (!authenticationRequired) {
            this.log.warn("authentication of devices turned off");
        }

        // fall back to the default manager if none has been set explicitly
        ConnectionLimitManager limitManager = this.getConnectionLimitManager();
        if (limitManager == null) {
            limitManager = this.createConnectionLimitManager();
        }
        this.setConnectionLimitManager(limitManager);

        this.checkPortConfiguration()
                .compose(ok -> CompositeFuture.all(this.bindSecureMqttServer(), this.bindInsecureMqttServer()))
                .compose(serversBound -> {
                    if (this.authHandler == null) {
                        this.authHandler = this.createAuthHandler();
                    }
                    return Future.succeededFuture((Void) null);
                })
                .onComplete(startPromise);
    }

    /**
     * Registers event bus consumers for notifications about device and tenant changes.
     * <p>
     * Connections of affected devices get closed when a device or tenant has been
     * deleted or has been updated to be disabled, and when all devices of a tenant
     * have been deleted.
     */
    private void registerDeviceAndTenantChangeNotificationConsumers() {
        NotificationEventBusSupport.registerConsumer(this.vertx, DeviceChangeNotification.TYPE, notification -> {
            final boolean deleted = LifecycleChange.DELETE.equals(notification.getChange());
            final boolean disabled = !deleted
                    && LifecycleChange.UPDATE.equals(notification.getChange())
                    && !notification.isDeviceEnabled();
            if (deleted || disabled) {
                final String reason = deleted ? "device deleted" : "device disabled";
                this.closeDeviceConnectionsOnDeviceOrTenantChange(
                        connectedDevice -> connectedDevice.getTenantId().equals(notification.getTenantId())
                                && connectedDevice.getDeviceId().equals(notification.getDeviceId()),
                        reason);
            }
        });

        NotificationEventBusSupport.registerConsumer(this.vertx, AllDevicesOfTenantDeletedNotification.TYPE,
                notification -> this.closeDeviceConnectionsOnDeviceOrTenantChange(
                        connectedDevice -> connectedDevice.getTenantId().equals(notification.getTenantId()),
                        "all devices of tenant deleted"));

        NotificationEventBusSupport.registerConsumer(this.vertx, TenantChangeNotification.TYPE, notification -> {
            final boolean deleted = LifecycleChange.DELETE.equals(notification.getChange());
            final boolean disabled = !deleted
                    && LifecycleChange.UPDATE.equals(notification.getChange())
                    && !notification.isTenantEnabled();
            if (deleted || disabled) {
                final String reason = deleted ? "tenant deleted" : "tenant disabled";
                this.closeDeviceConnectionsOnDeviceOrTenantChange(
                        connectedDevice -> connectedDevice.getTenantId().equals(notification.getTenantId()),
                        reason);
            }
        });
    }

    /**
     * Closes the connections of all authenticated devices matching a predicate.
     *
     * @param deviceMatchPredicate The predicate that selects the devices to disconnect.
     * @param reason The reason for closing the connections, passed on to the endpoint.
     */
    private void closeDeviceConnectionsOnDeviceOrTenantChange(Predicate<Device> deviceMatchPredicate, String reason) {
        // collect the matching endpoints first; presumably close() alters the set of
        // connected endpoints, so it must not be iterated while closing - verify against MqttDeviceEndpoint
        final List<MqttDeviceEndpoint> matchingEndpoints = new ArrayList<>();
        for (final MqttDeviceEndpoint endpoint : this.connectedAuthenticatedDeviceEndpoints) {
            if (deviceMatchPredicate.test(endpoint.getAuthenticatedDevice())) {
                matchingEndpoints.add(endpoint);
            }
        }
        for (final MqttDeviceEndpoint endpoint : matchingEndpoints) {
            Futures.onCurrentContextCompletionHandler(v -> endpoint.close(reason, false)).handle(null);
        }
    }

    /**
     * Creates the default connection limit manager.
     * <p>
     * The manager uses a memory based strategy that is parameterized with a minimal
     * memory value (depending on whether the configuration reports a Substrate VM),
     * the memory value per connection and the configured GC heap percentage.
     *
     * @return The newly created manager.
     */
    private ConnectionLimitManager createConnectionLimitManager() {
        final long minimalMemory = this.getConfig().isSubstrateVm() ? MINIMAL_MEMORY_SUBSTRATE : MINIMAL_MEMORY_JVM;
        final ConnectionLimitStrategy strategy = MemoryBasedConnectionLimitStrategy.forParams(
                minimalMemory,
                MEMORY_PER_CONNECTION,
                this.getConfig().getGcHeapPercentage());
        // deliberately a lambda: the metrics field must be read on each invocation because
        // it may be replaced via setMetrics() after this manager has been created
        return new DefaultConnectionLimitManager(
                strategy,
                () -> this.metrics.getNumberOfConnections(),
                this.getConfig());
    }

    /**
     * Shuts down this adapter by closing the secure and insecure MQTT servers.
     * <p>
     * Only the first invocation triggers the shutdown. Subsequent invocations get
     * completed with the outcome of the first one.
     *
     * @param stopPromise The promise to complete with the outcome of the shutdown.
     */
    protected final void doStop(Promise<Void> stopPromise) {
        final boolean firstInvocation = this.stopResultPromiseRef.compareAndSet(null, stopPromise);
        if (!firstInvocation) {
            // piggy-back on the shutdown that is already in progress (or done)
            this.stopResultPromiseRef.get().future().onComplete(stopPromise);
            this.log.trace("stop already called");
            return;
        }

        final Promise<Void> secureServerClosed = Promise.promise();
        if (this.server == null) {
            secureServerClosed.complete();
        } else {
            this.log.info("closing secure MQTT server ...");
            this.server.close(secureServerClosed);
        }

        final Promise<Void> insecureServerClosed = Promise.promise();
        if (this.insecureServer == null) {
            insecureServerClosed.complete();
        } else {
            this.log.info("closing insecure MQTT server ...");
            this.insecureServer.close(insecureServerClosed);
        }

        CompositeFuture.all(secureServerClosed.future(), insecureServerClosed.future())
                .map(ok -> (Void) null)
                .onComplete(ar -> this.log.info("MQTT server(s) closed"))
                .onComplete(stopPromise);
    }

    /**
     * Handles a client's request to establish a connection.
     * <p>
     * Starts a <em>CONNECT</em> tracing span, processes the connection request and either
     * accepts the connection or rejects it with a return code derived from the failure.
     * The outcome is reported to the metrics in both cases and the span is finished
     * once processing has completed.
     *
     * @param endpoint The MQTT endpoint representing the client.
     */
    final void handleEndpointConnection(MqttEndpoint endpoint) {
        this.log.debug("connection request from client [client-id: {}]", (Object)endpoint.clientIdentifier());
        // null if the connection is not secured by TLS (no SSL session)
        String cipherSuite = Optional.ofNullable(endpoint.sslSession()).map(SSLSession::getCipherSuite).orElse(null);
        Span span = this.tracer.buildSpan("CONNECT").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), this.getTypeName()).withTag(TracingHelper.TAG_CLIENT_ID.getKey(), endpoint.clientIdentifier()).start();
        if (!endpoint.isCleanSession()) {
            span.log("ignoring client's intent to resume existing session");
        }
        if (endpoint.will() != null) {
            span.log("ignoring client's last will");
        }
        // set by the close handler below if the client disconnects while the request is being processed
        AtomicBoolean connectionClosedPrematurely = new AtomicBoolean(false);
        endpoint.closeHandler(v -> {
            this.log.debug("client closed connection before CONNACK got sent to client [client-id: {}]", (Object)endpoint.clientIdentifier());
            TracingHelper.logError((Span)span, (String)"client closed connection");
            connectionClosedPrematurely.set(true);
        });
        this.handleConnectionRequest(endpoint, connectionClosedPrematurely, span).compose(authenticatedDevice -> this.handleConnectionRequestResult(endpoint, (Device)authenticatedDevice, connectionClosedPrematurely, span)).onSuccess(authenticatedDevice -> {
            // accept without session-present flag
            endpoint.accept(false);
            span.log("connection accepted");
            this.metrics.reportConnectionAttempt(MetricsTags.ConnectionAttemptOutcome.SUCCEEDED, Optional.ofNullable(authenticatedDevice).map(Device::getTenantId).orElse(null), cipherSuite);
        }).onFailure(t -> {
            // no point in rejecting the request if the client has already closed the connection
            if (!connectionClosedPrematurely.get()) {
                this.log.debug("rejecting connection request from client [clientId: {}], cause:", (Object)endpoint.clientIdentifier(), t);
                MqttConnectReturnCode code = AbstractVertxBasedMqttProtocolAdapter.getConnectReturnCode(t);
                this.rejectConnectionRequest(endpoint, code, span);
                TracingHelper.logError((Span)span, (Throwable)t);
            }
            // the failed attempt is reported regardless of whether a CONNACK could be sent
            this.metrics.reportConnectionAttempt(AbstractProtocolAdapterBase.getOutcome((Throwable)t), t instanceof ServiceInvocationException ? ((ServiceInvocationException)t).getTenant() : null, cipherSuite);
        }).onComplete(result -> span.finish());
    }

    /**
     * Processes a connection request, with or without authenticating the device.
     *
     * @param endpoint The MQTT endpoint representing the client.
     * @param connectionClosedPrematurely Flag indicating whether the client has already closed the connection.
     * @param currentSpan The tracing span of the connection attempt.
     * @return A future that fails with an {@code AdapterConnectionsExceededException} if the
     *         adapter's connection limit is exceeded. Otherwise the outcome of handling
     *         the connection with or without authentication, depending on configuration.
     */
    private Future<Device> handleConnectionRequest(MqttEndpoint endpoint, AtomicBoolean connectionClosedPrematurely, Span currentSpan) {
        // a connection limit manager is not necessarily set, hence the null check
        final boolean limitExceeded = this.getConnectionLimitManager() != null
                && this.getConnectionLimitManager().isLimitExceeded();
        if (limitExceeded) {
            currentSpan.log("adapter's connection limit exceeded");
            return Future.failedFuture(
                    new AdapterConnectionsExceededException(null, "adapter's connection limit is exceeded", null));
        }
        return this.getConfig().isAuthenticationRequired()
                ? this.handleEndpointConnectionWithAuthentication(endpoint, connectionClosedPrematurely, currentSpan)
                : this.handleEndpointConnectionWithoutAuthentication(endpoint);
    }

    /**
     * Finalizes a successfully processed connection request by sending a
     * <em>connected</em> event for the client.
     *
     * @param endpoint The MQTT endpoint representing the client.
     * @param authenticatedDevice The authenticated device or {@code null} if the device has not been authenticated.
     * @param connectionClosedPrematurely Flag indicating whether the client has already closed the connection.
     * @param currentSpan The tracing span of the connection attempt.
     * @return A future that succeeds with the (potentially {@code null}) device once the
     *         event has been sent. It fails if the connection has been closed already, or
     *         with a {@code ServerErrorException} (503) if the event could not be sent.
     */
    private Future<Device> handleConnectionRequestResult(MqttEndpoint endpoint, Device authenticatedDevice, AtomicBoolean connectionClosedPrematurely, Span currentSpan) {
        final boolean authenticated = authenticatedDevice != null;
        TracingHelper.TAG_AUTHENTICATED.set(currentSpan, authenticated);
        if (authenticated) {
            TracingHelper.setDeviceTags(currentSpan, authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
        }

        final Promise<Device> outcome = Promise.promise();
        if (!connectionClosedPrematurely.get()) {
            this.sendConnectedEvent(endpoint.clientIdentifier(), authenticatedDevice, currentSpan.context())
                    .map(authenticatedDevice)
                    .recover(t -> {
                        this.log.warn("failed to send connection event for client [clientId: {}]", endpoint.clientIdentifier(), t);
                        return Future.failedFuture(new ServerErrorException(503, "failed to send connection event", t));
                    })
                    .onComplete(outcome);
            return outcome.future();
        }

        this.log.debug("abort handling connection request, connection already closed [clientId: {}]", endpoint.clientIdentifier());
        currentSpan.log("abort connection request processing, connection already closed");
        outcome.fail("connection already closed");
        return outcome.future();
    }

    /**
     * Rejects a client's connection request with the given return code.
     * <p>
     * If the endpoint has already been closed, the rejection is skipped and only a
     * corresponding entry is added to the log and the span. Any other failure to
     * reject the request is logged as an error on the span.
     *
     * @param endpoint The MQTT endpoint representing the client.
     * @param errorCode The return code to send to the client.
     * @param currentSpan The tracing span of the connection attempt.
     */
    private void rejectConnectionRequest(MqttEndpoint endpoint, MqttConnectReturnCode errorCode, Span currentSpan) {
        try {
            endpoint.reject(errorCode);
            currentSpan.log("connection request rejected");
        } catch (IllegalStateException ex) {
            if ("MQTT endpoint is closed".equals(ex.getMessage())) {
                // expected when the endpoint is closed already: not an error, nothing to reject
                this.log.debug("skipped rejecting connection request, connection already closed [clientId: {}]", endpoint.clientIdentifier());
                currentSpan.log("skipped rejecting connection request, connection already closed");
            } else {
                this.log.debug("could not reject connection request from client [clientId: {}]: {}", endpoint.clientIdentifier(), ex.toString());
                TracingHelper.logError(currentSpan, "could not reject connection request from client", ex);
            }
        }
    }

    /**
     * Handles a connection request of a device without authenticating it.
     * <p>
     * Registers the endpoint handlers without an authenticated device and increments
     * the number of unauthenticated connections.
     *
     * @param endpoint The MQTT endpoint representing the client.
     * @return A future succeeded with {@code null} (no authenticated device).
     */
    private Future<Device> handleEndpointConnectionWithoutAuthentication(MqttEndpoint endpoint) {
        final OptionalInt noTraceSamplingPriority = OptionalInt.empty();
        this.registerEndpointHandlers(endpoint, null, noTraceSamplingPriority);
        this.metrics.incrementUnauthenticatedConnections();
        this.log.debug("unauthenticated device [clientId: {}] connected", endpoint.clientIdentifier());
        final Future<Device> noAuthenticatedDevice = Future.succeededFuture();
        return noAuthenticatedDevice;
    }

    /**
     * Handles a connection request of a device that needs to be authenticated.
     * <p>
     * After the device has been authenticated, the tenant's configuration is retrieved in
     * order to verify that the adapter is enabled for the tenant and that the connection
     * limit is not exceeded. The device's registration is checked as well. If all checks
     * succeed and the client has not closed the connection in the meantime, the endpoint
     * handlers get registered and the tenant's connection count is incremented.
     *
     * @param endpoint The MQTT endpoint representing the client.
     * @param connectionClosedPrematurely Flag indicating whether the client has already closed the connection.
     * @param currentSpan The tracing span of the connection attempt.
     * @return A future that succeeds with the authenticated device, or fails with the
     *         cause of the failed authentication or check.
     */
    private Future<Device> handleEndpointConnectionWithAuthentication(MqttEndpoint endpoint, AtomicBoolean connectionClosedPrematurely, Span currentSpan) {
        MqttConnectContext context = MqttConnectContext.fromConnectPacket(endpoint, currentSpan);
        Future authAttempt = this.authHandler.authenticateDevice((ExecutionContext)context);
        return authAttempt.compose(authenticatedDevice -> CompositeFuture.all((Future)this.getTenantConfiguration(authenticatedDevice.getTenantId(), currentSpan.context()).compose(tenantObj -> CompositeFuture.all((Future)this.isAdapterEnabled((TenantObject)tenantObj), (Future)this.checkConnectionLimit((TenantObject)tenantObj, currentSpan.context()))), (Future)this.checkDeviceRegistration((Device)authenticatedDevice, currentSpan.context())).map(authenticatedDevice)).compose(authenticatedDevice -> {
            // the client might have disconnected while the checks above were in progress
            if (connectionClosedPrematurely.get()) {
                return Future.failedFuture((String)"connection already closed");
            }
            this.registerEndpointHandlers(endpoint, (Device)authenticatedDevice, context.getTraceSamplingPriority());
            this.metrics.incrementConnections(authenticatedDevice.getTenantId());
            return Future.succeededFuture((Object)authenticatedDevice);
        }).recover(t -> {
            // only log the failure here, it is passed on unaltered
            if (authAttempt.failed()) {
                this.log.debug("device authentication or early stage checks failed", t);
            } else {
                this.log.debug("cannot establish connection with device [tenant-id: {}, device-id: {}]", new Object[]{((DeviceUser)authAttempt.result()).getTenantId(), ((DeviceUser)authAttempt.result()).getDeviceId(), t});
            }
            return Future.failedFuture((Throwable)t);
        });
    }

    /**
     * Forwards a message published by a device to downstream consumers.
     * <p>
     * Delegates to the upload method matching the endpoint type of the context.
     *
     * @param ctx The context in which the MQTT message has been published.
     * @return The outcome of the delegate method, or a future failed with a
     *         {@code ClientErrorException} (400) if the endpoint type is not supported.
     * @throws NullPointerException if the context is {@code null}.
     * @throws IllegalArgumentException if the context does not contain tenant/device information.
     */
    public final Future<Void> uploadMessage(MqttContext ctx) {
        Objects.requireNonNull(ctx);
        this.verifyTenantAndDeviceContextIsSet(ctx);

        final MetricsTags.EndpointType endpointType = ctx.endpoint();
        switch (endpointType) {
            case TELEMETRY:
                return this.uploadTelemetryMessage(ctx);
            case EVENT:
                return this.uploadEventMessage(ctx);
            case COMMAND:
                return this.uploadCommandResponseMessage(ctx);
            default:
                return Future.failedFuture(new ClientErrorException(400, "unsupported endpoint"));
        }
    }

    /**
     * Verifies that a context contains both a tenant and a device identifier.
     *
     * @param ctx The context to check.
     * @throws IllegalArgumentException if the tenant or the device identifier is {@code null}.
     */
    private void verifyTenantAndDeviceContextIsSet(MqttContext ctx) {
        final boolean tenantAndDeviceSet = ctx.tenant() != null && ctx.deviceId() != null;
        if (!tenantAndDeviceSet) {
            throw new IllegalArgumentException("context does not contain tenant/device information");
        }
    }

    /**
     * Forwards a telemetry message to downstream consumers.
     * <p>
     * The processing outcome is reported to the metrics in both the success and the
     * failure case.
     *
     * @param ctx The context in which the MQTT message has been published.
     * @return A future that succeeds once the message has been forwarded, or fails
     *         with the cause of the failure.
     * @throws NullPointerException if the context is {@code null}.
     * @throws IllegalArgumentException if the context does not contain a telemetry message
     *         or does not contain tenant/device information.
     */
    public final Future<Void> uploadTelemetryMessage(MqttContext ctx) {
        Objects.requireNonNull(ctx);
        final boolean isTelemetry = ctx.endpoint() == MetricsTags.EndpointType.TELEMETRY;
        if (!isTelemetry) {
            throw new IllegalArgumentException("context does not contain telemetry message but " + ctx.endpoint().getCanonicalName());
        }
        this.verifyTenantAndDeviceContextIsSet(ctx);

        final Buffer messagePayload = ctx.payload();
        final MetricsTags.QoS reportedQos = MetricsTags.QoS.from(ctx.qosLevel().value());
        final Future<TenantObject> tenantLookup = this.getTenantConfiguration(ctx.tenant(), ctx.getTracingContext());

        return tenantLookup
                .compose(tenant -> this.uploadMessage(ctx, tenant, ctx.deviceId(), messagePayload, ctx.endpoint()))
                .compose(forwarded -> {
                    this.metrics.reportTelemetry(
                            ctx.endpoint(),
                            ctx.tenant(),
                            tenantLookup.result(),
                            MetricsTags.ProcessingOutcome.FORWARDED,
                            reportedQos,
                            messagePayload.length(),
                            ctx.getTimer());
                    return Future.<Void>succeededFuture();
                })
                .recover(error -> {
                    // tenantLookup.result() is null here if the tenant lookup itself has failed
                    this.metrics.reportTelemetry(
                            ctx.endpoint(),
                            ctx.tenant(),
                            tenantLookup.result(),
                            MetricsTags.ProcessingOutcome.from(error),
                            reportedQos,
                            messagePayload.length(),
                            ctx.getTimer());
                    return Future.failedFuture(error);
                });
    }

    /**
     * Forwards an event to downstream consumers.
     * <p>
     * The processing outcome is reported to the metrics in both the success and the
     * failure case.
     *
     * @param ctx The context in which the MQTT message has been published.
     * @return A future that succeeds once the event has been forwarded, or fails
     *         with the cause of the failure.
     * @throws NullPointerException if the context is {@code null}.
     * @throws IllegalArgumentException if the context does not contain an event
     *         or does not contain tenant/device information.
     */
    public final Future<Void> uploadEventMessage(MqttContext ctx) {
        Objects.requireNonNull(ctx);
        final boolean isEvent = ctx.endpoint() == MetricsTags.EndpointType.EVENT;
        if (!isEvent) {
            throw new IllegalArgumentException("context does not contain event but " + ctx.endpoint().getCanonicalName());
        }
        this.verifyTenantAndDeviceContextIsSet(ctx);

        final Buffer messagePayload = ctx.payload();
        final MetricsTags.QoS reportedQos = MetricsTags.QoS.from(ctx.qosLevel().value());
        final Future<TenantObject> tenantLookup = this.getTenantConfiguration(ctx.tenant(), ctx.getTracingContext());

        return tenantLookup
                .compose(tenant -> this.uploadMessage(ctx, tenant, ctx.deviceId(), messagePayload, ctx.endpoint()))
                .compose(forwarded -> {
                    // events are reported by means of the telemetry reporting method, using the context's endpoint type
                    this.metrics.reportTelemetry(
                            ctx.endpoint(),
                            ctx.tenant(),
                            tenantLookup.result(),
                            MetricsTags.ProcessingOutcome.FORWARDED,
                            reportedQos,
                            messagePayload.length(),
                            ctx.getTimer());
                    return Future.<Void>succeededFuture();
                })
                .recover(error -> {
                    // tenantLookup.result() is null here if the tenant lookup itself has failed
                    this.metrics.reportTelemetry(
                            ctx.endpoint(),
                            ctx.tenant(),
                            tenantLookup.result(),
                            MetricsTags.ProcessingOutcome.from(error),
                            reportedQos,
                            messagePayload.length(),
                            ctx.getTimer());
                    return Future.failedFuture(error);
                });
    }

    /**
     * Forwards a device's response to a command to the sender of the command.
     * <p>
     * The request ID and the status code of the response are taken from segments 4 and 5
     * of the resource path of the topic that the message has been published to. The
     * response is only forwarded if the adapter is enabled for the tenant, the message
     * limit is not exceeded and a registration assertion can be obtained for the device.
     * The outcome is reported to the metrics in both the success and the failure case.
     *
     * @param ctx The context in which the MQTT message has been published.
     * @return A future that succeeds once the response has been forwarded. It fails with a
     *         {@code ClientErrorException} (400) if the topic has too few segments, contains
     *         a non-numeric status code or otherwise invalid data, or with the cause of
     *         any other failure.
     * @throws NullPointerException if the context is {@code null}.
     * @throws IllegalArgumentException if the context does not contain a command response
     *         or does not contain tenant/device information.
     */
    public final Future<Void> uploadCommandResponseMessage(MqttContext ctx) {
        Future commandResponseTracker;
        Objects.requireNonNull(ctx);
        if (ctx.endpoint() != MetricsTags.EndpointType.COMMAND) {
            throw new IllegalArgumentException("context does not contain command response message but " + ctx.endpoint().getCanonicalName());
        }
        this.verifyTenantAndDeviceContextIsSet(ctx);
        String[] addressPath = ctx.topic().getResourcePath();
        String tenantId = ctx.tenant();
        String deviceId = ctx.deviceId();
        // both remain null if they cannot be extracted from the topic; the span tags below are then set to null
        Integer status = null;
        String reqId = null;
        // index 5 (status code) must exist
        if (addressPath.length <= 5) {
            commandResponseTracker = Future.failedFuture((Throwable)new ClientErrorException(400, "command response topic has too few segments"));
        } else {
            CommandResponse commandResponse;
            try {
                status = Integer.parseInt(addressPath[5]);
            }
            catch (NumberFormatException e) {
                // status stays null, which results in the "invalid status code" failure below
                this.log.trace("got invalid status code [{}] [tenant-id: {}, device-id: {}]", new Object[]{addressPath[5], tenantId, deviceId});
            }
            // fromRequestId returning null is treated as invalid data in the topic
            commandResponseTracker = status != null ? ((commandResponse = CommandResponse.fromRequestId((String)(reqId = addressPath[4]), (String)tenantId, (String)deviceId, (Buffer)ctx.payload(), (String)ctx.contentType(), (Integer)status)) != null ? Future.succeededFuture((Object)commandResponse) : Future.failedFuture((Throwable)new ClientErrorException(400, "command response topic contains invalid data"))) : Future.failedFuture((Throwable)new ClientErrorException(400, "invalid status code"));
        }
        // the span is started regardless of whether the topic could be parsed, so that failures get traced as well
        Span currentSpan = TracingHelper.buildChildSpan((Tracer)this.tracer, (SpanContext)ctx.getTracingContext(), (String)"upload Command response", (String)this.getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag((Tag)TracingHelper.TAG_TENANT_ID, (Object)tenantId).withTag((Tag)TracingHelper.TAG_DEVICE_ID, (Object)deviceId).withTag("hono-cmd-status", (Number)status).withTag("hono-cmd-req-id", reqId).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), ctx.authenticatedDevice() != null).start();
        int payloadSize = ctx.payload().length();
        Future tenantTracker = this.getTenantConfiguration(tenantId, ctx.getTracingContext());
        return CompositeFuture.all((Future)tenantTracker, (Future)commandResponseTracker).compose(success -> {
            Future deviceRegistrationTracker = this.getRegistrationAssertion(tenantId, deviceId, ctx.authenticatedDevice(), currentSpan.context());
            Future tenantValidationTracker = CompositeFuture.all((Future)this.isAdapterEnabled((TenantObject)tenantTracker.result()), (Future)this.checkMessageLimit((TenantObject)tenantTracker.result(), payloadSize, currentSpan.context())).mapEmpty();
            return CompositeFuture.all((Future)deviceRegistrationTracker, (Future)tenantValidationTracker).compose(ok -> this.sendCommandResponse((TenantObject)tenantTracker.result(), (RegistrationAssertion)deviceRegistrationTracker.result(), (CommandResponse)commandResponseTracker.result(), currentSpan.context()));
        }).compose(delivery -> {
            this.log.trace("successfully forwarded command response from device [tenant-id: {}, device-id: {}]", (Object)tenantId, (Object)deviceId);
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, tenantId, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, payloadSize, ctx.getTimer());
            // only acknowledge if the context reports at-least-once and the device is still connected
            if (ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected()) {
                currentSpan.log(EVENT_SENDING_PUBACK);
                ctx.acknowledge();
            }
            currentSpan.finish();
            return Future.succeededFuture();
        }).recover(t -> {
            TracingHelper.logError((Span)currentSpan, (Throwable)t);
            currentSpan.finish();
            // tenantTracker.result() is null here if the tenant lookup itself has failed
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, tenantId, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.from((Throwable)t), payloadSize, ctx.getTimer());
            return Future.failedFuture((Throwable)t);
        });
    }

    /**
     * Forwards a message published by a device to the downstream event or telemetry sender.
     * <p>
     * The message is rejected with a 400 {@code ClientErrorException} if its payload does not
     * match the content type indicated by the context. Otherwise a registration assertion is
     * retrieved and the tenant is checked (adapter enabled, message limit) before the message
     * is sent. If the context requires at-least-once delivery and the device is still
     * connected, the message is acknowledged once it has been forwarded.
     *
     * @param ctx The context of the published MQTT message.
     * @param tenantObject The tenant that the device belongs to.
     * @param deviceId The identifier of the device that the message originates from.
     * @param payload The payload of the message.
     * @param endpoint The type of endpoint that the message is targeted at.
     * @return A future that succeeds once the message has been sent downstream and that
     *         fails with the original error otherwise.
     */
    private Future<Void> uploadMessage(MqttContext ctx, TenantObject tenantObject, String deviceId, Buffer payload, MetricsTags.EndpointType endpoint) {
        final boolean payloadMatchesContentType = AbstractVertxBasedMqttProtocolAdapter.isPayloadOfIndicatedType(payload, ctx.contentType());
        if (!payloadMatchesContentType) {
            final String reason = String.format("Content-Type %s does not match payload", ctx.contentType());
            return Future.failedFuture(new ClientErrorException(400, reason));
        }

        final Span uploadSpan = TracingHelper.buildChildSpan(this.tracer, ctx.getTracingContext(), "upload " + endpoint, this.getTypeName())
                .withTag(Tags.SPAN_KIND.getKey(), "client")
                .withTag(TracingHelper.TAG_TENANT_ID, tenantObject.getTenantId())
                .withTag(TracingHelper.TAG_DEVICE_ID, deviceId)
                .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), ctx.authenticatedDevice() != null)
                .start();

        final Future<RegistrationAssertion> assertionFuture = this.getRegistrationAssertion(
                tenantObject.getTenantId(), deviceId, ctx.authenticatedDevice(), uploadSpan.context());
        final Future<TenantObject> tenantChecks = CompositeFuture
                .all(this.isAdapterEnabled(tenantObject), this.checkMessageLimit(tenantObject, payload.length(), uploadSpan.context()))
                .map(tenantObject);

        return CompositeFuture.all(assertionFuture, tenantChecks)
                .compose(checksPassed -> {
                    final Map<String, Object> messageProps = this.getDownstreamMessageProperties(ctx);
                    messageProps.put("qos", ctx.getRequestedQos().ordinal());
                    AbstractVertxBasedMqttProtocolAdapter.addRetainAnnotation(ctx, messageProps, uploadSpan);
                    // give subclasses the chance to adapt the properties before sending
                    this.customizeDownstreamMessageProperties(messageProps, ctx);
                    if (endpoint != MetricsTags.EndpointType.EVENT) {
                        return this.getTelemetrySender(tenantObject).sendTelemetry(tenantObject, assertionFuture.result(),
                                ctx.getRequestedQos(), ctx.contentType(), payload, messageProps, uploadSpan.context());
                    }
                    return this.getEventSender(tenantObject).sendEvent(tenantObject, assertionFuture.result(),
                            ctx.contentType(), payload, messageProps, uploadSpan.context());
                })
                .map(sent -> {
                    this.log.trace("successfully processed message [topic: {}, QoS: {}] from device [tenantId: {}, deviceId: {}]",
                            ctx.getOrigAddress(), ctx.qosLevel(), tenantObject.getTenantId(), deviceId);
                    final boolean ackRequired = ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected();
                    if (ackRequired) {
                        uploadSpan.log(EVENT_SENDING_PUBACK);
                        ctx.acknowledge();
                    }
                    uploadSpan.finish();
                    return sent;
                })
                .recover(failure -> {
                    if (!(failure instanceof ClientErrorException)) {
                        this.log.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]",
                                endpoint, tenantObject.getTenantId(), deviceId, failure);
                    } else {
                        final ClientErrorException clientError = (ClientErrorException) failure;
                        this.log.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]: {} - {}",
                                endpoint, tenantObject.getTenantId(), deviceId, clientError.getErrorCode(), clientError.getMessage());
                    }
                    TracingHelper.logError(uploadSpan, failure);
                    uploadSpan.finish();
                    return Future.failedFuture(failure);
                });
    }

    /**
     * Stops tracking an endpoint that is about to be closed.
     * <p>
     * Only endpoints of authenticated devices are tracked, so nothing needs to be
     * removed for an endpoint without an authenticated device.
     *
     * @param endpoint The endpoint that is about to be closed.
     */
    private void onBeforeEndpointClose(MqttDeviceEndpoint endpoint) {
        final boolean unauthenticated = endpoint.getAuthenticatedDevice() == null;
        if (unauthenticated) {
            return;
        }
        this.connectedAuthenticatedDeviceEndpoints.remove(endpoint);
    }

    /**
     * Hook for subclasses; this default implementation does nothing.
     * <p>
     * NOTE(review): presumably invoked when the connection to a device is closed;
     * confirm against the caller, which is outside this section.
     *
     * @param endpoint The MQTT endpoint concerned.
     */
    protected void onClose(MqttEndpoint endpoint) {
    }

    /**
     * Processes a message that has been published by a device.
     * <p>
     * Invoked for each PUBLISH packet after its topic has been checked. A succeeded
     * result leads to {@link #onMessageSent(MqttContext)} being invoked, a failed
     * result triggers the error handling for the published message.
     *
     * @param var1 The context of the published message.
     * @return A future indicating the outcome of processing the message.
     */
    protected abstract Future<Void> onPublishedMessage(MqttContext var1);

    /**
     * Hook for subclasses to adapt the properties of a message before it is sent downstream.
     * <p>
     * Invoked while uploading a message, after the default properties have been set.
     * This default implementation does nothing.
     *
     * @param messageProperties The (mutable) properties of the downstream message.
     * @param ctx The context of the message published by the device.
     */
    protected void customizeDownstreamMessageProperties(Map<String, Object> messageProperties, MqttContext ctx) {
    }

    /**
     * Hook for subclasses, invoked after a published message has been processed successfully.
     * <p>
     * This default implementation does nothing.
     *
     * @param ctx The context of the message published by the device.
     */
    protected void onMessageSent(MqttContext ctx) {
    }

    /**
     * Hook for subclasses, invoked when processing a published message has failed with
     * an error other than a {@code ClientErrorException}.
     * <p>
     * This default implementation does nothing.
     *
     * @param ctx The context of the message published by the device.
     */
    protected void onMessageUndeliverable(MqttContext ctx) {
    }

    /**
     * Marks a downstream message as <em>retained</em> if the device has asked for it.
     * <p>
     * Nothing is logged or added if the context's retain flag is not set.
     *
     * @param context The context of the message published by the device.
     * @param props The downstream message properties to add the annotation to.
     * @param currentSpan The span to log the device's request to.
     */
    private static void addRetainAnnotation(MqttContext context, Map<String, Object> props, Span currentSpan) {
        if (!context.isRetain()) {
            return;
        }
        currentSpan.log("device wants to retain message");
        props.put("x-opt-retain", Boolean.TRUE);
    }

    /**
     * Creates the device endpoint wrapper for an MQTT connection and registers its handlers.
     *
     * @param endpoint The MQTT endpoint representing the connection to the device.
     * @param authenticatedDevice The authenticated identity of the device, may be {@code null}.
     * @param traceSamplingPriority The sampling priority to apply to spans created for the endpoint.
     */
    private void registerEndpointHandlers(MqttEndpoint endpoint, Device authenticatedDevice, OptionalInt traceSamplingPriority) {
        this.createMqttDeviceEndpoint(endpoint, authenticatedDevice, traceSamplingPriority).registerHandlers();
    }

    /**
     * Creates a new device endpoint for an MQTT connection.
     * <p>
     * Endpoints of authenticated devices are added to the collection of connected
     * authenticated device endpoints.
     *
     * @param endpoint The MQTT endpoint representing the connection to the device.
     * @param authenticatedDevice The authenticated identity of the device, may be {@code null}.
     * @param traceSamplingPriority The sampling priority to apply to spans created for the endpoint.
     * @return The newly created endpoint.
     */
    final MqttDeviceEndpoint createMqttDeviceEndpoint(MqttEndpoint endpoint, Device authenticatedDevice, OptionalInt traceSamplingPriority) {
        final MqttDeviceEndpoint created = new MqttDeviceEndpoint(endpoint, authenticatedDevice, traceSamplingPriority);
        final boolean authenticated = authenticatedDevice != null;
        if (authenticated) {
            this.connectedAuthenticatedDeviceEndpoints.add(created);
        }
        return created;
    }

    /**
     * Maps an error that occurred while establishing a connection to an MQTT CONNACK return code.
     * <ul>
     * <li>An {@code MqttConnectionException} yields the code it carries.</li>
     * <li>An {@code AuthorizationException} yields <em>server unavailable</em> if it is an
     * {@code AdapterConnectionsExceededException} and <em>not authorized</em> otherwise.</li>
     * <li>A {@code ServiceInvocationException} yields <em>bad user name or password</em> for
     * status 401 and 404, <em>server unavailable</em> for status 503 and
     * <em>not authorized</em> for any other status.</li>
     * <li>Any other error yields <em>not authorized</em>.</li>
     * </ul>
     *
     * @param e The error to map.
     * @return The return code to send to the client.
     */
    private static MqttConnectReturnCode getConnectReturnCode(Throwable e) {
        if (e instanceof MqttConnectionException) {
            return ((MqttConnectionException) e).code();
        }
        if (e instanceof AuthorizationException) {
            return e instanceof AdapterConnectionsExceededException
                    ? MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE
                    : MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
        if (!(e instanceof ServiceInvocationException)) {
            return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
        final int statusCode = ((ServiceInvocationException) e).getErrorCode();
        if (statusCode == 401 || statusCode == 404) {
            return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
        }
        if (statusCode == 503) {
            return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
        }
        return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
    }

    /**
     * Determines the payload to publish to a device for a command.
     * <p>
     * This default implementation returns the payload of the command as is.
     * Subclasses may override this method in order to map the payload.
     *
     * @param ctx The context of the command to be published.
     * @return A future containing the payload to publish (may be {@code null}).
     */
    protected Future<Buffer> getCommandPayload(CommandContext ctx) {
        final Buffer commandPayload = ctx.getCommand().getPayload();
        return Future.succeededFuture(commandPayload);
    }

    // Synthetic accessor (an artifact of compilation, surfaced by the decompiler) that
    // exposes the adapter's vertx instance to the MqttDeviceEndpoint inner class, which
    // uses it to create its PendingPubAcks.
    static /* synthetic */ Vertx access$000(AbstractVertxBasedMqttProtocolAdapter x0) {
        return x0.vertx;
    }

    public class MqttDeviceEndpoint {
        // the MQTT connection to the device, never null (checked in the constructor)
        private final MqttEndpoint endpoint;
        // the authenticated identity of the connected device, may be null
        private final Device authenticatedDevice;
        // the sampling priority to set on spans created for published messages, if present
        private final OptionalInt traceSamplingPriority;
        // registered command subscriptions together with their command consumer, by subscription key
        private final Map<Subscription.Key, Pair<CommandSubscription, CommandConsumer>> commandSubscriptions = new ConcurrentHashMap<Subscription.Key, Pair<CommandSubscription, CommandConsumer>>();
        // registered error subscriptions, by subscription key
        private final Map<Subscription.Key, ErrorSubscription> errorSubscriptions = new HashMap<Subscription.Key, ErrorSubscription>();
        // handlers waiting for the PUBACK of messages published to the device
        private final PendingPubAcks pendingAcks = new PendingPubAcks(AbstractVertxBasedMqttProtocolAdapter.access$000(AbstractVertxBasedMqttProtocolAdapter.this));
        // the most recent protocol-level error reported for the endpoint, null if none has occurred
        private Throwable protocolLevelException;

        /**
         * Creates a new endpoint wrapper for an MQTT connection.
         *
         * @param endpoint The MQTT endpoint representing the connection to the device.
         * @param authenticatedDevice The authenticated identity of the device, may be {@code null}.
         * @param traceSamplingPriority The sampling priority to apply to spans created for the endpoint.
         * @throws NullPointerException if endpoint or traceSamplingPriority is {@code null}.
         */
        public MqttDeviceEndpoint(MqttEndpoint endpoint, Device authenticatedDevice, OptionalInt traceSamplingPriority) {
            Objects.requireNonNull(endpoint);
            Objects.requireNonNull(traceSamplingPriority);
            this.endpoint = endpoint;
            this.authenticatedDevice = authenticatedDevice;
            this.traceSamplingPriority = traceSamplingPriority;
        }

        /**
         * Gets the authenticated identity of the device connected via this endpoint.
         *
         * @return The authenticated device or {@code null} if none has been set.
         */
        protected final Device getAuthenticatedDevice() {
            return authenticatedDevice;
        }

        /**
         * Registers this object's handler methods on the wrapped MQTT endpoint.
         * <p>
         * Handlers are registered for published messages, PUBACK packets, subscribe and
         * unsubscribe requests, the connection being closed and protocol-level errors.
         */
        protected final void registerHandlers() {
            this.endpoint
                    .publishHandler(this::handlePublishedMessage)
                    .publishAcknowledgeHandler(this::handlePubAck)
                    .subscribeHandler(this::onSubscribe)
                    .unsubscribeHandler(this::onUnsubscribe)
                    .closeHandler(closed -> this.onClose())
                    .exceptionHandler(this::onProtocolLevelError);
        }

        /**
         * Logs a protocol-level error reported for the endpoint and remembers it.
         *
         * @param throwable The error that has occurred.
         */
        private void onProtocolLevelError(Throwable throwable) {
            final String clientId = this.endpoint.clientIdentifier();
            if (this.authenticatedDevice != null) {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                        "protocol-level exception occurred [tenant-id: {}, device-id: {}, client ID: {}]",
                        this.authenticatedDevice.getTenantId(), this.authenticatedDevice.getDeviceId(), clientId, throwable);
            } else {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                        "protocol-level exception occurred [client ID: {}]", clientId, throwable);
            }
            this.protocolLevelException = throwable;
        }

        /**
         * Handles a PUBLISH packet received from the device.
         * <p>
         * A span is created for the message, using the span context extracted from the
         * property bag of the topic name (if any) as its parent. The topic is checked and
         * the message is then passed to the adapter's {@code onPublishedMessage} method.
         * On success the span is tagged with status 202 and finished, on failure the error
         * handling for published messages takes over.
         *
         * @param message The published message.
         * @throws NullPointerException if message is {@code null}.
         */
        protected final void handlePublishedMessage(MqttPublishMessage message) {
            Objects.requireNonNull(message);

            final SpanContext parentContext = Optional.ofNullable(message.topicName())
                    .map(topic -> PropertyBag.fromTopic(topic))
                    .map(bag -> TracingHelper.extractSpanContext(AbstractVertxBasedMqttProtocolAdapter.this.tracer, bag::getPropertiesIterator))
                    .orElse(null);
            final Span span = this.newChildSpan(parentContext, "PUBLISH");
            span.setTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), message.topicName());
            span.setTag(TracingHelper.TAG_QOS.getKey(), message.qosLevel().toString());
            if (this.traceSamplingPriority.isPresent()) {
                TracingHelper.setTraceSamplingPriority(span, this.traceSamplingPriority.getAsInt());
            }

            final MqttContext context = MqttContext.fromPublishPacket(message, this.endpoint, span, this.authenticatedDevice);
            context.setTimer(AbstractVertxBasedMqttProtocolAdapter.this.getMetrics().startTimer());

            // for an unauthenticated device the tenant from the topic determines the sampling priority
            final Future<Void> spanPrepared;
            if (this.authenticatedDevice == null) {
                spanPrepared = this.applyTraceSamplingPriorityForTopicTenant(context.topic(), span);
            } else {
                spanPrepared = Future.succeededFuture();
            }

            spanPrepared
                    .compose(prepared -> this.checkTopic(context))
                    .compose(topicOk -> AbstractVertxBasedMqttProtocolAdapter.this.onPublishedMessage(context))
                    .onSuccess(processed -> {
                        Tags.HTTP_STATUS.set(span, 202);
                        AbstractVertxBasedMqttProtocolAdapter.this.onMessageSent(context);
                        span.finish();
                    })
                    .onFailure(failure -> this.handlePublishedMessageError(context, failure, span));
        }

        /**
         * Handles an error that has occurred while processing a published message.
         * <p>
         * If an error subscription matches the message, the error is published to the
         * device first. The connection is closed if the error handling mode is
         * <em>disconnect</em> or the error is terminal. Otherwise, for a message that
         * requires at-least-once delivery, a PUBACK is sent unless the error handling mode
         * is <em>skip ack</em>. The span is finished in any case.
         *
         * @param context The context of the published message.
         * @param error The error that has occurred.
         * @param span The span tracking the processing of the message.
         */
        private void handlePublishedMessageError(MqttContext context, Throwable error, Span span) {
            final ErrorSubscription errorSubscription = this.getErrorSubscription(context);
            final Future<Void> errorPublished;
            if (errorSubscription == null) {
                errorPublished = Future.succeededFuture();
            } else {
                errorPublished = this.publishError(errorSubscription, context, error, span.context());
            }

            Tags.HTTP_STATUS.set(span, ServiceInvocationException.extractStatusCode(error));
            TracingHelper.logError(span, error);
            final boolean clientError = error instanceof ClientErrorException;
            if (!clientError) {
                AbstractVertxBasedMqttProtocolAdapter.this.onMessageUndeliverable(context);
            }

            errorPublished
                    .compose(published -> AbstractVertxBasedMqttProtocolAdapter.this.isTerminalError(
                            error, context.deviceId(), this.authenticatedDevice, span.context()))
                    .onComplete(terminalCheck -> {
                        // a failed check is treated as "not terminal"
                        final boolean terminal = terminalCheck.succeeded() && terminalCheck.result();
                        final MqttContext.ErrorHandlingMode mode = context.getErrorHandlingMode(errorSubscription != null);
                        final boolean disconnect = mode == MqttContext.ErrorHandlingMode.DISCONNECT || terminal;
                        if (disconnect) {
                            if (context.deviceEndpoint().isConnected()) {
                                span.log("closing connection to device");
                                context.deviceEndpoint().close();
                            }
                        } else if (context.isAtLeastOnce()) {
                            if (mode == MqttContext.ErrorHandlingMode.SKIP_ACK) {
                                span.log("skipped sending PUBACK");
                            } else if (context.deviceEndpoint().isConnected()) {
                                span.log(AbstractVertxBasedMqttProtocolAdapter.EVENT_SENDING_PUBACK);
                                context.acknowledge();
                            }
                        }
                        span.finish();
                    });
        }

        /**
         * Applies the trace sampling priority configured for the tenant given in a topic to a span.
         * <p>
         * Nothing is done if the topic is {@code null} or contains no tenant identifier.
         * A failure to retrieve the tenant configuration is ignored.
         *
         * @param topic The topic that a message has been published to, may be {@code null}.
         * @param span The span to apply the priority (and the tenant tag) to.
         * @return A future that always succeeds.
         * @throws NullPointerException if span is {@code null}.
         */
        protected final Future<Void> applyTraceSamplingPriorityForTopicTenant(ResourceIdentifier topic, Span span) {
            Objects.requireNonNull(span);
            final boolean tenantKnown = topic != null && topic.getTenantId() != null;
            if (!tenantKnown) {
                return Future.succeededFuture();
            }
            return AbstractVertxBasedMqttProtocolAdapter.this.getTenantConfiguration(topic.getTenantId(), span.context())
                    .map(tenant -> {
                        TracingHelper.setDeviceTags(span, tenant.getTenantId(), null);
                        TenantTraceSamplingHelper.applyTraceSamplingPriority(tenant, null, span);
                        return (Void) null;
                    })
                    // the sampling priority is optional, so a failed tenant lookup must not fail the caller
                    .recover(ignored -> Future.succeededFuture());
        }

        /**
         * Checks that the context of a published message contains a topic.
         *
         * @param context The context of the published message.
         * @return A succeeded future if the context has a topic, otherwise a future failed
         *         with a 400 {@code ClientErrorException}.
         */
        private Future<Void> checkTopic(MqttContext context) {
            final boolean topicPresent = context.topic() != null;
            if (topicPresent) {
                return Future.succeededFuture();
            }
            return Future.failedFuture(new ClientErrorException(400, "malformed topic name"));
        }

        /**
         * Finds the error subscription that applies to a published message.
         * <p>
         * The subscription registered for the message's tenant and device identifier is
         * preferred. If there is none, the subscription registered for the identifier of
         * the authenticated device (if any) is used.
         *
         * @param context The context of the published message.
         * @return The matching subscription or {@code null} if there is none.
         */
        private ErrorSubscription getErrorSubscription(MqttContext context) {
            if (context.tenant() != null && context.deviceId() != null) {
                final ErrorSubscription forDevice = this.errorSubscriptions.get(
                        ErrorSubscription.getKey(context.tenant(), context.deviceId()));
                if (forDevice != null) {
                    return forDevice;
                }
            }
            if (context.authenticatedDevice() == null) {
                return null;
            }
            return this.errorSubscriptions.get(
                    ErrorSubscription.getKey(context.tenant(), context.authenticatedDevice().getDeviceId()));
        }

        /**
         * Handles a PUBACK packet received from the device by passing its message
         * identifier on to the pending acknowledgement handlers.
         *
         * @param msgId The identifier of the acknowledged message.
         * @throws NullPointerException if msgId is {@code null}.
         */
        public final void handlePubAck(Integer msgId) {
            this.pendingAcks.handlePubAck(Objects.requireNonNull(msgId));
        }

        /**
         * Handles a SUBSCRIBE packet received from the device.
         * <p>
         * A subscription is registered for each supported topic filter. Filters that
         * resolve to the same subscription key are registered once only and share the
         * outcome of that registration. Once all registrations have completed, a SUBACK
         * containing the granted QoS (or the failure code) for each requested filter, in
         * the order of the request, is sent to the device and a <em>connected</em> TTD
         * event is sent for each distinct command subscription that has been created.
         * Nothing is sent if the device has disconnected in the meantime.
         *
         * @param subscribeMsg The subscribe request from the device.
         * @throws NullPointerException if subscribeMsg is {@code null}.
         */
        protected final void onSubscribe(MqttSubscribeMessage subscribeMsg) {
            Objects.requireNonNull(subscribeMsg);
            final Map<Subscription.Key, Future<Subscription>> uniqueSubscriptions = new HashMap<>();
            final ArrayDeque<Future<Subscription>> subscriptionOutcomes = new ArrayDeque<>(subscribeMsg.topicSubscriptions().size());
            final Span span = this.newSpan("SUBSCRIBE");

            // The filters are processed in reverse order so that, among equivalent filters,
            // the last one of the request is the one that gets registered. The outcomes are
            // prepended in order to restore the order of the request.
            final LinkedList<io.vertx.mqtt.MqttTopicSubscription> topicSubscriptions = new LinkedList<>(subscribeMsg.topicSubscriptions());
            topicSubscriptions.descendingIterator().forEachRemaining(mqttTopicSub -> {
                final Future<Subscription> result;
                final AbstractSubscription sub = CommandSubscription.hasCommandEndpointPrefix(mqttTopicSub.topicName())
                        ? CommandSubscription.fromTopic(mqttTopicSub, this.authenticatedDevice)
                        : ErrorSubscription.fromTopic(mqttTopicSub, this.authenticatedDevice);
                if (sub == null) {
                    TracingHelper.logError(span, String.format("unsupported topic filter [%s]", mqttTopicSub.topicName()));
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                            "cannot create subscription [filter: {}, requested QoS: {}]: unsupported topic filter",
                            mqttTopicSub.topicName(), mqttTopicSub.qualityOfService());
                    result = Future.failedFuture(new IllegalArgumentException("unsupported topic filter"));
                } else if (uniqueSubscriptions.containsKey(sub.getKey())) {
                    // the log items hold values of different types (strings and the QoS enum),
                    // so the map must accept arbitrary objects
                    final Map<String, Object> items = new HashMap<>(3);
                    items.put("event", "ignoring duplicate subscription");
                    items.put(AbstractVertxBasedMqttProtocolAdapter.LOG_FIELD_TOPIC_FILTER, sub.getTopic());
                    items.put("requested QoS", sub.getQos());
                    span.log(items);
                    result = uniqueSubscriptions.get(sub.getKey());
                } else {
                    result = this.registerSubscription(sub, span);
                    uniqueSubscriptions.put(sub.getKey(), result);
                }
                subscriptionOutcomes.addFirst(result);
            });

            // join (instead of all) waits for every registration, including the failed ones
            CompositeFuture.join(new ArrayList<>(subscriptionOutcomes)).onComplete(v -> {
                if (this.endpoint.isConnected()) {
                    final List<MqttQoS> grantedQosLevels = subscriptionOutcomes.stream()
                            .map(future -> future.failed() ? MqttQoS.FAILURE : future.result().getQos())
                            .collect(Collectors.toList());
                    this.endpoint.subscribeAcknowledge(subscribeMsg.messageId(), grantedQosLevels);
                    subscriptionOutcomes.stream()
                            .filter(subFuture -> subFuture.succeeded() && subFuture.result() instanceof CommandSubscription)
                            .map(subFuture -> ((CommandSubscription) subFuture.result()).getKey())
                            .distinct()
                            .forEach(cmdSubKey -> AbstractVertxBasedMqttProtocolAdapter.this.sendConnectedTtdEvent(
                                    cmdSubKey.getTenant(), cmdSubKey.getDeviceId(), this.authenticatedDevice, span.context()));
                } else {
                    TracingHelper.logError(span, "skipped sending command subscription notification - endpoint not connected anymore");
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                            "skipped sending command subscription notification - endpoint not connected anymore [tenant-id: {}, device-id: {}]",
                            Optional.ofNullable(this.authenticatedDevice).map(Device::getTenantId).orElse(""),
                            Optional.ofNullable(this.authenticatedDevice).map(Device::getDeviceId).orElse(""));
                }
                span.finish();
            });
        }

        /**
         * Registers a subscription according to its type.
         *
         * @param sub The command or error subscription to register.
         * @param span The span to log to.
         * @return A future containing the registered subscription or failing if the
         *         subscription could not be registered.
         */
        private Future<Subscription> registerSubscription(Subscription sub, Span span) {
            if (sub instanceof CommandSubscription) {
                return this.registerCommandSubscription((CommandSubscription) sub, span);
            }
            return this.registerErrorSubscription((ErrorSubscription) sub, span);
        }

        /**
         * Registers a command subscription by creating a command consumer for it.
         * <p>
         * Subscriptions requesting QoS 2 are rejected. A subscription that has already
         * been registered under the same key is replaced.
         *
         * @param cmdSub The subscription to register.
         * @param span The span to log to.
         * @return A future containing the registered subscription or failing if QoS 2 has
         *         been requested or the command consumer could not be created.
         */
        private Future<Subscription> registerCommandSubscription(CommandSubscription cmdSub, Span span) {
            if (cmdSub.getQos() == MqttQoS.EXACTLY_ONCE) {
                final String reason = String.format("topic filter [%s] with unsupported QoS 2", cmdSub.getTopic());
                TracingHelper.logError(span, reason);
                return Future.failedFuture(new IllegalArgumentException("QoS 2 not supported for command subscription"));
            }
            return this.createCommandConsumer(cmdSub, span)
                    .map(consumer -> {
                        cmdSub.logSubscribeSuccess(span);
                        AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                                "created subscription [tenant: {}, device: {}, filter: {}, QoS: {}]",
                                cmdSub.getTenant(), cmdSub.getDeviceId(), cmdSub.getTopic(), cmdSub.getQos());
                        final Pair<CommandSubscription, CommandConsumer> previous = this.commandSubscriptions.get(cmdSub.getKey());
                        if (previous != null) {
                            span.log(String.format("subscription replaces previous subscription [QoS %s, filter %s]",
                                    previous.one().getQos(), previous.one().getTopic()));
                            AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                                    "previous subscription [QoS {}, filter {}] is getting replaced",
                                    previous.one().getQos(), previous.one().getTopic());
                        }
                        this.commandSubscriptions.put(cmdSub.getKey(), Pair.of(cmdSub, consumer));
                        return (Subscription) cmdSub;
                    })
                    .recover(failure -> {
                        cmdSub.logSubscribeFailure(span, failure);
                        AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                                "cannot create subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}]",
                                cmdSub.getTenant(), cmdSub.getDeviceId(), cmdSub.getTopic(), cmdSub.getQos(), failure);
                        return Future.failedFuture(failure);
                    });
        }

        /**
         * Registers an error subscription.
         * <p>
         * Only QoS 0 is supported. For a gateway subscribing for a specific device, a
         * registration assertion for that device is retrieved first and the subscription
         * fails if the assertion cannot be obtained.
         *
         * @param errorSub The subscription to register.
         * @param span The span to log to.
         * @return A future containing the registered subscription or failing if an
         *         unsupported QoS has been requested or the assertion could not be retrieved.
         */
        private Future<Subscription> registerErrorSubscription(ErrorSubscription errorSub, Span span) {
            if (errorSub.getQos() != MqttQoS.AT_MOST_ONCE) {
                final int requestedQos = errorSub.getQos().value();
                TracingHelper.logError(span, String.format("topic filter [%s] with unsupported QoS %d", errorSub.getTopic(), requestedQos));
                return Future.failedFuture(new IllegalArgumentException(
                        String.format("QoS %d not supported for error subscription", errorSub.getQos().value())));
            }
            return Future.succeededFuture()
                    .compose(start -> {
                        if (!errorSub.isGatewaySubscriptionForSpecificDevice()) {
                            return Future.succeededFuture();
                        }
                        // NOTE(review): presumably verifies that the gateway may act on behalf of the device - confirm
                        return AbstractVertxBasedMqttProtocolAdapter.this.getRegistrationAssertion(
                                this.authenticatedDevice.getTenantId(), errorSub.getDeviceId(), this.authenticatedDevice, span.context());
                    })
                    .recover(failure -> {
                        errorSub.logSubscribeFailure(span, failure);
                        AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                                "cannot create subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}]",
                                errorSub.getTenant(), errorSub.getDeviceId(), errorSub.getTopic(), errorSub.getQos(), failure);
                        return Future.failedFuture(failure);
                    })
                    .compose(checked -> {
                        this.errorSubscriptions.put(errorSub.getKey(), errorSub);
                        errorSub.logSubscribeSuccess(span);
                        AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                                "created subscription [tenant: {}, device: {}, filter: {}, QoS: {}]",
                                errorSub.getTenant(), errorSub.getDeviceId(), errorSub.getTopic(), errorSub.getQos());
                        return Future.succeededFuture(errorSub);
                    });
        }

        /**
         * Publishes an error message to the device on the topic of an error subscription.
         * <p>
         * The message is a JSON object with the properties {@code code}, {@code message},
         * {@code timestamp} and {@code correlation-id}. The correlation identifier is
         * {@code -1} if the context contains none.
         *
         * @param subscription The error subscription to publish the message for.
         * @param context The context of the message that could not be processed.
         * @param error The error to report to the device.
         * @param spanContext The span context to use as the parent of the span tracking the operation.
         * @return A future that succeeds once the message has been published and fails if
         *         the message could not be published.
         * @throws NullPointerException if subscription, context or error is {@code null}.
         */
        protected final Future<Void> publishError(ErrorSubscription subscription, MqttContext context, Throwable error, SpanContext spanContext) {
            Objects.requireNonNull(subscription);
            Objects.requireNonNull(context);
            Objects.requireNonNull(error);

            final Span span = this.newChildSpan(spanContext, "publish error to device");
            final int errorCode = ServiceInvocationException.extractStatusCode(error);
            String correlationId = context.correlationId();
            if (correlationId == null) {
                correlationId = "-1";
            }
            final String publishTopic = subscription.getErrorPublishTopic(context, errorCode);
            Tags.MESSAGE_BUS_DESTINATION.set(span, publishTopic);
            TracingHelper.TAG_QOS.set(span, subscription.getQos().name());

            final JsonObject errorJson = new JsonObject()
                    .put("code", errorCode)
                    .put("message", ServiceInvocationException.getErrorMessageForExternalClient(error))
                    .put("timestamp", ZonedDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MILLIS).format(DateTimeFormatter.ISO_INSTANT))
                    .put("correlation-id", correlationId);

            // mention the gateway as well if the message has been published on behalf of another device
            final String targetInfo;
            if (this.authenticatedDevice != null && !this.authenticatedDevice.getDeviceId().equals(context.deviceId())) {
                targetInfo = String.format("gateway [%s], device [%s]", this.authenticatedDevice.getDeviceId(), context.deviceId());
            } else {
                targetInfo = String.format("device [%s]", context.deviceId());
            }

            return this.publish(publishTopic, errorJson.toBuffer(), subscription.getQos())
                    .onSuccess(packetId -> {
                        AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                                "published error message [packet-id: {}] to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]",
                                packetId, targetInfo, subscription.getTenant(), this.endpoint.clientIdentifier(), subscription.getQos(), publishTopic);
                        final boolean hasPacketId = subscription.getQos().value() > 0;
                        span.log(hasPacketId ? "published error message, packet-id: " + packetId : "published error message");
                    })
                    .onFailure(failure -> {
                        AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                                "error publishing error message to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]",
                                targetInfo, subscription.getTenant(), this.endpoint.clientIdentifier(), subscription.getQos(), publishTopic, failure);
                        TracingHelper.logError(span, "failed to publish error message", failure);
                    })
                    .map(packetId -> (Void) null)
                    .onComplete(outcome -> span.finish());
        }

        /**
         * Creates a command consumer for a command subscription.
         * <p>
         * The handler registered with the consumer validates each incoming command
         * (command is valid, message limit, and - for a gateway subscription for a
         * specific device - a registration assertion for that device) before passing it
         * to {@code onCommandReceived}. A command that fails validation is rejected if
         * the failure is a {@code ClientErrorException} and released otherwise; in both
         * cases the outcome is reported to the metrics.
         *
         * @param subscription The command subscription to create the consumer for.
         * @param span The span whose context is used for the operations triggered.
         * @return A future containing the created consumer.
         */
        private Future<CommandConsumer> createCommandConsumer(CommandSubscription subscription, Span span) {
            Function<CommandContext, Future> commandHandler = commandContext -> {
                Tags.COMPONENT.set(commandContext.getTracingSpan(), AbstractVertxBasedMqttProtocolAdapter.this.getTypeName());
                TracingHelper.TAG_CLIENT_ID.set(commandContext.getTracingSpan(), this.endpoint.clientIdentifier());
                // the timer sample is attached to the command context so that it can be retrieved when the outcome is reported
                Timer.Sample timer = AbstractVertxBasedMqttProtocolAdapter.this.metrics.startTimer();
                AbstractVertxBasedMqttProtocolAdapter.addMicrometerSample((CommandContext)commandContext, (Timer.Sample)timer);
                Command command = commandContext.getCommand();
                Future tenantTracker = AbstractVertxBasedMqttProtocolAdapter.this.getTenantConfiguration(subscription.getTenant(), commandContext.getTracingContext());
                return tenantTracker.compose(tenantObject -> {
                    if (!command.isValid()) {
                        return Future.failedFuture((Throwable)new ClientErrorException(400, "malformed command message"));
                    }
                    return AbstractVertxBasedMqttProtocolAdapter.this.checkMessageLimit((TenantObject)tenantObject, command.getPayloadSize(), commandContext.getTracingContext());
                }).compose(success -> {
                    if (subscription.isGatewaySubscriptionForSpecificDevice()) {
                        return AbstractVertxBasedMqttProtocolAdapter.this.getRegistrationAssertion(this.authenticatedDevice.getTenantId(), subscription.getDeviceId(), this.authenticatedDevice, commandContext.getTracingContext());
                    }
                    return Future.succeededFuture();
                }).onFailure(t -> {
                    // client errors are rejected, anything else is released
                    if (t instanceof ClientErrorException) {
                        commandContext.reject(t);
                    } else {
                        commandContext.release(t);
                    }
                    // tenantTracker.result() is null here if the tenant lookup itself has failed
                    AbstractVertxBasedMqttProtocolAdapter.this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, subscription.getTenant(), (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.from((Throwable)t), command.getPayloadSize(), timer);
                }).compose(success -> this.onCommandReceived((TenantObject)tenantTracker.result(), subscription, (CommandContext)commandContext));
            };
            // for an authenticated device a registration assertion is retrieved before the consumer is created
            Future tokenTracker = Optional.ofNullable(this.authenticatedDevice).map(v -> AbstractVertxBasedMqttProtocolAdapter.this.getRegistrationAssertion(this.authenticatedDevice.getTenantId(), subscription.getDeviceId(), this.authenticatedDevice, span.context())).orElseGet(Future::succeededFuture);
            if (subscription.isGatewaySubscriptionForSpecificDevice()) {
                // the consumer is additionally scoped to the authenticated device (the gateway)
                return tokenTracker.compose(v -> AbstractVertxBasedMqttProtocolAdapter.this.getCommandConsumerFactory().createCommandConsumer(subscription.getTenant(), subscription.getDeviceId(), subscription.getAuthenticatedDeviceId(), commandHandler, null, span.context()));
            }
            return tokenTracker.compose(v -> AbstractVertxBasedMqttProtocolAdapter.this.getCommandConsumerFactory().createCommandConsumer(subscription.getTenant(), subscription.getDeviceId(), commandHandler, null, span.context()));
        }

        /**
         * Publishes a command to the device on the topic determined by a command subscription.
         * <p>
         * The payload to publish is obtained from the adapter's {@code getCommandPayload}
         * method; a {@code null} payload is replaced with an empty buffer. If obtaining
         * the payload or publishing the message fails, the failure is logged, reported to
         * the metrics and the command is released.
         *
         * @param tenantObject The tenant that the device belongs to.
         * @param subscription The command subscription that the command is published for.
         * @param commandContext The context of the command to publish.
         * @return A future reflecting the outcome of obtaining the payload only; it does
         *         not wait for the outcome of publishing the message.
         * @throws NullPointerException if any of the parameters is {@code null}.
         */
        protected final Future<Void> onCommandReceived(TenantObject tenantObject, CommandSubscription subscription, CommandContext commandContext) {
            Objects.requireNonNull(tenantObject);
            Objects.requireNonNull(subscription);
            Objects.requireNonNull(commandContext);
            Command command = commandContext.getCommand();
            String publishTopic = subscription.getCommandPublishTopic(command);
            Tags.MESSAGE_BUS_DESTINATION.set(commandContext.getTracingSpan(), publishTopic);
            TracingHelper.TAG_QOS.set(commandContext.getTracingSpan(), subscription.getQos().name());
            String targetInfo = command.isTargetedAtGateway() ? String.format("gateway [%s], device [%s]", command.getGatewayId(), command.getDeviceId()) : String.format("device [%s]", command.getDeviceId());
            // the publish is triggered from within onSuccess, so its outcome is handled by the nested handlers only
            return AbstractVertxBasedMqttProtocolAdapter.this.getCommandPayload(commandContext).map(mappedPayload -> Optional.ofNullable(mappedPayload).orElseGet(Buffer::buffer)).onSuccess(payload -> this.publish(publishTopic, (Buffer)payload, subscription.getQos()).onSuccess(msgId -> {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("published command [packet-id: {}] to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{msgId, targetInfo, subscription.getTenant(), this.endpoint.clientIdentifier(), subscription.getQos(), publishTopic});
                commandContext.getTracingSpan().log((String)(subscription.getQos().value() > 0 ? "published command, packet-id: " + msgId : "published command"));
                // accepts the command right away or waits for the PUBACK, depending on the QoS
                this.afterCommandPublished((Integer)msgId, commandContext, tenantObject, subscription);
            }).onFailure(thr -> {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("error publishing command to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{targetInfo, subscription.getTenant(), this.endpoint.clientIdentifier(), subscription.getQos(), publishTopic, thr});
                TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"failed to publish command", (Throwable)thr);
                this.reportPublishedCommand(tenantObject, subscription, commandContext, MetricsTags.ProcessingOutcome.from((Throwable)thr));
                commandContext.release(thr);
            })).onFailure(t -> {
                // reached if the payload could not be obtained
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("error mapping command [tenant-id: {}, MQTT client-id: {}, QoS: {}]", new Object[]{subscription.getTenant(), this.endpoint.clientIdentifier(), subscription.getQos(), t});
                TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"failed to map command", (Throwable)t);
                this.reportPublishedCommand(tenantObject, subscription, commandContext, MetricsTags.ProcessingOutcome.from((Throwable)t));
                commandContext.release(t);
            }).mapEmpty();
        }

        /**
         * Publishes a message to the device.
         * <p>
         * The message is published with neither the <em>duplicate</em> nor the
         * <em>retain</em> flag set. An exception thrown while publishing results in a
         * future failed with a 503 {@code ServerErrorException} if the endpoint is not
         * connected anymore, and with a 500 {@code ServerErrorException} wrapping the
         * exception otherwise.
         *
         * @param topic The topic to publish the message to.
         * @param payload The payload of the message.
         * @param qosLevel The quality of service level to publish the message with.
         * @return A future containing the identifier of the published message.
         */
        private Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel) {
            final Promise<Integer> published = Promise.promise();
            try {
                this.endpoint.publish(topic, payload, qosLevel, false, false, published);
            } catch (Exception e) {
                final ServerErrorException failure;
                if (this.endpoint.isConnected()) {
                    failure = new ServerErrorException(500, e);
                } else {
                    failure = new ServerErrorException(503, "connection to device already closed");
                }
                published.fail(failure);
            }
            return published.future();
        }

        /**
         * Settles a command after it has been published to the device.
         * <p>
         * A command published with a QoS other than 1 is reported as forwarded and
         * accepted right away. For QoS 1, handlers are registered that accept the command
         * when the PUBACK arrives, or release it (reporting it as undeliverable) if no
         * PUBACK is received within the configured timeout.
         *
         * @param publishedMsgId The identifier of the published message.
         * @param commandContext The context of the published command.
         * @param tenantObject The tenant that the device belongs to.
         * @param subscription The command subscription that the command has been published for.
         */
        private void afterCommandPublished(Integer publishedMsgId, CommandContext commandContext, TenantObject tenantObject, CommandSubscription subscription) {
            final boolean pubackExpected = MqttQoS.AT_LEAST_ONCE.equals(subscription.getQos());
            if (!pubackExpected) {
                this.reportPublishedCommand(tenantObject, subscription, commandContext, MetricsTags.ProcessingOutcome.FORWARDED);
                commandContext.accept();
                return;
            }

            final Handler<Integer> onAck = ackedMsgId -> {
                this.reportPublishedCommand(tenantObject, subscription, commandContext, MetricsTags.ProcessingOutcome.FORWARDED);
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                        "received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]",
                        ackedMsgId, subscription.getTenant(), subscription.getDeviceId(), this.endpoint.clientIdentifier());
                commandContext.getTracingSpan().log("received PUBACK from device");
                commandContext.accept();
            };
            final Handler<Void> onAckTimeout = timedOut -> {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                        "did not receive PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]",
                        publishedMsgId, subscription.getTenant(), subscription.getDeviceId(), this.endpoint.clientIdentifier());
                commandContext.release(new ServerErrorException(503, "did not receive PUBACK from device"));
                this.reportPublishedCommand(tenantObject, subscription, commandContext, MetricsTags.ProcessingOutcome.UNDELIVERABLE);
            };
            final MqttProtocolAdapterProperties config =
                    (MqttProtocolAdapterProperties) ((Object) AbstractVertxBasedMqttProtocolAdapter.this.getConfig());
            this.pendingAcks.add(publishedMsgId, onAck, onAckTimeout, config.getSendMessageToDeviceTimeout());
        }

        /**
         * Reports the outcome of publishing a command to the adapter's metrics.
         * <p>
         * The command is reported with direction <em>one-way</em> or <em>request</em>,
         * depending on the command itself, together with its payload size and the
         * Micrometer sample obtained for the command context.
         *
         * @param tenantObject The tenant that the device belongs to.
         * @param subscription The subscription providing the tenant identifier to report.
         * @param commandContext The context of the command being reported.
         * @param outcome The outcome of processing the command.
         */
        private void reportPublishedCommand(TenantObject tenantObject, CommandSubscription subscription, CommandContext commandContext, MetricsTags.ProcessingOutcome outcome) {
            final MetricsTags.Direction direction;
            if (commandContext.getCommand().isOneWay()) {
                direction = MetricsTags.Direction.ONE_WAY;
            } else {
                direction = MetricsTags.Direction.REQUEST;
            }
            AbstractVertxBasedMqttProtocolAdapter.this.metrics.reportCommand(
                    direction,
                    subscription.getTenant(),
                    tenantObject,
                    outcome,
                    commandContext.getCommand().getPayloadSize(),
                    AbstractVertxBasedMqttProtocolAdapter.getMicrometerSample(commandContext));
        }

        /**
         * Handles an MQTT UNSUBSCRIBE packet sent by the device.
         * <p>
         * For each topic filter contained in the message, the matching command or error
         * subscription (if any) is removed from this session. For a removed command subscription
         * the corresponding command consumer is closed as well, requesting a
         * <em>disconnected</em> event to be sent. Topic filters for which no subscription
         * exists are logged as an error to the tracing span.
         * <p>
         * The UNSUBACK is sent (if the endpoint is still connected) without waiting for the
         * command consumers to be closed. The tracing span is finished once all removals
         * have completed.
         *
         * @param unsubscribeMsg The unsubscribe request sent by the device.
         * @throws NullPointerException if the message is {@code null}.
         */
        protected final void onUnsubscribe(MqttUnsubscribeMessage unsubscribeMsg) {
            Objects.requireNonNull(unsubscribeMsg);
            final Span span = this.newSpan("UNSUBSCRIBE");
            // CompositeFuture.join() expects a list with the raw Future type as its element type
            @SuppressWarnings("rawtypes")
            final List<Future> removalDoneFutures = new ArrayList<>(unsubscribeMsg.topics().size());
            unsubscribeMsg.topics().forEach(topic -> {
                // holder is required because the value is assigned from within nested lambdas
                final AtomicReference<Subscription> removedSubscription = new AtomicReference<>();
                if (CommandSubscription.hasCommandEndpointPrefix(topic)) {
                    Optional.ofNullable(CommandSubscription.getKey(topic, this.authenticatedDevice))
                            .map(this.commandSubscriptions::remove)
                            .ifPresent(removedPair -> {
                                final CommandSubscription subscription = removedPair.one();
                                removedSubscription.set(subscription);
                                subscription.logUnsubscribe(span);
                                removalDoneFutures.add(this.onCommandSubscriptionRemoved(removedPair, span, true));
                            });
                } else if (ErrorSubscription.hasErrorEndpointPrefix(topic)) {
                    Optional.ofNullable(ErrorSubscription.getKey(topic, this.authenticatedDevice))
                            .map(this.errorSubscriptions::remove)
                            .ifPresent(subscription -> {
                                removedSubscription.set(subscription);
                                subscription.logUnsubscribe(span);
                            });
                }
                final Subscription removed = removedSubscription.get();
                if (removed != null) {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                            "removed subscription with topic [{}] for device [tenant-id: {}, device-id: {}]",
                            topic, removed.getTenant(), removed.getDeviceId());
                } else {
                    TracingHelper.logError(span, String.format("no subscription found for topic filter [%s]", topic));
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("cannot unsubscribe - no subscription found for topic filter [{}]", topic);
                }
            });
            if (this.endpoint.isConnected()) {
                this.endpoint.unsubscribeAcknowledge(unsubscribeMsg.messageId());
            }
            CompositeFuture.join(removalDoneFutures).onComplete(r -> span.finish());
        }

        /**
         * Closes the command consumer belonging to a removed command subscription and
         * optionally sends a <em>disconnected</em> event for the device.
         * <p>
         * Errors that occur while closing the consumer are logged to the given span. An error
         * with status code 412 is propagated, which means that no disconnected event is sent
         * and the returned future fails. All other errors are ignored, i.e. processing
         * continues as if the consumer had been closed successfully.
         *
         * @param subscriptionConsumerPair The removed subscription along with its command consumer.
         * @param span The span used for tracing the operation.
         * @param sendDisconnectedEvent {@code true} if a disconnected event should be sent after
         *                              the consumer has been closed.
         * @return A future indicating the outcome of the operation.
         */
        private Future<Void> onCommandSubscriptionRemoved(Pair<CommandSubscription, CommandConsumer> subscriptionConsumerPair, Span span, boolean sendDisconnectedEvent) {
            final CommandSubscription removedSubscription = subscriptionConsumerPair.one();
            final CommandConsumer consumer = subscriptionConsumerPair.two();
            final Future<Void> consumerClosed = consumer.close(span.context()).recover(error -> {
                TracingHelper.logError(span, error);
                if (ServiceInvocationException.extractStatusCode(error) != 412) {
                    // any error other than 412 (precondition failed) is ignored
                    return Future.succeededFuture();
                }
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                        "command consumer wasn't active anymore - skip sending disconnected event [tenant: {}, device-id: {}]",
                        removedSubscription.getTenant(), removedSubscription.getDeviceId());
                span.log("command consumer wasn't active anymore - skip sending disconnected event");
                return Future.failedFuture(error);
            });
            return consumerClosed.compose(closed -> {
                if (!sendDisconnectedEvent) {
                    return Future.succeededFuture();
                }
                return this.sendDisconnectedTtdEvent(removedSubscription.getTenant(), removedSubscription.getDeviceId(), span);
            });
        }

        /**
         * Sends a <em>disconnected</em> event for a device by delegating to the enclosing
         * adapter, tracing the operation in a dedicated child span.
         *
         * @param tenant The tenant that the device belongs to.
         * @param device The identifier of the device.
         * @param span The span to use as the parent of the span tracking the sending of the event.
         * @return A future indicating the outcome of sending the event. The child span is
         *         finished when the future completes, regardless of the outcome.
         */
        private Future<Void> sendDisconnectedTtdEvent(String tenant, String device, Span span) {
            final Span eventSpan = this.newChildSpan(span.context(), "send Disconnected Event");
            return AbstractVertxBasedMqttProtocolAdapter.this
                    .sendDisconnectedTtdEvent(tenant, device, this.authenticatedDevice, eventSpan.context())
                    .onComplete(outcome -> eventSpan.finish())
                    .mapEmpty();
        }

        /**
         * Checks whether the enclosing adapter's stop-result promise reference has been set,
         * which presumably indicates that the adapter is being (or has been) stopped.
         *
         * @return {@code true} if the reference holds a non-null value.
         */
        private boolean stopCalled() {
            return Objects.nonNull(AbstractVertxBasedMqttProtocolAdapter.this.stopResultPromiseRef.get());
        }

        /**
         * Actively closes the connection to the device.
         * <p>
         * If the endpoint is still connected, its close handler is replaced with a no-op
         * before the connection is closed (presumably to prevent the regular close handling
         * from being triggered in addition to the clean-up done here - confirm).
         *
         * @param reason The reason for closing the connection.
         * @param sendDisconnectedEvent {@code true} if events indicating that the device
         *                              has disconnected should be sent.
         * @throws NullPointerException if reason is {@code null}.
         */
        protected final void close(String reason, boolean sendDisconnectedEvent) {
            Objects.requireNonNull(reason);
            final Span closeSpan = this.newSpan("close device connection");
            final boolean stillConnected = this.endpoint.isConnected();
            if (stillConnected) {
                this.endpoint.closeHandler(ignored -> {
                });
            }
            this.onCloseInternal(closeSpan, reason, sendDisconnectedEvent)
                    .onComplete(result -> closeSpan.finish());
        }

        /**
         * Performs the clean-up required when the connection to the device is being closed,
         * always requesting events about the device's disconnection to be sent.
         * <p>
         * The name of the tracing span and the reason used for logging depend on the
         * state of this session:
         * <ul>
         * <li>a protocol level exception has been recorded: the exception is logged to the
         * span and the reason is {@code "protocol error: "} followed by the exception,</li>
         * <li>the adapter's stop procedure has been invoked: the reason is
         * {@code "server shutdown"},</li>
         * <li>otherwise: the span is named {@code CLOSE} and no reason is given.</li>
         * </ul>
         * The span is finished once the clean-up has completed.
         */
        protected final void onClose() {
            final Span span;
            final String reason;
            if (this.protocolLevelException != null) {
                span = this.newSpan("close connection due to protocol error");
                TracingHelper.logError(span, null, this.protocolLevelException, true);
                reason = "protocol error: " + this.protocolLevelException.toString();
            } else if (this.stopCalled()) {
                span = this.newSpan("close device connection (server shutdown)");
                reason = "server shutdown";
            } else {
                span = this.newSpan("CLOSE");
                reason = null;
            }
            this.onCloseInternal(span, reason, true).onComplete(ar -> span.finish());
        }

        /**
         * Releases the resources associated with this session and closes the endpoint
         * if it is still connected.
         * <p>
         * In particular, this method
         * <ol>
         * <li>invokes the enclosing adapter's {@code onBeforeEndpointClose} and {@code onClose} hooks,</li>
         * <li>removes all command subscriptions of this session,</li>
         * <li>triggers sending of a disconnected event, if requested,</li>
         * <li>decrements the (un)authenticated connections metric,</li>
         * <li>logs the closing of the connection and, if the endpoint is still connected,
         * adds a corresponding entry to the span and closes the endpoint.</li>
         * </ol>
         *
         * @param span The span used for tracing the operation.
         * @param reason The reason for closing the connection or {@code null} if unknown.
         * @param sendDisconnectedEvent {@code true} if events indicating that the device
         *                              has disconnected should be sent.
         * @return A future that is derived from the removal of the command subscriptions
         *         only; it does not track the outcome of the other steps.
         */
        private Future<Void> onCloseInternal(Span span, String reason, boolean sendDisconnectedEvent) {
            AbstractVertxBasedMqttProtocolAdapter.this.onBeforeEndpointClose(this);
            AbstractVertxBasedMqttProtocolAdapter.this.onClose(this.endpoint);
            final CompositeFuture removalDoneFuture = this.removeAllCommandSubscriptions(span, sendDisconnectedEvent);
            if (sendDisconnectedEvent) {
                AbstractVertxBasedMqttProtocolAdapter.this.sendDisconnectedEvent(this.endpoint.clientIdentifier(), this.authenticatedDevice, span.context());
            }
            if (this.authenticatedDevice == null) {
                AbstractVertxBasedMqttProtocolAdapter.this.metrics.decrementUnauthenticatedConnections();
            } else {
                AbstractVertxBasedMqttProtocolAdapter.this.metrics.decrementConnections(this.authenticatedDevice.getTenantId());
            }
            final String reasonSuffix = reason != null ? "; reason: " + reason : "";
            if (this.endpoint.isConnected()) {
                final Map<String, String> logFields = new HashMap<>(2);
                logFields.put("event", "closing device connection");
                if (reason != null) {
                    logFields.put("reason", reason);
                }
                span.log(logFields);
                if (this.authenticatedDevice == null) {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                            "closing connection to anonymous device [client ID: {}]{}",
                            this.endpoint.clientIdentifier(), reasonSuffix);
                } else {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                            "closing connection to device [tenant-id: {}, device-id: {}, client ID: {}]{}",
                            this.authenticatedDevice.getTenantId(), this.authenticatedDevice.getDeviceId(),
                            this.endpoint.clientIdentifier(), reasonSuffix);
                }
                this.endpoint.close();
            } else if (this.authenticatedDevice == null) {
                // connection has already been closed, e.g. by the device
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                        "connection to anonymous device closed [client ID: {}]{}",
                        this.endpoint.clientIdentifier(), reasonSuffix);
            } else {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug(
                        "connection to device closed [tenant-id: {}, device-id: {}, client ID: {}]{}",
                        this.authenticatedDevice.getTenantId(), this.authenticatedDevice.getDeviceId(),
                        this.endpoint.clientIdentifier(), reasonSuffix);
            }
            return removalDoneFuture.mapEmpty();
        }

        /**
         * Removes all command subscriptions of this session and closes the corresponding
         * command consumers.
         *
         * @param span The span used for tracing the operation.
         * @param sendDisconnectedEvent {@code true} if a disconnected event should be sent
         *                              for each removed subscription.
         * @return A composite future joining the outcomes of closing the individual command consumers.
         */
        private CompositeFuture removeAllCommandSubscriptions(Span span, boolean sendDisconnectedEvent) {
            // CompositeFuture.join() expects a list with the raw Future type as its element type
            @SuppressWarnings("rawtypes")
            final List<Future> removalFutures = new ArrayList<>(this.commandSubscriptions.size());
            final Iterator<Pair<CommandSubscription, CommandConsumer>> iter = this.commandSubscriptions.values().iterator();
            while (iter.hasNext()) {
                final Pair<CommandSubscription, CommandConsumer> pair = iter.next();
                pair.one().logUnsubscribe(span);
                removalFutures.add(this.onCommandSubscriptionRemoved(pair, span, sendDisconnectedEvent));
                // remove the entry via the iterator so that the map is not modified behind the iterator's back
                iter.remove();
            }
            return CompositeFuture.join(removalFutures);
        }

        /**
         * Creates and starts a new span without a parent context.
         * <p>
         * If a trace sampling priority has been set for this session, it is applied to the new span.
         *
         * @param operationName The name of the operation that the span tracks.
         * @return The started span.
         */
        private Span newSpan(String operationName) {
            final Span created = this.newChildSpan(null, operationName);
            this.traceSamplingPriority.ifPresent(
                    priority -> TracingHelper.setTraceSamplingPriority(created, priority));
            return created;
        }

        /**
         * Creates and starts a new span as a child of the given context.
         * <p>
         * The span is tagged with span kind {@code server}, the MQTT client identifier and
         * a flag indicating whether the device has been authenticated. For an authenticated
         * device, the tenant and device identifiers are set on the span as well.
         *
         * @param spanContext The context to use as the parent; {@link #newSpan(String)}
         *                    passes {@code null} here.
         * @param operationName The name of the operation that the span tracks.
         * @return The started span.
         */
        private Span newChildSpan(SpanContext spanContext, String operationName) {
            final boolean authenticated = this.authenticatedDevice != null;
            final Span childSpan = TracingHelper
                    .buildChildSpan(
                            AbstractVertxBasedMqttProtocolAdapter.this.tracer,
                            spanContext,
                            operationName,
                            AbstractVertxBasedMqttProtocolAdapter.this.getTypeName())
                    .withTag(Tags.SPAN_KIND.getKey(), "server")
                    .withTag(TracingHelper.TAG_CLIENT_ID.getKey(), this.endpoint.clientIdentifier())
                    .withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticated)
                    .start();
            if (authenticated) {
                TracingHelper.setDeviceTags(
                        childSpan,
                        this.authenticatedDevice.getTenantId(),
                        this.authenticatedDevice.getDeviceId());
            }
            return childSpan;
        }
    }
}

