/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.mqtt.impl;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import java.util.Map;
import java.util.Objects;
import org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter;
import org.eclipse.hono.adapter.mqtt.MappedMessage;
import org.eclipse.hono.adapter.mqtt.MessageMapping;
import org.eclipse.hono.adapter.mqtt.MqttContext;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;

public final class VertxBasedMqttProtocolAdapter
extends AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> {
    private static final String MAPPER_DATA = "mapper_data";
    private MessageMapping<MqttContext> messageMapping;

    public String getTypeName() {
        return "hono-mqtt";
    }

    public void setMessageMapping(MessageMapping<MqttContext> messageMappingService) {
        Objects.requireNonNull(messageMappingService);
        this.messageMapping = messageMappingService;
    }

    @Override
    protected Future<Void> onPublishedMessage(MqttContext ctx) {
        return this.checkQosAndMapTopic(ctx).compose(address -> this.validateAddress((ResourceIdentifier)address, ctx.authenticatedDevice())).compose(targetAddress -> this.mapMessageAndUpdateContext(ctx, (ResourceIdentifier)targetAddress)).compose(mappedMessage -> this.uploadMessage(ctx)).recover(t -> {
            this.log.debug("discarding message [topic: {}, authenticated device: {}]", new Object[]{ctx.getOrigAddress(), ctx.authenticatedDevice(), t});
            return Future.failedFuture((Throwable)t);
        });
    }

    private Future<MappedMessage> mapMessageAndUpdateContext(MqttContext ctx, ResourceIdentifier targetAddress) {
        return this.getRegistrationAssertion(targetAddress.getTenantId(), targetAddress.getResourceId(), ctx.authenticatedDevice(), ctx.getTracingContext()).compose(registrationInfo -> this.messageMapping.mapDownstreamMessage(ctx, targetAddress.getTenantId(), (RegistrationAssertion)registrationInfo)).map(mappedMessage -> {
            ctx.put(MAPPER_DATA, mappedMessage.getAdditionalProperties());
            ctx.applyMappedTargetDeviceId(mappedMessage.getTargetDeviceId());
            ctx.applyMappedPayload(mappedMessage.getPayload());
            return mappedMessage;
        });
    }

    @Override
    protected void customizeDownstreamMessageProperties(Map<String, Object> props, MqttContext ctx) {
        Object additionalProperties = ctx.get(MAPPER_DATA);
        if (additionalProperties instanceof Map) {
            ((Map)additionalProperties).entrySet().stream().filter(entry -> entry.getKey() instanceof String).forEach(entry -> {
                String key = (String)entry.getKey();
                Object value = entry.getValue();
                if (value instanceof String) {
                    props.put(key, value);
                } else {
                    props.put(key, Json.encode(value));
                }
            });
        }
    }

    Future<ResourceIdentifier> checkQosAndMapTopic(MqttContext context) {
        Promise result = Promise.promise();
        MqttQoS qos = context.qosLevel();
        switch (context.endpoint()) {
            case TELEMETRY: {
                if (MqttQoS.EXACTLY_ONCE.equals((Object)qos)) {
                    result.fail((Throwable)new ClientErrorException(400, "QoS 2 not supported for telemetry messages"));
                    break;
                }
                result.complete((Object)context.topic());
                break;
            }
            case EVENT: {
                if (MqttQoS.AT_LEAST_ONCE.equals((Object)qos)) {
                    result.complete((Object)context.topic());
                    break;
                }
                result.fail((Throwable)new ClientErrorException(400, "Only QoS 1 supported for event messages"));
                break;
            }
            case COMMAND: {
                if (MqttQoS.EXACTLY_ONCE.equals((Object)qos)) {
                    result.fail((Throwable)new ClientErrorException(400, "QoS 2 not supported for command response messages"));
                    break;
                }
                result.complete((Object)context.topic());
                break;
            }
            default: {
                this.log.debug("no such endpoint [{}]", (Object)context.endpoint());
                result.fail((Throwable)new ClientErrorException(404, "no such endpoint"));
            }
        }
        return result.future();
    }

    @Override
    protected Future<Buffer> getCommandPayload(CommandContext ctx) {
        Command command = ctx.getCommand();
        return this.getRegistrationClient().assertRegistration(command.getTenant(), command.getGatewayOrDeviceId(), null, ctx.getTracingContext()).compose(registrationInfo -> this.messageMapping.mapUpstreamMessage((RegistrationAssertion)registrationInfo, command));
    }
}

