/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.http;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopSpan;
import io.opentracing.tag.Tag;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.net.NetServerOptions;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.HttpContext;
import org.eclipse.hono.adapter.TelemetryExecutionContext;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
import org.eclipse.hono.adapter.http.HttpAdapterMetrics;
import org.eclipse.hono.adapter.http.HttpProtocolAdapterProperties;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.service.http.DefaultFailureHandler;
import org.eclipse.hono.service.http.HttpServerSpanHelper;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;

public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtocolAdapterProperties>
extends AbstractProtocolAdapterBase<T> {
    /** Directory path constant for uploads; not referenced in the visible part of this class. */
    protected static final String DEFAULT_UPLOADS_DIRECTORY = "/tmp";
    // NOTE(review): presumably the routing context key under which the id of the
    // command reception timer is kept - confirm against cancelCommandReceptionTimer.
    private static final String KEY_TIMER_ID = "timerId";
    /** Name reported via {@code request().routed(..)} by the route that matches all requests. */
    private static final String MATCH_ALL_ROUTE_NAME = "/*";
    /** Routing context key that is set once the match-all route's handler has run. */
    private static final String KEY_MATCH_ALL_ROUTE_APPLIED = "matchAllRouteApplied";
    /** Metrics sink; the no-op implementation is used until {@code setMetrics} is called. */
    private HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
    /** Server for the secure port; either injected via {@code setHttpServer} or created on startup. */
    private HttpServer server;
    /** Server for the insecure port; either injected via {@code setInsecureHttpServer} or created on startup. */
    private HttpServer insecureServer;

    /**
     * Sets the metrics sink that this adapter reports to.
     * <p>
     * The given value is stored as-is (including {@code null}); the name of
     * the implementation class is logged only for a non-null value.
     *
     * @param metrics The metrics to report to.
     */
    public final void setMetrics(HttpAdapterMetrics metrics) {
        if (metrics != null) {
            this.log.info("reporting metrics using [{}]", metrics.getClass().getName());
        }
        this.metrics = metrics;
    }

    /**
     * Gets the metrics sink currently used by this adapter.
     *
     * @return The value last passed to {@code setMetrics}, or the no-op
     *         implementation if it has never been set.
     */
    protected final HttpAdapterMetrics getMetrics() {
        return this.metrics;
    }

    /**
     * Gets the default for the secure port.
     * <p>
     * The value is handed to the configuration's {@code getPort(int)} when
     * the secure server options are assembled.
     *
     * @return 8443
     */
    public final int getPortDefaultValue() {
        return 8443;
    }

    /**
     * Gets the default for the insecure port.
     * <p>
     * The value is handed to the configuration's {@code getInsecurePort(int)}
     * when the insecure server options are assembled.
     *
     * @return 8080
     */
    public final int getInsecurePortDefaultValue() {
        return 8080;
    }

    /**
     * Gets the port reported by the secure server.
     *
     * @return The server's actual port or -1 if no secure server has been
     *         set or created.
     */
    protected final int getActualPort() {
        if (this.server == null) {
            return -1;
        }
        return this.server.actualPort();
    }

    /**
     * Gets the port reported by the insecure server.
     *
     * @return The server's actual port or -1 if no insecure server has been
     *         set or created.
     */
    protected final int getActualInsecurePort() {
        if (this.insecureServer == null) {
            return -1;
        }
        return this.insecureServer.actualPort();
    }

    /**
     * Sets the server to use for the secure port.
     *
     * @param server The (not yet started) server.
     * @throws NullPointerException if server is {@code null}.
     * @throws IllegalArgumentException if the server already reports an
     *                                  actual port greater than zero.
     */
    public final void setHttpServer(HttpServer server) {
        final HttpServer candidate = Objects.requireNonNull(server);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.server = candidate;
    }

    /**
     * Sets the server to use for the insecure port.
     *
     * @param server The (not yet started) server.
     * @throws NullPointerException if server is {@code null}.
     * @throws IllegalArgumentException if the server already reports an
     *                                  actual port greater than zero.
     */
    public final void setInsecureHttpServer(HttpServer server) {
        final HttpServer candidate = Objects.requireNonNull(server);
        final boolean alreadyStarted = candidate.actualPort() > 0;
        if (alreadyStarted) {
            throw new IllegalArgumentException("http server must not be started already");
        }
        this.insecureServer = candidate;
    }

    /**
     * Starts this adapter.
     * <p>
     * Checks the port configuration, runs {@code preStartup}, creates the
     * router, lets the subclass add its routes and binds the secure and
     * insecure servers. Once both are bound, {@code onStartupSuccess} is
     * invoked; an exception thrown by it is logged and fails the startup.
     *
     * @param startPromise The promise to complete with the outcome.
     */
    public final void doStart(Promise<Void> startPromise) {
        final Future<CompositeFuture> serversBound = this.checkPortConfiguration()
                .compose(portsChecked -> this.preStartup())
                .compose(preStartupDone -> {
                    final Router router = this.createRouter();
                    if (router == null) {
                        return Future.failedFuture("no router configured");
                    }
                    // must be set before the subclass registers its routes
                    System.setProperty("io.vertx.web.router.setup.lenient", "true");
                    this.addRoutes(router);
                    return CompositeFuture.all(
                            this.bindSecureHttpServer(router),
                            this.bindInsecureHttpServer(router));
                });

        final Future<Void> startupCompleted = serversBound.compose(bound -> {
            try {
                this.onStartupSuccess();
            } catch (Exception e) {
                this.log.error("error in onStartupSuccess", e);
                return Future.failedFuture(e);
            }
            return Future.succeededFuture();
        });

        startupCompleted.onComplete(startPromise);
    }

    /**
     * Looks up the timer sample stored in a routing context.
     *
     * @param ctx The routing context to read from.
     * @return The value stored under key {@code micrometer.sample}, may be {@code null}.
     */
    private Timer.Sample getMicrometerSample(RoutingContext ctx) {
        final Timer.Sample sample = ctx.get("micrometer.sample");
        return sample;
    }

    /**
     * Stores a TTD status in a routing context, keyed by the fully qualified
     * name of the {@code TtdStatus} class.
     *
     * @param ctx The routing context to write to.
     * @param status The status to store.
     */
    private void setTtdStatus(RoutingContext ctx, MetricsTags.TtdStatus status) {
        final String key = MetricsTags.TtdStatus.class.getName();
        ctx.put(key, status);
    }

    /**
     * Assembles the tags to put on server spans.
     * <p>
     * The map always carries the component tag set to this adapter's type
     * name; subclasses contribute further entries via {@code addCustomTags}.
     *
     * @return A new, mutable map of tags.
     */
    private Map<String, String> getCustomTags() {
        final Map<String, String> tags = new HashMap<>();
        final String componentKey = Tags.COMPONENT.getKey();
        tags.put(componentKey, this.getTypeName());
        this.addCustomTags(tags);
        return tags;
    }

    /**
     * Hook for adding adapter specific tags to the server span tags.
     * <p>
     * Invoked by {@code getCustomTags} with a map that already contains the
     * component tag. This default implementation adds nothing.
     *
     * @param customTags The mutable map to add tags to.
     */
    protected void addCustomTags(Map<String, String> customTags) {
    }

    /**
     * Hook invoked by {@code doStart} after the port configuration has been
     * checked and before the router is created.
     * <p>
     * Startup only proceeds once the returned future has succeeded. This
     * default implementation returns an already succeeded future.
     *
     * @return A future indicating the outcome of the pre-startup work.
     */
    protected Future<Void> preStartup() {
        return Future.succeededFuture();
    }

    /**
     * Hook invoked by {@code doStart} after the servers have been bound.
     * <p>
     * An exception thrown from this method is logged by {@code doStart} and
     * fails the startup. This default implementation does nothing.
     */
    protected void onStartupSuccess() {
    }

    /**
     * Creates the router that handles all incoming requests.
     * <p>
     * A single route matching every request is registered. Its handler marks
     * the request as routed, adopts the active span into the routing context,
     * starts a metrics timer, registers a close handler on the response (if
     * it is neither closed nor ended yet) and passes on to the next route.
     * Its failure handler does the routing/span setup itself if the handler
     * has not run for the request and then delegates to the default failure
     * handler. Finally, a default 404 error handler is added.
     *
     * @return The newly created router.
     */
    protected Router createRouter() {
        final Router router = Router.router(this.vertx);
        final Map<String, String> spanTags = this.getCustomTags();
        final DefaultFailureHandler fallbackFailureHandler = new DefaultFailureHandler();

        final Handler<RoutingContext> onFailure = failedCtx -> {
            final boolean matchAllHandlerSkipped = failedCtx.get(KEY_MATCH_ALL_ROUTE_APPLIED) == null;
            if (matchAllHandlerSkipped) {
                // the regular handler has not run, so do its tracing setup here
                failedCtx.request().routed(MATCH_ALL_ROUTE_NAME);
                HttpServerSpanHelper.adoptActiveSpanIntoContext(this.tracer, spanTags, failedCtx);
            }
            fallbackFailureHandler.handle(failedCtx);
        };

        final Handler<RoutingContext> onRequest = requestCtx -> {
            requestCtx.request().routed(MATCH_ALL_ROUTE_NAME);
            requestCtx.put(KEY_MATCH_ALL_ROUTE_APPLIED, true);
            HttpServerSpanHelper.adoptActiveSpanIntoContext(this.tracer, spanTags, requestCtx);
            requestCtx.put("micrometer.sample", this.getMetrics().startTimer());
            if (!(requestCtx.response().closed() || requestCtx.response().ended())) {
                requestCtx.response().closeHandler(closed -> this.logResponseGettingClosedPrematurely(requestCtx));
            }
            HttpUtils.nextRoute(requestCtx);
        };

        final Route catchAll = router.route();
        catchAll.failureHandler(onFailure);
        catchAll.handler(onRequest);

        HttpUtils.addDefault404ErrorHandler(router);
        return router;
    }

    /**
     * Performs checks that need to pass before a device's credentials are
     * validated.
     * <p>
     * Retrieves the tenant configuration (a lookup failure is mapped by
     * {@code CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException}),
     * tags the request's span with tenant and auth id, applies the tenant's
     * trace sampling priority and verifies that this adapter is enabled for
     * the tenant.
     *
     * @param credentials The credentials presented by the device.
     * @param executionContext The context of the request; if it carries no
     *                         tracing span, a warning is logged and a no-op
     *                         span is used instead.
     * @return A future that succeeds if all checks have passed.
     */
    protected Future<Void> handleBeforeCredentialsValidation(DeviceCredentials credentials, HttpContext executionContext) {
        final String tenantId = credentials.getTenantId();
        final String authId = credentials.getAuthId();

        final Span contextSpan = executionContext.getTracingSpan();
        final Span span;
        if (contextSpan == null) {
            this.log.warn("handleBeforeCredentialsValidation: no span context set in httpContext");
            span = NoopSpan.INSTANCE;
        } else {
            span = contextSpan;
        }

        final Future<TenantObject> tenantLookup = this.getTenantConfiguration(tenantId, span.context())
                .recover(error -> Future.failedFuture(CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException(error)));

        return tenantLookup
                .map(tenant -> {
                    TracingHelper.setDeviceTags(span, tenantId, null, authId);
                    TenantTraceSamplingHelper.applyTraceSamplingPriority(tenant, authId, span);
                    return tenant;
                })
                .compose(tenant -> this.isAdapterEnabled(tenant))
                .mapEmpty();
    }

    /**
     * Adds the adapter specific routes to the router.
     * <p>
     * Invoked by {@code doStart} with the router returned by
     * {@code createRouter}, before the servers are bound.
     *
     * @param var1 The router to add the routes to.
     */
    protected abstract void addRoutes(Router var1);

    /**
     * Builds the options for the secure server.
     * <p>
     * Host, port (falling back to {@code getPortDefaultValue}) and idle
     * timeout are taken from the adapter configuration, the max chunk size is
     * fixed to 4096, and TLS key/cert and trust options are added.
     *
     * @return The options.
     */
    protected HttpServerOptions getHttpServerOptions() {
        final HttpServerOptions options = new HttpServerOptions();
        final HttpProtocolAdapterProperties config = this.getConfig();
        options.setHost(config.getBindAddress());
        options.setPort(config.getPort(this.getPortDefaultValue()));
        options.setMaxChunkSize(4096);
        options.setIdleTimeout(config.getIdleTimeout());
        this.addTlsKeyCertOptions(options);
        this.addTlsTrustOptions(options);
        return options;
    }

    /**
     * Builds the options for the insecure server.
     * <p>
     * Host, port (falling back to {@code getInsecurePortDefaultValue}) and
     * idle timeout are taken from the adapter configuration, the max chunk
     * size is fixed to 4096. No TLS options are added.
     *
     * @return The options.
     */
    protected HttpServerOptions getInsecureHttpServerOptions() {
        final HttpServerOptions options = new HttpServerOptions();
        final HttpProtocolAdapterProperties config = this.getConfig();
        options.setHost(config.getInsecurePortBindAddress());
        options.setPort(config.getInsecurePort(this.getInsecurePortDefaultValue()));
        options.setMaxChunkSize(4096);
        options.setIdleTimeout(config.getIdleTimeout());
        return options;
    }

    /**
     * Hook for adjusting the properties of a message before it is sent
     * downstream.
     * <p>
     * Invoked during message upload after the {@code ttd} and {@code qos}
     * properties have been set and, for events, before the {@code ttl}
     * property is added. This default implementation does nothing.
     *
     * @param messageProperties The mutable properties of the downstream message.
     * @param ctx The context of the request that the message originates from.
     */
    protected void customizeDownstreamMessageProperties(Map<String, Object> messageProperties, HttpContext ctx) {
    }

    /**
     * Binds the secure server, creating it from {@code getHttpServerOptions}
     * if none has been set.
     *
     * @param router The router to use as the server's request handler.
     * @return A future completed with the listening server, failed with the
     *         bind error, or succeeded without a result if the secure port is
     *         not enabled.
     */
    private Future<HttpServer> bindSecureHttpServer(Router router) {
        if (!this.isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final Promise<HttpServer> bound = Promise.promise();
        final String bindAddress;
        if (this.server == null) {
            bindAddress = this.getConfig().getBindAddress();
            this.server = this.vertx.createHttpServer(this.getHttpServerOptions());
        } else {
            // the address of an injected server is not known here
            bindAddress = "?";
        }
        this.server.requestHandler(router).listen(attempt -> {
            if (attempt.failed()) {
                this.log.error("error while starting up secure http server", attempt.cause());
                bound.fail(attempt.cause());
                return;
            }
            this.log.info("secure http server listening on {}:{}", bindAddress, this.server.actualPort());
            bound.complete(attempt.result());
        });
        return bound.future();
    }

    /**
     * Binds the insecure server, creating it from
     * {@code getInsecureHttpServerOptions} if none has been set.
     *
     * @param router The router to use as the server's request handler.
     * @return A future completed with the listening server, failed with the
     *         bind error, or succeeded without a result if the insecure port
     *         is not enabled.
     */
    private Future<HttpServer> bindInsecureHttpServer(Router router) {
        if (!this.isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        final Promise<HttpServer> bound = Promise.promise();
        final String bindAddress;
        if (this.insecureServer == null) {
            bindAddress = this.getConfig().getInsecurePortBindAddress();
            this.insecureServer = this.vertx.createHttpServer(this.getInsecureHttpServerOptions());
        } else {
            // the address of an injected server is not known here
            bindAddress = "?";
        }
        this.insecureServer.requestHandler(router).listen(attempt -> {
            if (attempt.failed()) {
                this.log.error("error while starting up insecure http server", attempt.cause());
                bound.fail(attempt.cause());
                return;
            }
            this.log.info("insecure http server listening on {}:{}", bindAddress, this.insecureServer.actualPort());
            bound.complete(attempt.result());
        });
        return bound.future();
    }

    /**
     * Stops this adapter.
     * <p>
     * Invokes {@code preShutdown} (an exception is logged and otherwise
     * ignored), closes the secure and insecure servers if present and, once
     * both have been closed successfully, invokes {@code postShutdown}.
     *
     * @param stopPromise The promise to complete with the outcome.
     */
    public final void doStop(Promise<Void> stopPromise) {
        try {
            this.preShutdown();
        } catch (Exception e) {
            this.log.error("error in preShutdown", e);
        }

        final Promise<Void> secureServerClosed = Promise.promise();
        if (this.server == null) {
            secureServerClosed.complete();
        } else {
            this.server.close(secureServerClosed);
        }

        final Promise<Void> insecureServerClosed = Promise.promise();
        if (this.insecureServer == null) {
            insecureServerClosed.complete();
        } else {
            this.insecureServer.close(insecureServerClosed);
        }

        final CompositeFuture allServersClosed = CompositeFuture.all(
                secureServerClosed.future(),
                insecureServerClosed.future());
        allServersClosed
                .compose(closed -> this.postShutdown())
                .onComplete(stopPromise);
    }

    /**
     * Hook invoked at the very beginning of {@code doStop}, before the
     * servers are closed.
     * <p>
     * An exception thrown from this method is logged by {@code doStop} and
     * does not prevent the shutdown. This default implementation does nothing.
     */
    protected void preShutdown() {
    }

    /**
     * Hook invoked by {@code doStop} after both servers have been closed
     * successfully.
     * <p>
     * The outcome of the returned future completes the stop promise. This
     * default implementation returns an already succeeded future.
     *
     * @return A future indicating the outcome of the post-shutdown work.
     */
    protected Future<Void> postShutdown() {
        return Future.succeededFuture();
    }

    /**
     * Uploads a telemetry message on behalf of a device.
     *
     * @param ctx The context of the request.
     * @param tenant The tenant that the device belongs to.
     * @param deviceId The identifier of the device.
     * @param payload The message payload, may be {@code null}.
     * @param contentType The content type of the payload.
     * @throws NullPointerException if ctx, tenant or deviceId is {@code null}.
     */
    protected final void uploadTelemetryMessage(HttpContext ctx, String tenant, String deviceId, Buffer payload, String contentType) {
        Objects.requireNonNull(ctx);
        Objects.requireNonNull(tenant);
        Objects.requireNonNull(deviceId);
        this.doUploadMessage(ctx, tenant, deviceId, payload, contentType, MetricsTags.EndpointType.TELEMETRY);
    }

    /**
     * Uploads an event message on behalf of a device.
     *
     * @param ctx The context of the request.
     * @param tenant The tenant that the device belongs to.
     * @param deviceId The identifier of the device.
     * @param payload The message payload, may be {@code null}.
     * @param contentType The content type of the payload.
     * @throws NullPointerException if ctx, tenant or deviceId is {@code null}.
     */
    protected final void uploadEventMessage(HttpContext ctx, String tenant, String deviceId, Buffer payload, String contentType) {
        Objects.requireNonNull(ctx);
        Objects.requireNonNull(tenant);
        Objects.requireNonNull(deviceId);
        this.doUploadMessage(ctx, tenant, deviceId, payload, contentType, MetricsTags.EndpointType.EVENT);
    }

    /**
     * Uploads the message contained in a request, taking payload, content
     * type and endpoint from the request itself.
     * <p>
     * The payload is the request body buffer and the endpoint type is derived
     * from the endpoint of the requested resource.
     *
     * @param ctx The context of the request.
     * @param tenant The tenant that the device belongs to.
     * @param deviceId The identifier of the device.
     */
    protected void doUploadMessage(HttpContext ctx, String tenant, String deviceId) {
        final Buffer requestBody = ctx.getRoutingContext().body().buffer();
        final String requestContentType = ctx.getContentType();
        final String endpointName = ctx.getRequestedResource().getEndpoint();
        final MetricsTags.EndpointType endpointType = MetricsTags.EndpointType.fromString(endpointName);
        this.doUploadMessage(ctx, tenant, deviceId, requestBody, requestContentType, endpointType);
    }

    /**
     * Processes an uploaded telemetry or event message.
     * <p>
     * The request is rejected with a 400 status if its QoS header is invalid
     * or if the payload does not match the indicated content type. Otherwise
     * the following steps run asynchronously:
     * <ol>
     * <li>fetch the registration assertion and the tenant configuration,</li>
     * <li>check that the adapter is enabled for the tenant and that the
     * message limit is not exceeded,</li>
     * <li>determine the effective time-until-disconnect (TTD) and, if it is
     * positive, create a command consumer,</li>
     * <li>send the message downstream as event or telemetry,</li>
     * <li>report metrics and write the HTTP response, including a command
     * if one has been put into the context in the meantime.</li>
     * </ol>
     * On failure a pending command is released, the request is failed
     * (client errors) or answered with "service unavailable" (all other
     * errors) and the outcome is reported to metrics.
     *
     * @param ctx The context of the request.
     * @param tenant The tenant that the device belongs to.
     * @param deviceId The identifier of the device that the message is from.
     * @param payload The message payload, may be {@code null}.
     * @param contentType The content type of the payload.
     * @param endpoint The type of message being uploaded.
     */
    private void doUploadMessage(HttpContext ctx, String tenant, String deviceId, Buffer payload, String contentType, MetricsTags.EndpointType endpoint) {
        if (!ctx.hasValidQoS()) {
            HttpUtils.badRequest((RoutingContext)ctx.getRoutingContext(), (String)"unsupported QoS-Level header value");
            return;
        }
        if (!AbstractVertxBasedHttpProtocolAdapter.isPayloadOfIndicatedType((Buffer)payload, (String)contentType)) {
            HttpUtils.badRequest((RoutingContext)ctx.getRoutingContext(), (String)String.format("content type [%s] does not match payload", contentType));
            return;
        }
        MetricsTags.QoS qos = AbstractVertxBasedHttpProtocolAdapter.getQoSLevel(endpoint, ctx.getRequestedQos());
        Device authenticatedDevice = ctx.getAuthenticatedDevice();
        // the authenticated device acts as a gateway if it differs from the device the message is from
        String gatewayId = authenticatedDevice != null && !deviceId.equals(authenticatedDevice.getDeviceId()) ? authenticatedDevice.getDeviceId() : null;
        Span currentSpan = TracingHelper.buildChildSpan((Tracer)this.tracer, (SpanContext)ctx.getTracingContext(), (String)("upload " + endpoint.getCanonicalName()), (String)this.getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag((Tag)TracingHelper.TAG_TENANT_ID, (Object)tenant).withTag((Tag)TracingHelper.TAG_DEVICE_ID, (Object)deviceId).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticatedDevice != null).withTag((Tag)TracingHelper.TAG_QOS, (Object)qos.name()).start();
        Future tokenTracker = this.getRegistrationAssertion(tenant, deviceId, authenticatedDevice, currentSpan.context());
        // a missing payload counts as zero bytes
        int payloadSize = Optional.ofNullable(payload).map(ok -> payload.length()).orElse(0);
        Future tenantTracker = this.getTenantConfiguration(tenant, currentSpan.context());
        // succeeds with the tenant only if the adapter is enabled AND the message limit check passes
        Future tenantValidationTracker = tenantTracker.compose(tenantObject -> CompositeFuture.all((Future)this.isAdapterEnabled((TenantObject)tenantObject), (Future)this.checkMessageLimit((TenantObject)tenantObject, payloadSize, currentSpan.context())).map(success -> tenantObject));
        // a null or non-positive effective TTD is normalized to null, meaning "do not wait for a command"
        Future ttdTracker = CompositeFuture.all((Future)tenantValidationTracker, (Future)tokenTracker).compose(ok -> {
            Integer ttdParam = this.getTimeUntilDisconnectFromRequest(ctx);
            return this.getTimeUntilDisconnect((TenantObject)tenantTracker.result(), ttdParam).onSuccess(effectiveTtd -> Optional.ofNullable(effectiveTtd).ifPresent(v -> TracingHelper.TAG_DEVICE_TTD.set(currentSpan, v)));
        }).map(ttd -> ttd == null || ttd <= 0 ? null : ttd);
        // without a TTD the tracker from ResponseReadyTracker.nop() is used (presumably one that does not wait - confirm)
        ttdTracker.compose(ttd -> ttd == null ? ResponseReadyTracker.nop() : this.createCommandConsumer((Integer)ttd, (TenantObject)tenantTracker.result(), deviceId, gatewayId, ctx.getRoutingContext(), currentSpan)).compose(responseReadyTracker -> {
            Map props = this.getDownstreamMessageProperties((TelemetryExecutionContext)ctx);
            Optional.ofNullable((Integer)ttdTracker.result()).ifPresent(ttd -> props.put("ttd", ttd));
            props.put("qos", ctx.getRequestedQos().ordinal());
            this.customizeDownstreamMessageProperties(props, ctx);
            // in both branches: a failed send cancels the tracker; the stage completes when
            // the send AND the tracker's future have both completed
            if (MetricsTags.EndpointType.EVENT.equals((Object)endpoint)) {
                ctx.getTimeToLive().ifPresent(ttl -> props.put("ttl", ttl.toSeconds()));
                return CompositeFuture.all((Future)this.getEventSender((TenantObject)tenantValidationTracker.result()).sendEvent((TenantObject)tenantTracker.result(), (RegistrationAssertion)tokenTracker.result(), contentType, payload, props, currentSpan.context()).onFailure(thr -> responseReadyTracker.cancel("send event failed", null)), responseReadyTracker.future()).map(s -> null);
            }
            return CompositeFuture.all((Future)this.getTelemetrySender((TenantObject)tenantValidationTracker.result()).sendTelemetry((TenantObject)tenantTracker.result(), (RegistrationAssertion)tokenTracker.result(), ctx.getRequestedQos(), contentType, payload, props, currentSpan.context()).onFailure(thr -> responseReadyTracker.cancel("send telemetry failed", null)), responseReadyTracker.future()).map(s -> null);
        }).map(proceed -> {
            // success path: the message has been forwarded downstream
            this.metrics.reportTelemetry(endpoint, tenant, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, qos, payloadSize, ctx.getTtdStatus(), this.getMicrometerSample(ctx.getRoutingContext()));
            if (ctx.response().closed()) {
                this.log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]: response already closed", new Object[]{endpoint, tenant, deviceId});
                TracingHelper.logError((Span)currentSpan, (String)"failed to send HTTP response to device: response already closed");
                currentSpan.finish();
                ctx.response().end();
            } else {
                // a command is present in the context only if one has been stored under "command-context"
                CommandContext commandContext = (CommandContext)ctx.get("command-context");
                this.setResponsePayload(ctx.response(), commandContext, currentSpan);
                // the command is accepted only after the response body has been written
                ctx.getRoutingContext().addBodyEndHandler(ok -> {
                    this.log.trace("successfully processed [{}] message for device [tenantId: {}, deviceId: {}]", new Object[]{endpoint, tenant, deviceId});
                    if (commandContext != null) {
                        commandContext.getTracingSpan().log("forwarded command to device in HTTP response body");
                        commandContext.accept();
                        this.metrics.reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenant, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, commandContext.getCommand().getPayloadSize(), AbstractVertxBasedHttpProtocolAdapter.getMicrometerSample((CommandContext)commandContext));
                    }
                    currentSpan.finish();
                });
                // if writing the response fails, the command is released instead
                ctx.response().exceptionHandler(t -> {
                    this.log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]", new Object[]{endpoint, tenant, deviceId, t});
                    if (commandContext != null) {
                        TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"failed to forward command to device in HTTP response body", (Throwable)t);
                        commandContext.release(t);
                        this.metrics.reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenant, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.UNDELIVERABLE, commandContext.getCommand().getPayloadSize(), AbstractVertxBasedHttpProtocolAdapter.getMicrometerSample((CommandContext)commandContext));
                    }
                    currentSpan.log("failed to send HTTP response to device");
                    TracingHelper.logError((Span)currentSpan, (Throwable)t);
                    currentSpan.finish();
                });
                ctx.response().end();
            }
            return proceed;
        }).recover(t -> {
            // failure path: any of the preceding stages has failed
            MetricsTags.ProcessingOutcome outcome;
            this.log.debug("cannot process [{}] message from device [tenantId: {}, deviceId: {}]", new Object[]{endpoint, tenant, deviceId, t});
            // captured before a response is attempted below
            boolean responseClosedPrematurely = ctx.response().closed();
            CommandContext commandContext = (CommandContext)ctx.get("command-context");
            if (commandContext != null) {
                TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"command won't be forwarded to device in HTTP response body, HTTP request handling failed", (Throwable)t);
                commandContext.release(t);
                currentSpan.log("released command for device");
            }
            if (ClientErrorException.class.isInstance(t)) {
                outcome = MetricsTags.ProcessingOutcome.UNPROCESSABLE;
                ctx.fail(t);
            } else {
                outcome = MetricsTags.ProcessingOutcome.UNDELIVERABLE;
                String errorMessage = t instanceof ServerErrorException ? ((ServerErrorException)t).getClientFacingMessage() : null;
                // NOTE(review): the literal 2 is presumably the Retry-After value in seconds - confirm against HttpUtils
                HttpUtils.serviceUnavailable((RoutingContext)ctx.getRoutingContext(), (int)2, (String)(Strings.isNullOrEmpty((Object)errorMessage) ? "temporarily unavailable" : errorMessage));
            }
            if (responseClosedPrematurely) {
                this.log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]: response already closed", new Object[]{endpoint, tenant, deviceId});
                TracingHelper.logError((Span)currentSpan, (String)"failed to send HTTP response to device: response already closed");
            }
            this.metrics.reportTelemetry(endpoint, tenant, (TenantObject)tenantTracker.result(), outcome, qos, payloadSize, ctx.getTtdStatus(), this.getMicrometerSample(ctx.getRoutingContext()));
            TracingHelper.logError((Span)currentSpan, (Throwable)t);
            currentSpan.finish();
            return Future.failedFuture((Throwable)t);
        });
    }

    /**
     * Records that the connection has been closed before a response could be
     * sent, both in the log (trace level) and, if the routing context carries
     * a server span, as an error on that span.
     *
     * @param ctx The routing context of the affected request.
     */
    private void logResponseGettingClosedPrematurely(RoutingContext ctx) {
        this.log.trace("connection got closed before response could be sent");
        final Span serverSpan = HttpServerSpanHelper.serverSpan(ctx);
        if (serverSpan != null) {
            TracingHelper.logError(serverSpan, "connection got closed before response could be sent");
        }
    }

    /**
     * Gets the time-until-disconnect value that a device has indicated in
     * its request.
     * <p>
     * This default implementation returns the context's
     * {@code getTimeTillDisconnect()} value. The result is used during
     * message upload as the device's requested TTD.
     *
     * @param ctx The context of the request.
     * @return The requested TTD or {@code null} if the context provides none.
     */
    protected Integer getTimeUntilDisconnectFromRequest(HttpContext ctx) {
        return ctx.getTimeTillDisconnect();
    }

    /**
     * Determines the effective time-until-disconnect (TTD) for a request.
     * <p>
     * The TTD requested by the device is first capped by the tenant's maximum
     * TTD for this adapter type. If an idle timeout is configured, the result
     * is additionally capped at 80% of that timeout so that the connection is
     * not closed before the response has been sent.
     * <p>
     * An idle timeout of 0 is passed unchanged to the HTTP server options,
     * where it means "no idle timeout". In that case no idle-timeout cap is
     * applied; previously every TTD was clamped to 0 and thus dropped by the
     * caller. The 80% value is computed with {@code long} arithmetic to avoid
     * integer overflow for very large timeouts.
     *
     * @param tenant The tenant that the device belongs to.
     * @param deviceTtd The TTD requested by the device or {@code null} if the
     *                  device did not request one.
     * @return A succeeded future containing the effective TTD, or a succeeded
     *         future without a result if deviceTtd is {@code null}.
     * @throws NullPointerException if tenant is {@code null}.
     */
    public Future<Integer> getTimeUntilDisconnect(TenantObject tenant, Integer deviceTtd) {
        Objects.requireNonNull(tenant);
        if (deviceTtd == null) {
            return Future.succeededFuture();
        }
        final int ttdWithTenantMaxApplied = Math.min(tenant.getMaxTimeUntilDisconnect(this.getTypeName()), deviceTtd);
        final int idleTimeout = this.getConfig().getIdleTimeout();
        if (idleTimeout == 0) {
            // idle timeout disabled: the connection will not be closed for being idle
            return Future.succeededFuture(ttdWithTenantMaxApplied);
        }
        // stay below the idle timeout so that the response can still be sent
        final int idleTimeoutBudget = (int) (idleTimeout * 80L / 100);
        final int effectiveTtd = Math.min(idleTimeoutBudget, ttdWithTenantMaxApplied);
        return Future.succeededFuture(effectiveTtd);
    }

    /**
     * Registers a close handler on the response that cancels waiting for a
     * command when the connection gets closed.
     * <p>
     * Nothing is registered if the response is already closed or ended. If
     * the cancellation takes effect, the event is logged and the command
     * reception timer is canceled.
     *
     * @param ctx The routing context of the request.
     * @param responseReadyTracker The tracker to cancel.
     * @param tenantId The tenant of the device (used for logging).
     * @param deviceId The identifier of the device (used for logging).
     */
    private void setTtdRequestConnectionCloseHandler(RoutingContext ctx, ResponseReadyTracker responseReadyTracker, String tenantId, String deviceId) {
        if (ctx.response().closed() || ctx.response().ended()) {
            return;
        }
        final Handler<Boolean> onCancelOutcome = canceled -> {
            if (canceled) {
                this.log.debug("device [tenant: {}, device-id: {}] closed connection before response could be sent", tenantId, deviceId);
                this.cancelCommandReceptionTimer(ctx);
            }
        };
        ctx.response().closeHandler(closed ->
                responseReadyTracker.cancel("device closed connection, stop waiting for command", onCancelOutcome));
    }

    /**
     * Prepares the response to a device, depending on whether a command is
     * to be delivered with it.
     *
     * @param response The response to prepare.
     * @param commandContext The command to include or {@code null} if there is none.
     * @param currentSpan The span tracking the message upload.
     */
    private void setResponsePayload(HttpServerResponse response, CommandContext commandContext, Span currentSpan) {
        if (commandContext != null) {
            this.setNonEmptyResponsePayload(response, commandContext, currentSpan);
            return;
        }
        this.setEmptyResponsePayload(response, currentSpan);
    }

    /**
     * Prepares a response that carries no command.
     * <p>
     * Invoked by {@code setResponsePayload} when no command context is
     * available. This default implementation only sets status code 202.
     *
     * @param response The response to prepare.
     * @param currentSpan The span tracking the message upload (unused by this
     *                    implementation).
     */
    protected void setEmptyResponsePayload(HttpServerResponse response, Span currentSpan) {
        response.setStatusCode(202);
    }

    /**
     * Prepares a response that delivers a command to the device.
     * <p>
     * The command name is always put into the {@code hono-command} header.
     * The request id ({@code hono-cmd-req-id}) is added for request/response
     * commands only and the target device ({@code hono-cmd-target-device})
     * for commands targeted at a gateway only. Each header is mirrored as a
     * tag on the span. The status code is set to 200 and the command's
     * payload and content type become the response body.
     *
     * @param response The response to prepare.
     * @param commandContext The context of the command to deliver.
     * @param currentSpan The span tracking the message upload.
     */
    protected void setNonEmptyResponsePayload(HttpServerResponse response, CommandContext commandContext, Span currentSpan) {
        final Command cmd = commandContext.getCommand();

        final String commandName = cmd.getName();
        response.putHeader("hono-command", commandName);
        currentSpan.setTag("hono-command", commandName);

        this.log.debug("adding command [name: {}, request-id: {}] to response for device [tenant-id: {}, device-id: {}]",
                cmd.getName(), cmd.getRequestId(), cmd.getTenant(), cmd.getGatewayOrDeviceId());

        final boolean expectsResponse = !cmd.isOneWay();
        if (expectsResponse) {
            final String requestId = cmd.getRequestId();
            response.putHeader("hono-cmd-req-id", requestId);
            currentSpan.setTag("hono-cmd-req-id", requestId);
        }

        if (cmd.isTargetedAtGateway()) {
            final String targetDeviceId = cmd.getDeviceId();
            response.putHeader("hono-cmd-target-device", targetDeviceId);
            currentSpan.setTag("hono-cmd-target-device", targetDeviceId);
        }

        response.setStatusCode(200);
        HttpUtils.setResponseBody(response, cmd.getPayload(), cmd.getContentType());
    }

    protected final Future<ResponseReadyTracker> createCommandConsumer(Integer ttdSecs, final TenantObject tenantObject, final String deviceId, String gatewayId, RoutingContext ctx, final Span uploadMessageSpan) {
        Objects.requireNonNull(ttdSecs);
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(ctx);
        Objects.requireNonNull(uploadMessageSpan);
        final AtomicBoolean requestProcessed = new AtomicBoolean(false);
        final Promise responseReady = Promise.promise();
        TracingHelper.TAG_DEVICE_TTD.set(uploadMessageSpan, ttdSecs);
        final Span waitForCommandSpan = TracingHelper.buildChildSpan((Tracer)this.tracer, (SpanContext)uploadMessageSpan.context(), (String)"create consumer and wait for command", (String)this.getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").start();
        TracingHelper.setDeviceTags((Span)waitForCommandSpan, (String)tenantObject.getTenantId(), (String)deviceId);
        Function<CommandContext, Future> commandHandler = commandContext -> {
            Span processCommandSpan = TracingHelper.buildFollowsFromSpan((Tracer)this.tracer, (SpanContext)waitForCommandSpan.context(), (String)"process received command").withTag(Tags.COMPONENT.getKey(), this.getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").addReference("follows_from", commandContext.getTracingContext()).start();
            TracingHelper.setDeviceTags((Span)processCommandSpan, (String)tenantObject.getTenantId(), (String)deviceId);
            Tags.COMPONENT.set(commandContext.getTracingSpan(), this.getTypeName());
            commandContext.logCommandToSpan(processCommandSpan);
            Command command = commandContext.getCommand();
            Timer.Sample commandSample = this.getMetrics().startTimer();
            if (this.isCommandValid(command, processCommandSpan)) {
                Promise commandHandlerDonePromise = Promise.promise();
                if (requestProcessed.compareAndSet(false, true)) {
                    waitForCommandSpan.finish();
                    commandHandlerDonePromise.future().onComplete((Handler)responseReady);
                    this.checkMessageLimit(tenantObject, command.getPayloadSize(), processCommandSpan.context()).onComplete(result -> {
                        if (result.succeeded()) {
                            AbstractVertxBasedHttpProtocolAdapter.addMicrometerSample((CommandContext)commandContext, (Timer.Sample)commandSample);
                            ctx.put("command-context", commandContext);
                            commandHandlerDonePromise.complete();
                        } else {
                            commandContext.reject(result.cause());
                            TracingHelper.logError((Span)processCommandSpan, (String)"rejected command for device", (Throwable)result.cause());
                            this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.from((Throwable)result.cause()), command.getPayloadSize(), commandSample);
                            commandHandlerDonePromise.fail(result.cause());
                        }
                        this.cancelCommandReceptionTimer(ctx);
                        this.setTtdStatus(ctx, MetricsTags.TtdStatus.COMMAND);
                        processCommandSpan.finish();
                    });
                } else {
                    String errorMsg = "waiting time for command has elapsed or another command has already been processed";
                    this.log.debug("{} [tenantId: {}, deviceId: {}]", new Object[]{"waiting time for command has elapsed or another command has already been processed", tenantObject.getTenantId(), deviceId});
                    this.getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNDELIVERABLE, command.getPayloadSize(), commandSample);
                    ServerErrorException exception = new ServerErrorException(503, "waiting time for command has elapsed or another command has already been processed");
                    commandContext.release((Throwable)exception);
                    TracingHelper.logError((Span)processCommandSpan, (String)"waiting time for command has elapsed or another command has already been processed");
                    processCommandSpan.finish();
                    commandHandlerDonePromise.fail((Throwable)exception);
                }
                return commandHandlerDonePromise.future();
            }
            this.getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNPROCESSABLE, command.getPayloadSize(), commandSample);
            this.log.debug("command message is invalid: {}", (Object)command);
            commandContext.reject("malformed command message");
            TracingHelper.logError((Span)processCommandSpan, (String)"malformed command message");
            processCommandSpan.finish();
            return Future.failedFuture((String)"malformed command message");
        };
        Future commandConsumerFuture = gatewayId != null ? this.getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), deviceId, gatewayId, commandHandler, Duration.ofSeconds(ttdSecs.intValue()), waitForCommandSpan.context()) : this.getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), deviceId, commandHandler, Duration.ofSeconds(ttdSecs.intValue()), waitForCommandSpan.context());
        return commandConsumerFuture.onFailure(thr -> {
            TracingHelper.logError((Span)waitForCommandSpan, (Throwable)thr);
            waitForCommandSpan.finish();
        }).map(consumer -> {
            ResponseReadyTracker responseReadyTracker = new ResponseReadyTracker(){
                private final Future<Void> waitForCommandFuture;
                final /* synthetic */ CommandConsumer val$consumer;
                {
                    this.val$consumer = commandConsumer;
                    this.waitForCommandFuture = responseReady.future().eventually(v -> this.closeCommandConsumer());
                }

                @Override
                public Future<Void> future() {
                    return this.waitForCommandFuture;
                }

                @Override
                public void cancel(String reason, Handler<Boolean> handler) {
                    if (requestProcessed.compareAndSet(false, true)) {
                        if (reason != null) {
                            waitForCommandSpan.log(String.format("canceled: %s", reason));
                        }
                        if (handler != null) {
                            handler.handle((Object)true);
                        }
                        waitForCommandSpan.finish();
                        responseReady.complete();
                    } else if (handler != null) {
                        handler.handle((Object)false);
                    }
                }

                private Future<Void> closeCommandConsumer() {
                    Span closeConsumerSpan = TracingHelper.buildChildSpan((Tracer)AbstractVertxBasedHttpProtocolAdapter.this.tracer, (SpanContext)uploadMessageSpan.context(), (String)"close command consumer", (String)AbstractVertxBasedHttpProtocolAdapter.this.getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").start();
                    TracingHelper.setDeviceTags((Span)closeConsumerSpan, (String)tenantObject.getTenantId(), (String)deviceId);
                    return this.val$consumer.close(closeConsumerSpan.context()).onComplete(ar -> {
                        if (ar.failed()) {
                            TracingHelper.logError((Span)closeConsumerSpan, (Throwable)ar.cause());
                        }
                        closeConsumerSpan.finish();
                    });
                }
            };
            if (!requestProcessed.get()) {
                this.addCommandReceptionTimer(ctx, responseReadyTracker, ttdSecs.intValue());
                this.setTtdRequestConnectionCloseHandler(ctx, responseReadyTracker, tenantObject.getTenantId(), deviceId);
            }
            return responseReadyTracker;
        });
    }

    /**
     * Checks whether a command may be processed by this adapter.
     * <p>
     * This implementation only delegates to {@link Command#isValid()}; the span
     * is not used here. The method is protected and non-final, so subclasses may
     * override it.
     *
     * @param command The command to check.
     * @param currentSpan The currently active span (ignored by this implementation).
     * @return The result of {@code command.isValid()}.
     */
    protected boolean isCommandValid(Command command, Span currentSpan) {
        final boolean valid = command.isValid();
        return valid;
    }

    /**
     * Starts a timer that cancels the given tracker once the time to wait for a
     * command has elapsed, and stores the timer's id in the routing context under
     * {@code KEY_TIMER_ID}.
     * <p>
     * When the timer fires, the tracker is asked to cancel. If the tracker reports
     * that this call performed the cancellation, the TTD status of the request is
     * set to {@code EXPIRED}; otherwise only a trace message is logged.
     *
     * @param ctx The routing context of the request; also used to obtain the Vert.x instance.
     * @param responseReadyTracker The tracker to cancel when the timer fires.
     * @param delaySecs The number of seconds to wait before the timer fires.
     */
    private void addCommandReceptionTimer(RoutingContext ctx, ResponseReadyTracker responseReadyTracker, long delaySecs) {
        // Typed as Handler<Boolean> so that the lambda parameter is a Boolean;
        // a raw Handler target would make it an Object and break the boolean check.
        final Handler<Boolean> onCancelResult = canceled -> {
            if (canceled) {
                this.setTtdStatus(ctx, MetricsTags.TtdStatus.EXPIRED);
            } else {
                this.log.trace("response already sent, nothing to do ...");
            }
        };
        // setTimer expects milliseconds
        final long timerId = ctx.vertx().setTimer(delaySecs * 1000L, id -> {
            this.log.trace("time to wait [{}s] for command expired [timer id: {}]", delaySecs, id);
            responseReadyTracker.cancel(String.format("time to wait for command expired (%ds)", delaySecs), onCancelResult);
        });
        this.log.trace("adding command reception timer [id: {}]", timerId);
        // stored (boxed as Long) so that the timer can be looked up and canceled later
        ctx.put(KEY_TIMER_ID, timerId);
    }

    /**
     * Cancels the command reception timer whose id is stored in the routing
     * context under {@code KEY_TIMER_ID}, if there is one.
     * <p>
     * Does nothing if no timer id is stored. The outcome of the cancellation is
     * only logged.
     *
     * @param ctx The routing context to read the timer id from.
     */
    private void cancelCommandReceptionTimer(RoutingContext ctx) {
        final Long storedTimerId = (Long)ctx.get(KEY_TIMER_ID);
        if (storedTimerId == null) {
            // no timer has been registered for this request
            return;
        }
        final boolean cancelled = ctx.vertx().cancelTimer(storedTimerId.longValue());
        if (!cancelled) {
            this.log.debug("Could not cancel timer id {}", (Object)storedTimerId);
            return;
        }
        this.log.trace("Cancelled timer id {}", (Object)storedTimerId);
    }

    /**
     * Processes a device's response to a command and forwards it via
     * {@code sendCommandResponse}.
     * <p>
     * The request body and content type are used to build a {@link CommandResponse}.
     * Once the tenant configuration has been retrieved and the command response could
     * be created, the method obtains a registration assertion for the device, checks
     * that the adapter is enabled for the tenant and that the message limit is not
     * exceeded, and then sends the command response.
     * <p>
     * On success, the HTTP response is ended with status code 202 and the command is
     * reported to the metrics with outcome {@code FORWARDED}. On any failure, the error
     * is logged to the tracing span, the command is reported with an outcome derived
     * from the error and the routing context is failed with that error.
     *
     * @param ctx The context of the HTTP request carrying the command response.
     * @param tenant The tenant that the device belongs to.
     * @param deviceId The identifier of the device.
     * @param commandRequestId The identifier of the command that the response belongs to.
     *                         If no command response can be created from it, the request
     *                         is failed with a 400 client error.
     * @param responseStatus The status code contained in the response. If no command
     *                       response can be created from it, the request is failed with
     *                       a 400 client error.
     * @throws NullPointerException if ctx, tenant or deviceId are {@code null}.
     */
    public final void uploadCommandResponseMessage(HttpContext ctx, String tenant, String deviceId, String commandRequestId, Integer responseStatus) {
        Objects.requireNonNull(ctx);
        Objects.requireNonNull(tenant);
        Objects.requireNonNull(deviceId);
        Buffer payload = ctx.getRoutingContext().body().buffer();
        String contentType = ctx.getContentType();
        this.log.debug("processing response to command [tenantId: {}, deviceId: {}, cmd-req-id: {}, status code: {}]", new Object[]{tenant, deviceId, commandRequestId, responseStatus});
        Device authenticatedDevice = ctx.getAuthenticatedDevice();
        // the span is finished in exactly one of the two terminal handlers below
        Span currentSpan = TracingHelper.buildChildSpan((Tracer)this.tracer, (SpanContext)ctx.getTracingContext(), (String)"upload Command response", (String)this.getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag((Tag)TracingHelper.TAG_TENANT_ID, (Object)tenant).withTag((Tag)TracingHelper.TAG_DEVICE_ID, (Object)deviceId).withTag("hono-cmd-status", (Number)responseStatus).withTag("hono-cmd-req-id", commandRequestId).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), authenticatedDevice != null).start();
        // a null result is turned into a failed future (400) below
        CommandResponse cmdResponseOrNull = CommandResponse.fromRequestId((String)commandRequestId, (String)tenant, (String)deviceId, (Buffer)payload, (String)contentType, (Integer)responseStatus);
        Future tenantTracker = this.getTenantConfiguration(tenant, currentSpan.context());
        Future commandResponseTracker = cmdResponseOrNull != null ? Future.succeededFuture((Object)cmdResponseOrNull) : Future.failedFuture((Throwable)new ClientErrorException(400, String.format("command-request-id [%s] or status code [%s] is missing/invalid", commandRequestId, responseStatus)));
        // a missing request body counts as a payload of size 0
        int payloadSize = Optional.ofNullable(payload).map(Buffer::length).orElse(0);
        CompositeFuture.all((Future)tenantTracker, (Future)commandResponseTracker).compose(commandResponse -> {
            // both trackers have succeeded here, so their results can be read directly
            Future deviceRegistrationTracker = this.getRegistrationAssertion(tenant, deviceId, authenticatedDevice, currentSpan.context());
            Future tenantValidationTracker = CompositeFuture.all((Future)this.isAdapterEnabled((TenantObject)tenantTracker.result()), (Future)this.checkMessageLimit((TenantObject)tenantTracker.result(), payloadSize, currentSpan.context())).map(ok -> null);
            return CompositeFuture.all((Future)tenantValidationTracker, (Future)deviceRegistrationTracker).compose(ok -> this.sendCommandResponse((TenantObject)tenantTracker.result(), (RegistrationAssertion)deviceRegistrationTracker.result(), (CommandResponse)commandResponseTracker.result(), currentSpan.context())).map(delivery -> {
                // success path: finish span, report metrics, reply with 202
                this.log.trace("delivered command response [command-request-id: {}] to application", (Object)commandRequestId);
                currentSpan.log("delivered command response to application");
                currentSpan.finish();
                this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, tenant, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, payloadSize, this.getMicrometerSample(ctx.getRoutingContext()));
                ctx.response().setStatusCode(202);
                ctx.response().end();
                return delivery;
            });
        }).otherwise(t -> {
            // failure path for any step of the chain above;
            // tenantTracker.result() is passed on as is, even if the tenant lookup did not succeed
            this.log.debug("could not send command response [command-request-id: {}] to application", (Object)commandRequestId, t);
            TracingHelper.logError((Span)currentSpan, (Throwable)t);
            currentSpan.finish();
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, tenant, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.from((Throwable)t), payloadSize, this.getMicrometerSample(ctx.getRoutingContext()));
            ctx.fail(t);
            return null;
        });
    }

    /**
     * Determines the {@link MetricsTags.QoS} value for a request.
     * <p>
     * Requests to the event endpoint always map to {@code AT_LEAST_ONCE},
     * regardless of the requested QoS. For all other endpoints, a missing QoS
     * maps to {@code UNKNOWN}, {@code AT_MOST_ONCE} maps to {@code AT_MOST_ONCE}
     * and every other value maps to {@code AT_LEAST_ONCE}.
     *
     * @param endpoint The type of endpoint that the request has been sent to.
     * @param requestedQos The QoS level requested by the client, may be {@code null}.
     * @return The corresponding QoS tag value.
     */
    private static MetricsTags.QoS getQoSLevel(MetricsTags.EndpointType endpoint, QoS requestedQos) {
        if (endpoint != MetricsTags.EndpointType.EVENT) {
            // only non-event endpoints take the requested QoS into account
            if (requestedQos == null) {
                return MetricsTags.QoS.UNKNOWN;
            }
            if (requestedQos == QoS.AT_MOST_ONCE) {
                return MetricsTags.QoS.AT_MOST_ONCE;
            }
        }
        return MetricsTags.QoS.AT_LEAST_ONCE;
    }

    /**
     * Creates the handler used for reading the body of HTTP requests.
     * <p>
     * The handler is created for {@code DEFAULT_UPLOADS_DIRECTORY} and its body
     * limit is set to the maximum payload size defined in this adapter's
     * configuration.
     *
     * @return The newly created body handler.
     */
    protected BodyHandler getBodyHandler() {
        final BodyHandler handler = BodyHandler.create((String)DEFAULT_UPLOADS_DIRECTORY);
        final HttpProtocolAdapterProperties adapterConfig = (HttpProtocolAdapterProperties)((Object)this.getConfig());
        final long maxBodySize = (long)adapterConfig.getMaxPayloadSize();
        handler.setBodyLimit(maxBodySize);
        return handler;
    }

    protected static interface ResponseReadyTracker {
        public Future<Void> future();

        public void cancel(String var1, Handler<Boolean> var2);

        public static Future<ResponseReadyTracker> nop() {
            return Future.succeededFuture((Object)new ResponseReadyTracker(){

                @Override
                public Future<Void> future() {
                    return Future.succeededFuture();
                }

                @Override
                public void cancel(String reason, Handler<Boolean> handler) {
                    if (handler != null) {
                        handler.handle((Object)false);
                    }
                }
            });
        }
    }
}

