/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.cli;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonConnection;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.cli.AbstractClient;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.util.MessageHelper;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Component
@Profile(value={"receiver"})
public class Receiver
extends AbstractClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";

    @PostConstruct
    Future<CompositeFuture> start() {
        return this.client.connect(this::onDisconnect).compose(this::createConsumer).setHandler(this::handleCreateConsumerStatus);
    }

    private CompositeFuture createConsumer(HonoClient connectedClient) {
        Handler closeHandler = closeHook -> {
            this.LOG.info("close handler of consumer is called");
            this.vertx.setTimer((long)this.connectionTimeOut, reconnect -> {
                this.LOG.info("attempting to re-open the consumer link ...");
                this.createConsumer(connectedClient);
            });
        };
        ArrayList<Future> consumerFutures = new ArrayList<Future>();
        if (this.messageType.equals(TYPE_EVENT) || this.messageType.equals(TYPE_ALL)) {
            consumerFutures.add(connectedClient.createEventConsumer(this.tenantId, msg -> this.handleMessage(TYPE_EVENT, (Message)msg), closeHandler));
        }
        if (this.messageType.equals(TYPE_TELEMETRY) || this.messageType.equals(TYPE_ALL)) {
            consumerFutures.add(connectedClient.createTelemetryConsumer(this.tenantId, msg -> this.handleMessage(TYPE_TELEMETRY, (Message)msg), closeHandler));
        }
        if (consumerFutures.isEmpty()) {
            consumerFutures.add(Future.failedFuture((String)"Invalid message type. Valid types are telemetry, event or all"));
        }
        return CompositeFuture.all(consumerFutures);
    }

    private void onDisconnect(ProtonConnection con) {
        this.vertx.setTimer((long)this.connectionTimeOut, reconnect -> {
            this.LOG.info("attempting to re-connect to Hono ...");
            this.client.connect(this::onDisconnect).compose(this::createConsumer);
        });
    }

    private void handleMessage(String endpoint, Message msg) {
        String deviceId = MessageHelper.getDeviceId((Message)msg);
        Section body = msg.getBody();
        String content = null;
        if (body instanceof Data) {
            content = ((Data)msg.getBody()).getValue().toString();
        } else if (body instanceof AmqpValue) {
            content = ((AmqpValue)msg.getBody()).getValue().toString();
        }
        this.LOG.info("received {} message [device: {}, content-type: {}]: {}", new Object[]{endpoint, deviceId, msg.getContentType(), content});
        if (msg.getApplicationProperties() != null) {
            this.LOG.info("... with application properties: {}", (Object)msg.getApplicationProperties().getValue());
        }
    }

    private void handleCreateConsumerStatus(AsyncResult<CompositeFuture> startup) {
        if (startup.succeeded()) {
            this.LOG.info("Receiver [tenant: {}, mode: {}] created successfully, hit ctrl-c to exit", (Object)this.tenantId, (Object)this.messageType);
        } else {
            this.LOG.error("Error occurred during initialization of receiver: {}", (Object)startup.cause().getMessage());
            this.vertx.close();
        }
    }
}

