/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.example.protocolgateway.controller;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.ReadStream;
import io.vertx.proton.ProtonDelivery;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.command.CommandConsumer;
import org.eclipse.hono.client.device.amqp.AmqpAdapterClient;
import org.eclipse.hono.example.protocolgateway.TcpServer;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An example protocol gateway that accepts newline-delimited text commands on a TCP socket
 * and forwards them to Hono's AMQP adapter using an {@link AmqpAdapterClient}.
 * <p>
 * The commands understood by {@link #executeCommand(String[], NetSocket, Map)} are
 * {@code login}, {@code telemetry}/{@code t}, {@code event}/{@code e}, {@code subscribe}
 * and {@code unsubscribe}. Every command is answered with either {@code OK} or
 * {@code FAILED: <reason>}.
 * <p>
 * Per-connection state (the device identifier and the command consumer, if any) is kept in a
 * map that is created for each accepted socket.
 */
@Dependent
public class ProtocolGateway {
    private static final String CMD_LOGIN = "login";
    private static final String CMD_SUBSCRIBE = "subscribe";
    private static final String CMD_UNSUBSCRIBE = "unsubscribe";
    private static final String CONTENT_TYPE_BINARY_OPAQUE = "binary/opaque";
    private static final String KEY_COMMAND_CONSUMER = "command_consumer";
    private static final String KEY_DEVICE_ID = "device_id";
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolGateway.class);
    private static final String MSG_DEVICE_NOT_LOGGED_IN = "device not logged in";
    private final AmqpAdapterClient amqpAdapterClient;
    private final TcpServer server;

    /**
     * Creates a new gateway.
     *
     * @param client The client to use for interacting with Hono's AMQP adapter.
     * @param server The TCP server that devices connect to.
     */
    @Inject
    public ProtocolGateway(AmqpAdapterClient client, TcpServer server) {
        this.amqpAdapterClient = client;
        this.server = server;
    }

    /**
     * Connects to the AMQP adapter and, once connected, starts the TCP server.
     *
     * @param ev The event indicating application startup.
     */
    public void onStart(@Observes StartupEvent ev) {
        this.server.setConnectHandler(this::handleConnect);
        this.amqpAdapterClient.connect()
                .onSuccess(ok -> LOG.info("successfully connected to Hono's AMQP adapter"))
                // the cause is logged by the final failure handler below
                .onFailure(t -> LOG.error("failed to connect to Hono's AMQP adapter"))
                .compose(ok -> this.server.start())
                .onSuccess(s -> LOG.info("successfully started example protocol gateway"))
                .onFailure(t -> LOG.error("failed to start protocol gateway", t));
    }

    /**
     * Stops the TCP server and then disconnects from the AMQP adapter,
     * regardless of whether stopping the server succeeded.
     *
     * @param ev The event indicating application shutdown.
     */
    public void onStop(@Observes ShutdownEvent ev) {
        this.server.stop().onComplete(r -> this.amqpAdapterClient.disconnect());
    }

    /**
     * Sets up command parsing and clean-up for a newly accepted device connection.
     *
     * @param socket The socket representing the connection to the device.
     */
    void handleConnect(NetSocket socket) {
        // state scoped to this connection only
        final Map<String, Object> dict = new HashMap<>();
        final RecordParser commandParser = RecordParser.newDelimited("\n", socket);
        commandParser.endHandler(end -> socket.close());
        commandParser.exceptionHandler(t -> {
            LOG.debug("error processing data from device", t);
            socket.close();
        });
        commandParser.handler(data -> this.handleData(socket, dict, data));
        socket.closeHandler(remoteClose -> {
            LOG.debug("device closed connection");
            // remove the consumer so that it cannot be closed a second time;
            // the socket itself is already closed when this handler runs
            Optional.ofNullable((CommandConsumer) dict.remove(KEY_COMMAND_CONSUMER))
                    .ifPresent(c -> c.close(null)
                            .onComplete(res -> LOG.debug("closed device's command consumer")));
        });
        socket.write("Welcome to the example Protocol Gateway for devices, please enter a command\n");
        LOG.debug("connection with client established");
    }

    /**
     * Processes a single line received from a device and writes the outcome back to the socket.
     *
     * @param socket The connection to the device.
     * @param dictionary The state of the device's connection.
     * @param buffer The line (without the delimiter) sent by the device.
     */
    private void handleData(NetSocket socket, Map<String, Object> dictionary, Buffer buffer) {
        final String data = buffer.toString();
        LOG.debug("received data from device: [{}]", data);
        // first token is the command name, the remainder (if any) its arguments
        final String[] command = data.split(" ", 2);
        this.executeCommand(command, socket, dictionary)
                .onSuccess(c -> socket.write("OK\n"))
                .onFailure(t -> {
                    LOG.debug("failed to process data provided by device", t);
                    socket.write("FAILED: " + t.getMessage() + "\n");
                });
    }

    /**
     * Dispatches a parsed command to the method implementing it.
     *
     * @param command The command name at index 0, optionally followed by its arguments at index 1.
     * @param socket The connection to the device.
     * @param dictionary The state of the device's connection.
     * @return A future indicating the outcome; failed with "no such command" for unknown names.
     */
    private Future<Void> executeCommand(String[] command, NetSocket socket, Map<String, Object> dictionary) {
        final String commandName = command[0];
        final String args = command.length > 1 ? command[1] : null;
        LOG.debug("processing command: {}", commandName);
        switch (commandName) {
            case CMD_LOGIN:
                return this.login(args, socket, dictionary);
            case "telemetry":
            case "t":
                return this.sendTelemetry(args, dictionary);
            case "event":
            case "e":
                return this.sendEvent(args, dictionary);
            case CMD_SUBSCRIBE:
                return this.subscribe(socket, dictionary);
            case CMD_UNSUBSCRIBE:
                return this.unsubscribe(dictionary);
            default:
                LOG.debug("unsupported command [{}]", commandName);
                return Future.failedFuture("no such command");
        }
    }

    /**
     * Records the given device identifier as the identity of the connection.
     * No credentials are verified.
     *
     * @param args The device identifier.
     * @param socket The connection to the device.
     * @param dictionary The state of the device's connection.
     * @return A succeeded future, or a failed future if no identifier has been provided.
     */
    private Future<Void> login(String args, NetSocket socket, Map<String, Object> dictionary) {
        if (Strings.isNullOrEmpty(args)) {
            return Future.failedFuture("missing device identifier");
        }
        final String deviceId = args;
        LOG.info("authenticating device [id: {}]", deviceId);
        dictionary.put(KEY_DEVICE_ID, deviceId);
        socket.write(String.format("device [%s] logged in\n", deviceId));
        return Future.succeededFuture();
    }

    /**
     * Sends a telemetry message on behalf of the logged-in device.
     *
     * @param args The QoS indicator ({@code 0} for at-most-once, anything else for
     *             at-least-once), optionally followed by a space and the payload.
     * @param dictionary The state of the device's connection.
     * @return A future indicating the outcome of sending the message. The future is failed
     *         if the device is not logged in or if no arguments have been provided.
     */
    private Future<Void> sendTelemetry(String args, Map<String, Object> dictionary) {
        LOG.debug("Command: send Telemetry");
        final Object deviceId = dictionary.get(KEY_DEVICE_ID);
        if (deviceId == null) {
            return Future.failedFuture(MSG_DEVICE_NOT_LOGGED_IN);
        }
        if (Strings.isNullOrEmpty(args)) {
            return Future.failedFuture("missing params qos and payload");
        }
        final String[] params = args.split(" ", 2);
        final QoS qos = "0".equals(params[0]) ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
        // split() yields a single element if only the QoS has been given; indexing params[1]
        // unconditionally would throw ArrayIndexOutOfBoundsException in that case
        // NOTE(review): a null payload is assumed to be accepted by the client - confirm
        final Buffer payload = params.length > 1 ? Buffer.buffer(params[1]) : null;
        return this.amqpAdapterClient
                .sendTelemetry(qos, payload, CONTENT_TYPE_BINARY_OPAQUE, null, (String) deviceId, null)
                .map((Void) null);
    }

    /**
     * Sends an event on behalf of the logged-in device.
     *
     * @param args The payload of the event.
     * @param dictionary The state of the device's connection.
     * @return A future indicating the outcome of sending the event. The future is failed
     *         if the device is not logged in or if no payload has been provided.
     */
    private Future<Void> sendEvent(String args, Map<String, Object> dictionary) {
        LOG.debug("Command: send Event");
        final Object deviceId = dictionary.get(KEY_DEVICE_ID);
        if (deviceId == null) {
            return Future.failedFuture(MSG_DEVICE_NOT_LOGGED_IN);
        }
        if (Strings.isNullOrEmpty(args)) {
            return Future.failedFuture("missing payload");
        }
        final Buffer payload = Buffer.buffer(args);
        return this.amqpAdapterClient
                .sendEvent(payload, CONTENT_TYPE_BINARY_OPAQUE, null, (String) deviceId, null)
                .map((Void) null);
    }

    /**
     * Subscribes the logged-in device to commands and remembers the resulting consumer
     * in the connection state.
     *
     * @param socket The connection to the device that received commands are written to.
     * @param dictionary The state of the device's connection.
     * @return A future indicating the outcome; failed if the device is not logged in.
     */
    private Future<Void> subscribe(NetSocket socket, Map<String, Object> dictionary) {
        LOG.debug("Command: subscribe");
        final Object deviceId = dictionary.get(KEY_DEVICE_ID);
        if (deviceId == null) {
            return Future.failedFuture(MSG_DEVICE_NOT_LOGGED_IN);
        }
        return this.subscribe((String) deviceId, socket).map(consumer -> {
            // NOTE(review): a consumer stored by an earlier subscribe is replaced here
            // without being closed - verify whether it needs to be closed explicitly
            dictionary.put(KEY_COMMAND_CONSUMER, consumer);
            return (Void) null;
        });
    }

    /**
     * Closes the command consumer of the connection and removes it from the connection state.
     *
     * @param dictionary The state of the device's connection.
     * @return A future indicating the outcome of closing the consumer; failed if the
     *         device has not subscribed to commands.
     */
    private Future<Void> unsubscribe(Map<String, Object> dictionary) {
        LOG.debug("Command: unsubscribe");
        // remove (not just get) the consumer so that it is neither closed again by a repeated
        // unsubscribe nor by the socket's close handler
        final Object consumer = dictionary.remove(KEY_COMMAND_CONSUMER);
        if (consumer == null) {
            return Future.failedFuture("device not subscribed to commands");
        }
        return ((CommandConsumer) consumer).close(null);
    }

    /**
     * Creates a command consumer for a device.
     * <p>
     * One-way commands (no reply-to address) are written to the socket. For request/response
     * commands only {@code tellTime} is answered; other commands are logged only.
     *
     * @param deviceId The identifier of the device to receive commands for.
     * @param socket The connection to the device.
     * @return A future containing the created consumer.
     */
    private Future<CommandConsumer> subscribe(String deviceId, NetSocket socket) {
        final Consumer<Message> messageHandler = m -> {
            final String commandPayload = AmqpUtils.getPayloadAsString(m);
            final boolean isOneWay = m.getReplyTo() == null;
            if (isOneWay) {
                LOG.debug("received one-way command [name: {}]: {}", m.getSubject(), commandPayload);
                socket.write(String.format("ONE-WAY COMMAND [name: %s]: %s\n", m.getSubject(), commandPayload));
            } else {
                LOG.debug("received command [name: {}]: {}", m.getSubject(), commandPayload);
                if ("tellTime".equals(m.getSubject())) {
                    this.respondWithTime(m)
                            .onSuccess(delivery -> LOG.debug("sent response to command [name: {}, outcome: {}]",
                                    m.getSubject(), delivery.getRemoteState().getType()))
                            .onFailure(t -> LOG.info("failed to send response to command [name: {}]",
                                    m.getSubject(), t));
                }
            }
        };
        return this.amqpAdapterClient.createDeviceSpecificCommandConsumer(null, deviceId, messageHandler);
    }

    /**
     * Sends a response containing the current local date and time (ISO format) with status 200.
     *
     * @param command The command message to respond to.
     * @return A future containing the delivery of the response message.
     */
    private Future<ProtonDelivery> respondWithTime(Message command) {
        final String now = DateTimeFormatter.ISO_DATE_TIME.format(LocalDateTime.now(ZoneId.systemDefault()));
        final Buffer payload = Buffer.buffer(String.format("myCurrentTime: %s", now));
        return this.amqpAdapterClient.sendCommandResponse(
                command.getReplyTo(),
                (String) command.getCorrelationId(),
                200,
                payload,
                "text/plain",
                null);
    }
}

