/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.kafka.impl;

import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageProperties;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.application.client.kafka.KafkaMessageProperties;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.util.QoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A downstream message whose state is read once, at construction time, from a
 * Kafka consumer record.
 * <p>
 * The tenant identifier is taken from the record's topic name, the device
 * identifier from the record key, the payload from the record value and the
 * remaining attributes from the record headers. All fields are final.
 */
public class KafkaDownstreamMessage
implements DownstreamMessage<KafkaMessageContext> {
    // Shared by all instances: one message object is created per consumed record,
    // so a per-instance logger lookup would be repeated for every record.
    // The logger is currently not used inside this class.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaDownstreamMessage.class);

    /** Content type reported when the record headers do not provide one. */
    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
    /** Name of the header holding the creation time. */
    private static final String HEADER_CREATION_TIME = "creation-time";
    /** Name of the header holding the time-till-disconnect value. */
    private static final String HEADER_TIME_TILL_DISCONNECT = "ttd";
    /** Name of the message property holding the correlation identifier. */
    private static final String PROPERTY_CORRELATION_ID = "correlation-id";
    /** Name of the message property holding the status code. */
    private static final String PROPERTY_STATUS = "status";

    private final String tenantId;
    private final String deviceId;
    private final MessageProperties properties;
    private final String contentType;
    private final KafkaMessageContext messageContext;
    private final QoS qos;
    private final Buffer payload;
    private final Instant creationTime;
    private final Integer timeTillDisconnect;

    /**
     * Creates a downstream message from a Kafka consumer record.
     *
     * @param record The record to read the message from.
     * @throws NullPointerException if the record is {@code null}.
     * @throws IllegalArgumentException if no tenant identifier can be derived
     *         from the record's topic name.
     */
    public KafkaDownstreamMessage(KafkaConsumerRecord<String, Buffer> record) {
        Objects.requireNonNull(record, "record must not be null");
        final List<KafkaHeader> headers = record.headers();
        this.tenantId = tenantIdFromTopic(record);
        this.deviceId = record.key();
        this.properties = new KafkaMessageProperties(record);
        this.contentType = contentTypeFrom(headers);
        this.messageContext = new KafkaMessageContext(record);
        this.qos = qosFrom(headers);
        this.payload = record.value();
        this.creationTime = creationTimeFrom(headers);
        this.timeTillDisconnect = timeTillDisconnectFrom(headers);
    }

    /**
     * Derives the tenant identifier from the record's topic name.
     *
     * @throws IllegalArgumentException if the topic name cannot be parsed or
     *         the parsed topic carries no tenant identifier.
     */
    private static String tenantIdFromTopic(KafkaConsumerRecord<String, Buffer> record) {
        // Optional.map yields an empty Optional for a null tenant id, so both a
        // null topic object and a null tenant id end up in orElseThrow.
        return Optional.ofNullable(HonoTopic.fromString(record.topic()))
                .map(HonoTopic::getTenantId)
                .orElseThrow(() -> new IllegalArgumentException("Invalid topic name"));
    }

    /** Reads the content type from the headers, falling back to the default. */
    private static String contentTypeFrom(List<KafkaHeader> headers) {
        return KafkaRecordHelper.getContentType(headers).orElse(DEFAULT_CONTENT_TYPE);
    }

    /** Reads the QoS level from the headers, falling back to {@code AT_LEAST_ONCE}. */
    private static QoS qosFrom(List<KafkaHeader> headers) {
        return KafkaRecordHelper.getQoS(headers).orElse(QoS.AT_LEAST_ONCE);
    }

    /**
     * Reads the creation time header as a {@code Long} and interprets it as
     * milliseconds since the epoch.
     *
     * @return The creation time or {@code null} if the helper yields no value.
     */
    private static Instant creationTimeFrom(List<KafkaHeader> headers) {
        return KafkaRecordHelper.getHeaderValue(headers, HEADER_CREATION_TIME, Long.class)
                .map(Instant::ofEpochMilli)
                .orElse(null);
    }

    /**
     * Reads the time-till-disconnect header as an {@code Integer}.
     *
     * @return The header value or {@code null} if the helper yields no value.
     */
    private static Integer timeTillDisconnectFrom(List<KafkaHeader> headers) {
        return KafkaRecordHelper.getHeaderValue(headers, HEADER_TIME_TILL_DISCONNECT, Integer.class)
                .orElse(null);
    }

    /**
     * Gets the tenant identifier derived from the record's topic name.
     *
     * @return The tenant identifier.
     */
    public final String getTenantId() {
        return this.tenantId;
    }

    /**
     * Gets the device identifier, which is the key of the record.
     *
     * @return The record key.
     */
    public final String getDeviceId() {
        return this.deviceId;
    }

    /**
     * Gets the message properties created from the record.
     *
     * @return The properties.
     */
    public final MessageProperties getProperties() {
        return this.properties;
    }

    /**
     * Gets the content type of the payload.
     *
     * @return The content type from the record headers, or
     *         {@code application/octet-stream} if none was found.
     */
    public final String getContentType() {
        return this.contentType;
    }

    /**
     * Gets the message context wrapping the record.
     *
     * @return The message context.
     */
    public final KafkaMessageContext getMessageContext() {
        return this.messageContext;
    }

    /**
     * Gets the quality-of-service level of the message.
     *
     * @return The QoS from the record headers, or {@code AT_LEAST_ONCE} if
     *         none was found.
     */
    public final QoS getQos() {
        return this.qos;
    }

    /**
     * Gets the payload, which is the value of the record.
     *
     * @return The record value.
     */
    public final Buffer getPayload() {
        return this.payload;
    }

    /**
     * Gets the creation time read from the record headers.
     *
     * @return The creation time or {@code null} if none was found.
     */
    public Instant getCreationTime() {
        return this.creationTime;
    }

    /**
     * Gets the time-till-disconnect value read from the record headers.
     *
     * @return The value or {@code null} if none was found.
     */
    public Integer getTimeTillDisconnect() {
        return this.timeTillDisconnect;
    }

    /**
     * Gets the {@code correlation-id} property of the message.
     *
     * @return The value returned by the message properties for that name.
     */
    public String getCorrelationId() {
        return (String)this.properties.getProperty(PROPERTY_CORRELATION_ID, String.class);
    }

    /**
     * Gets the {@code status} property of the message.
     *
     * @return The value returned by the message properties for that name.
     */
    public Integer getStatus() {
        return (Integer)this.properties.getProperty(PROPERTY_STATUS, Integer.class);
    }
}

