/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.mqtt.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.adapter.MapperEndpoint;
import org.eclipse.hono.adapter.mqtt.MappedMessage;
import org.eclipse.hono.adapter.mqtt.MessageMapping;
import org.eclipse.hono.adapter.mqtt.MqttContext;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MessageMapping} for MQTT contexts that delegates payload mapping to
 * mapping services which are invoked by means of an HTTP POST request.
 * <p>
 * The mapper to use is taken from the device's registration assertion and resolved
 * to a {@link MapperEndpoint} via the adapter's configuration properties. If no mapper
 * is named, or no endpoint is configured under that name, the original payload is
 * passed on unmodified.
 */
public final class HttpBasedMessageMapping
implements MessageMapping<MqttContext> {
    private static final Logger LOG = LoggerFactory.getLogger(HttpBasedMessageMapping.class);
    /** The only status code of a mapping service response that is treated as success. */
    private static final int STATUS_OK = 200;
    /** The error code reported to the caller whenever the mapping service could not be used. */
    private static final int STATUS_UNAVAILABLE = 503;
    private static final String HEADER_DEVICE_ID = "device_id";
    private static final String HEADER_GATEWAY_ID = "gateway_id";
    private static final String HEADER_TENANT_ID = "tenant_id";
    private static final String HEADER_ORIG_ADDRESS = "orig_address";

    private final WebClient webClient;
    private final MqttProtocolAdapterProperties mqttProtocolAdapterProperties;

    /**
     * Creates a new mapping.
     *
     * @param webClient The HTTP client used for invoking mapping services.
     * @param protocolAdapterConfig The adapter configuration from which mapper endpoints are looked up.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public HttpBasedMessageMapping(WebClient webClient, MqttProtocolAdapterProperties protocolAdapterConfig) {
        this.webClient = Objects.requireNonNull(webClient);
        this.mqttProtocolAdapterProperties = Objects.requireNonNull(protocolAdapterConfig);
    }

    /**
     * Creates the result used when no mapping takes place: the context's own
     * device identifier and payload.
     */
    private static MappedMessage unmodifiedMappedMessage(MqttContext ctx) {
        return new MappedMessage(ctx.deviceId(), ctx.payload());
    }

    /**
     * Maps a message published by a device using the downstream mapper named in the
     * device's registration assertion.
     *
     * @param ctx The context of the published message.
     * @param tenantId The tenant that the device belongs to.
     * @param registrationInfo The device's registration assertion.
     * @return A future that is succeeded with the mapped message, or with the unmodified
     *         message if no mapper (endpoint) is configured. The future is failed with a
     *         {@link ServerErrorException} (503) if the mapping service could not be
     *         invoked or returned a status code other than 200.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if the device or tenant identifier does not match
     *         the one of the given context.
     */
    @Override
    public Future<MappedMessage> mapDownstreamMessage(MqttContext ctx, String tenantId, RegistrationAssertion registrationInfo) {
        Objects.requireNonNull(ctx);
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(registrationInfo);
        if (!registrationInfo.getDeviceId().equals(ctx.deviceId())) {
            throw new IllegalArgumentException("registration assertion and MQTT context refer to different device identifiers");
        }
        if (!tenantId.equals(ctx.tenant())) {
            throw new IllegalArgumentException("given tenant identifier does not match the one associated with given MQTT context");
        }

        final String mapper = registrationInfo.getDownstreamMessageMapper();
        if (Strings.isNullOrEmpty(mapper)) {
            LOG.debug("no payload mapping configured for device [{}]", ctx.deviceId());
            return Future.succeededFuture(unmodifiedMappedMessage(ctx));
        }
        final MapperEndpoint mapperEndpoint = this.mqttProtocolAdapterProperties.getMapperEndpoint(mapper);
        if (mapperEndpoint == null) {
            LOG.debug("no mapping endpoint [name: {}] found for device [{}]", mapper, ctx.deviceId());
            return Future.succeededFuture(unmodifiedMappedMessage(ctx));
        }

        final Promise<MappedMessage> result = Promise.promise();
        mapDownstreamMessageRequest(ctx, tenantId, registrationInfo, mapperEndpoint, result);
        return result.future();
    }

    /**
     * Maps the payload of a command using the upstream mapper named in the target
     * device's registration assertion.
     *
     * @param registrationInfo The registration assertion of the device the command is sent to.
     * @param command The command whose payload is to be mapped.
     * @return A future that is succeeded with the mapped payload, or with the command's
     *         original payload if no mapper (endpoint) is configured. The future is failed
     *         with a {@link ServerErrorException} (503) if the mapping service could not be
     *         invoked or returned a status code other than 200.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    @Override
    public Future<Buffer> mapUpstreamMessage(RegistrationAssertion registrationInfo, Command command) {
        Objects.requireNonNull(registrationInfo);
        Objects.requireNonNull(command);

        final String mapper = registrationInfo.getUpstreamMessageMapper();
        if (Strings.isNullOrEmpty(mapper)) {
            LOG.debug("no payload mapping configured for {}", registrationInfo.getDeviceId());
            return Future.succeededFuture(command.getPayload());
        }
        final MapperEndpoint mapperEndpoint = this.mqttProtocolAdapterProperties.getMapperEndpoint(mapper);
        if (mapperEndpoint == null) {
            LOG.debug("no mapping endpoint [name: {}] found for {}", mapper, registrationInfo.getDeviceId());
            return Future.succeededFuture(command.getPayload());
        }

        final Promise<Buffer> result = Promise.promise();
        mapUpstreamMessageRequest(command, registrationInfo, mapperEndpoint, result);
        return result.future();
    }

    /**
     * Sends a command's payload to the mapping service and reports the response body
     * to the given handler.
     */
    private void mapUpstreamMessageRequest(Command command, RegistrationAssertion registrationInfo, MapperEndpoint mapperEndpoint, Handler<AsyncResult<Buffer>> resultHandler) {
        final MultiMap headers = registrationHeaders(registrationInfo);
        addIfPresent(headers, HEADER_GATEWAY_ID, command.getGatewayId());
        addIfPresent(headers, HEADER_DEVICE_ID, command.getDeviceId());
        addIfPresent(headers, HttpHeaders.CONTENT_TYPE.toString(), command.getContentType());

        postToMapper(mapperEndpoint, headers, command.getPayload(), outcome -> {
            if (outcome.failed()) {
                // the cause is passed as the trailing argument so that SLF4J logs it as the exception
                LOG.debug("failed to map message [origin: {}] using mapping service [host: {}, port: {}, URI: {}]",
                        command.getDeviceId(), mapperEndpoint.getHost(), mapperEndpoint.getPort(), mapperEndpoint.getUri(), outcome.cause());
                resultHandler.handle(Future.failedFuture(new ServerErrorException(STATUS_UNAVAILABLE, outcome.cause())));
                return;
            }
            final HttpResponse<Buffer> response = outcome.result();
            if (response.statusCode() == STATUS_OK) {
                resultHandler.handle(Future.succeededFuture(response.bodyAsBuffer()));
            } else {
                resultHandler.handle(unexpectedStatusCode(mapperEndpoint, response.statusCode()));
            }
        });
    }

    /**
     * Sends a published message's payload to the mapping service and reports the mapped
     * message to the given handler.
     * <p>
     * The response headers become the mapped message's additional properties, except for
     * a {@code device_id} header, which (if present) replaces the device identifier from
     * the registration assertion.
     */
    private void mapDownstreamMessageRequest(MqttContext ctx, String tenantId, RegistrationAssertion registrationInfo, MapperEndpoint mapperEndpoint, Handler<AsyncResult<MappedMessage>> resultHandler) {
        final MultiMap headers = registrationHeaders(registrationInfo);
        headers.add(HEADER_TENANT_ID, tenantId);
        // guarded like the other optional headers instead of handing a null value to the MultiMap
        addIfPresent(headers, HEADER_ORIG_ADDRESS, ctx.getOrigAddress());
        addIfPresent(headers, HttpHeaders.CONTENT_TYPE.toString(), ctx.contentType());

        postToMapper(mapperEndpoint, headers, ctx.payload(), outcome -> {
            if (outcome.failed()) {
                // the cause is passed as the trailing argument so that SLF4J logs it as the exception
                LOG.debug("failed to map message [original device: {}] using mapping service [host: {}, port: {}, URI: {}]",
                        ctx.deviceId(), mapperEndpoint.getHost(), mapperEndpoint.getPort(), mapperEndpoint.getUri(), outcome.cause());
                resultHandler.handle(Future.failedFuture(new ServerErrorException(STATUS_UNAVAILABLE, outcome.cause())));
                return;
            }
            final HttpResponse<Buffer> response = outcome.result();
            if (response.statusCode() != STATUS_OK) {
                resultHandler.handle(unexpectedStatusCode(mapperEndpoint, response.statusCode()));
                return;
            }
            final HashMap<String, String> additionalProperties = new HashMap<String, String>();
            response.headers().forEach(header -> additionalProperties.put(header.getKey(), header.getValue()));
            // the device_id header is consumed here so that it does not show up as an additional property
            final Optional<String> mappedId = Optional.ofNullable(additionalProperties.remove(HEADER_DEVICE_ID));
            mappedId.ifPresent(id -> LOG.debug("original device [{}] has been mapped to [{}]", ctx.deviceId(), id));
            final String targetDeviceId = mappedId.orElseGet(registrationInfo::getDeviceId);
            resultHandler.handle(Future.succeededFuture(
                    new MappedMessage(targetDeviceId, response.bodyAsBuffer(), additionalProperties)));
        });
    }

    /**
     * Creates the request headers shared by both mapping directions: one header per
     * property of the registration assertion's JSON representation.
     */
    private static MultiMap registrationHeaders(RegistrationAssertion registrationInfo) {
        final MultiMap headers = MultiMap.caseInsensitiveMultiMap();
        JsonObject.mapFrom(registrationInfo).forEach(property -> {
            final Object value = property.getValue();
            // strings are added as-is so that they do not get enclosed in quotes by the JSON encoder
            final String headerValue = value instanceof String ? (String) value : Json.encode(value);
            headers.add(property.getKey(), headerValue);
        });
        return headers;
    }

    /**
     * Adds a header only if its value is not {@code null}.
     */
    private static void addIfPresent(MultiMap headers, String name, String value) {
        if (value != null) {
            headers.add(name, value);
        }
    }

    /**
     * POSTs the given payload and headers to the mapper endpoint, using TLS if the
     * endpoint is configured for it.
     */
    private void postToMapper(MapperEndpoint mapperEndpoint, MultiMap headers, Buffer payload, Handler<AsyncResult<HttpResponse<Buffer>>> responseHandler) {
        this.webClient.post(mapperEndpoint.getPort(), mapperEndpoint.getHost(), mapperEndpoint.getUri())
                .putHeaders(headers)
                .ssl(mapperEndpoint.isTlsEnabled())
                .sendBuffer(payload, responseHandler);
    }

    /**
     * Logs a response status other than 200 and creates the corresponding failed result.
     */
    private static <T> Future<T> unexpectedStatusCode(MapperEndpoint mapperEndpoint, int statusCode) {
        LOG.debug("mapping service [host: {}, port: {}, URI: {}] returned unexpected status code: {}",
                mapperEndpoint.getHost(), mapperEndpoint.getPort(), mapperEndpoint.getUri(), statusCode);
        return Future.failedFuture(new ServerErrorException(STATUS_UNAVAILABLE, "could not invoke configured mapping service"));
    }
}

