/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.cli.app;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.eclipse.hono.cli.AbstractCliClient;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.consumer.KafkaConsumerConfigProperties;
import org.eclipse.hono.util.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Component
@Profile(value={"receiver-kafka"})
public class KafkaReceiver
extends AbstractCliClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";
    @Value(value="${tenant.id}")
    protected String tenantId;
    @Value(value="${message.type}")
    protected String messageType;
    @Value(value="${print.verbose:true}")
    protected Boolean isPrintVerbose;
    private KafkaConsumerConfigProperties config;

    @Autowired
    public void setConfig(KafkaConsumerConfigProperties config) {
        this.config = config;
    }

    @PostConstruct
    Future<Void> start() {
        return this.createConsumer().onComplete(this::handleCreateConsumerStatus);
    }

    private Future<Void> createConsumer() {
        if (Strings.isNullOrEmpty((Object)this.tenantId)) {
            return Future.failedFuture((String)"tenant id is not set");
        }
        Set<String> topics = this.getTopics();
        if (topics.isEmpty()) {
            return Future.failedFuture((String)String.format("Invalid message type [\"%s\"]. Valid types are \"telemetry\", \"event\" or \"all\"", this.messageType));
        }
        KafkaConsumer consumer = KafkaConsumer.create((Vertx)this.vertx, (Map)this.config.getConsumerConfig("cli"), String.class, Buffer.class);
        consumer.handler(this::logMessage);
        Promise promise = Promise.promise();
        consumer.subscribe(topics, (Handler)promise);
        return promise.future();
    }

    private Set<String> getTopics() {
        HashSet<String> topics = new HashSet<String>();
        if (this.messageType.equals(TYPE_TELEMETRY) || this.messageType.equals(TYPE_ALL)) {
            topics.add(new HonoTopic(HonoTopic.Type.TELEMETRY, this.tenantId).toString());
        }
        if (this.messageType.equals(TYPE_EVENT) || this.messageType.equals(TYPE_ALL)) {
            topics.add(new HonoTopic(HonoTopic.Type.EVENT, this.tenantId).toString());
        }
        return topics;
    }

    private void handleCreateConsumerStatus(AsyncResult<Void> startup) {
        if (startup.succeeded()) {
            this.log.info("Receiver [tenant: {}, mode: {}] subscribed successfully, hit ctrl-c to exit", (Object)this.tenantId, (Object)this.messageType);
        } else {
            this.log.error("Error occurred during initialization of receiver: {}", (Object)startup.cause().getMessage());
            this.vertx.close();
        }
    }

    private void logMessage(KafkaConsumerRecord<String, Buffer> record) {
        if (this.isPrintVerbose.booleanValue()) {
            this.logVerbosely(record);
        } else {
            this.logBriefly(record);
        }
    }

    private void logVerbosely(KafkaConsumerRecord<String, Buffer> record) {
        long timestamp = record.timestamp();
        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
        StringBuilder stringBuilder = new StringBuilder();
        record.headers().forEach(h -> {
            stringBuilder.append("    ");
            stringBuilder.append(h.key());
            stringBuilder.append("=");
            stringBuilder.append(h.value());
            stringBuilder.append("\n");
        });
        String headers = stringBuilder.toString();
        this.log.info("topic: {}, partition: {}, offset: {}, timestamp: {} ({})\n  Headers:\n{}  Key:\n    {}\n  Value:\n    {}", new Object[]{record.topic(), record.partition(), record.offset(), timestamp, time, headers, record.key(), record.value()});
    }

    private void logBriefly(KafkaConsumerRecord<String, Buffer> record) {
        String endpoint = record.topic().equals(new HonoTopic(HonoTopic.Type.EVENT, this.tenantId).toString()) ? TYPE_EVENT : TYPE_TELEMETRY;
        String contentType = record.headers().stream().filter(h -> h.key().equals("content-type")).findAny().map(h -> h.value().toString()).orElse("");
        this.log.info("received {} message [device: {}, content-type: {}]: {}", new Object[]{endpoint, record.key(), contentType, record.value()});
    }
}

