/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.mqtt;

import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageType;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.ProtocolException;
import com.microsoft.azure.sdk.iot.device.transport.TransportException;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.MqttMessageListener;
import com.microsoft.azure.sdk.iot.device.transport.mqtt.exceptions.PahoExceptionTranslator;
import com.microsoft.azure.sdk.iot.device.twin.DeviceOperations;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class Mqtt
implements MqttCallback {
    private static final Logger log = LoggerFactory.getLogger(Mqtt.class);
    private static final int CONNECTION_TIMEOUT = 60000;
    private static final int DISCONNECTION_TIMEOUT = 60000;
    private static final int QOS = 1;
    private static final int MAX_SUBSCRIBE_ACK_WAIT_TIME = 15000;
    static final int MAX_IN_FLIGHT_COUNT = 65000;
    private MqttAsyncClient mqttAsyncClient;
    private final MqttConnectOptions connectOptions;
    private final MqttMessageListener messageListener;
    private final Map<Integer, Message> unacknowledgedSentMessages;
    final Object receivedMessagesLock;
    final Queue<Pair<String, MqttMessage>> receivedMessages;
    static final char MESSAGE_PROPERTY_SEPARATOR = '&';
    private static final String MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED = "%24";
    private static final char MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_DECODED = '$';
    static final char MESSAGE_PROPERTY_KEY_VALUE_SEPARATOR = '=';
    private static final int PROPERTY_KEY_INDEX = 0;
    private static final int PROPERTY_VALUE_INDEX = 1;
    private static final String ABSOLUTE_EXPIRY_TIME = "$.exp";
    static final String CORRELATION_ID = "$.cid";
    static final String MESSAGE_ID = "$.mid";
    static final String TO = "$.to";
    static final String USER_ID = "$.uid";
    static final String OUTPUT_NAME = "$.on";
    static final String CONNECTION_DEVICE_ID = "$.cdid";
    static final String CONNECTION_MODULE_ID = "$.cmid";
    static final String CONTENT_TYPE = "$.ct";
    static final String CONTENT_ENCODING = "$.ce";
    static final String CREATION_TIME_UTC = "$.ctime";
    static final String MQTT_SECURITY_INTERFACE_ID = "$.ifid";
    static final String COMPONENT_ID = "$.sub";
    private static final String IOTHUB_ACK = "iothub-ack";
    private static final String INPUTS_PATH_STRING = "inputs";
    private static final String MODULES_PATH_STRING = "modules";
    private IotHubListener listener;
    private String connectionId;
    private final String deviceId;
    private final Object stateLock;
    private final Object unacknowledgedSentMessagesLock;

    Mqtt(MqttMessageListener messageListener, String deviceId, MqttConnectOptions connectOptions, Map<Integer, Message> unacknowledgedSentMessages, Queue<Pair<String, MqttMessage>> receivedMessages) {
        this.deviceId = deviceId;
        this.receivedMessages = receivedMessages;
        this.stateLock = new Object();
        this.receivedMessagesLock = new Object();
        this.unacknowledgedSentMessagesLock = new Object();
        this.messageListener = messageListener;
        this.connectOptions = connectOptions;
        this.unacknowledgedSentMessages = unacknowledgedSentMessages;
    }

    void updatePassword(char[] newPassword) {
        this.connectOptions.setPassword(newPassword);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void connect() throws TransportException {
        Object object = this.stateLock;
        synchronized (object) {
            try {
                if (!this.mqttAsyncClient.isConnected()) {
                    log.debug("Sending MQTT CONNECT packet...");
                    IMqttToken connectToken = this.mqttAsyncClient.connect(this.connectOptions);
                    connectToken.waitForCompletion(60000L);
                    log.debug("Sent MQTT CONNECT packet was acknowledged");
                }
            }
            catch (MqttException e) {
                log.debug("Exception encountered while sending MQTT CONNECT packet", (Throwable)e);
                this.disconnect();
                throw PahoExceptionTranslator.convertToMqttException(e, "Unable to establish MQTT connection");
            }
        }
    }

    void disconnect() {
        try {
            if (this.mqttAsyncClient.isConnected()) {
                log.debug("Sending MQTT DISCONNECT packet");
                IMqttToken disconnectToken = this.mqttAsyncClient.disconnect();
                if (disconnectToken != null) {
                    disconnectToken.waitForCompletion(60000L);
                }
                log.debug("Sent MQTT DISCONNECT packet was acknowledged");
            }
        }
        catch (MqttException e) {
            ProtocolException transportException = PahoExceptionTranslator.convertToMqttException(e, "Unable to disconnect");
            log.warn("Exception encountered while sending MQTT DISCONNECT packet. Forcefully closing the connection.", (Throwable)transportException);
        }
        finally {
            try {
                this.mqttAsyncClient.close();
            }
            catch (MqttException ex) {
                log.debug("Mqtt client was already closed, so ignoring the thrown exception", (Throwable)ex);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void publish(String publishTopic, Message message) throws TransportException {
        try {
            if (!this.mqttAsyncClient.isConnected()) {
                TransportException transportException = new TransportException("Cannot publish when mqtt client is disconnected");
                transportException.setRetryable(true);
                throw transportException;
            }
            if (message == null || publishTopic == null || publishTopic.length() == 0 || message.getBytes() == null) {
                throw new IllegalArgumentException("Cannot publish on null or empty publish topic");
            }
            byte[] payload = message.getBytes();
            while (this.mqttAsyncClient.getPendingDeliveryTokens().length >= 65000) {
                Thread.sleep(10L);
                if (this.mqttAsyncClient.isConnected()) continue;
                TransportException transportException = new TransportException("Cannot publish when mqtt client is holding 65000 tokens and is disconnected");
                transportException.setRetryable(true);
                throw transportException;
            }
            MqttMessage mqttMessage = payload.length == 0 ? new MqttMessage() : new MqttMessage(payload);
            mqttMessage.setQos(1);
            Object object = this.unacknowledgedSentMessagesLock;
            synchronized (object) {
                log.trace("Publishing message ({}) to MQTT topic {}", (Object)message, (Object)publishTopic);
                IMqttDeliveryToken publishToken = this.mqttAsyncClient.publish(publishTopic, mqttMessage);
                this.unacknowledgedSentMessages.put(publishToken.getMessageId(), message);
                log.trace("Message published to MQTT topic {}. Mqtt message id {} added to list of messages to wait for acknowledgement ({})", new Object[]{publishTopic, publishToken.getMessageId(), message});
            }
        }
        catch (MqttException e) {
            log.warn("Message could not be published to MQTT topic {} ({})", new Object[]{publishTopic, message, e});
            throw PahoExceptionTranslator.convertToMqttException(e, "Unable to publish message on topic : " + publishTopic);
        }
        catch (InterruptedException e) {
            throw new TransportException("Interrupted, Unable to publish message on topic : " + publishTopic, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void subscribe(String topic) throws TransportException {
        Object object = this.stateLock;
        synchronized (object) {
            try {
                if (topic == null) {
                    throw new IllegalArgumentException("Topic cannot be null");
                }
                if (!this.mqttAsyncClient.isConnected()) {
                    TransportException transportException = new TransportException("Cannot subscribe when mqtt client is disconnected");
                    transportException.setRetryable(true);
                    throw transportException;
                }
                log.debug("Sending MQTT SUBSCRIBE packet for topic {}", (Object)topic);
                IMqttToken subToken = this.mqttAsyncClient.subscribe(topic, 1);
                subToken.waitForCompletion(15000L);
                log.debug("Sent MQTT SUBSCRIBE packet for topic {} was acknowledged", (Object)topic);
            }
            catch (MqttException e) {
                log.warn("Encountered exception while sending MQTT SUBSCRIBE packet for topic {}", (Object)topic, (Object)e);
                throw PahoExceptionTranslator.convertToMqttException(e, "Unable to subscribe to topic :" + topic);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public IotHubTransportMessage receive() {
        Object object = this.receivedMessagesLock;
        synchronized (object) {
            Pair<String, MqttMessage> messagePair = this.receivedMessages.peek();
            if (messagePair != null) {
                String topic = (String)messagePair.getKey();
                if (topic != null) {
                    MqttMessage message = (MqttMessage)messagePair.getValue();
                    if (message != null) {
                        this.receivedMessages.poll();
                        return this.constructMessage(message, topic);
                    }
                    log.warn("Data cannot be null when topic is non-null");
                } else {
                    return null;
                }
            }
            return null;
        }
    }

    public void connectionLost(Throwable throwable) {
        log.warn("Mqtt connection lost", throwable);
        this.disconnect();
        if (this.listener != null) {
            TransportException transportException;
            if (throwable instanceof MqttException) {
                transportException = PahoExceptionTranslator.convertToMqttException((MqttException)throwable, "Mqtt connection lost");
                log.trace("Mqtt connection loss interpreted into transport exception", throwable);
            } else {
                transportException = new TransportException(throwable);
            }
            this.listener.onConnectionLost(transportException, this.connectionId);
        }
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        log.trace("Mqtt message arrived on topic {} with mqtt message id {}", (Object)topic, (Object)mqttMessage.getId());
        this.receivedMessages.add((Pair<String, MqttMessage>)new MutablePair((Object)topic, (Object)mqttMessage));
        if (this.messageListener != null) {
            this.messageListener.onMessageArrived(mqttMessage.getId());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        DeviceOperations deviceOperation;
        Message deliveredMessage = null;
        log.trace("Mqtt message with message id {} was acknowledged by service", (Object)iMqttDeliveryToken.getMessageId());
        Object object = this.unacknowledgedSentMessagesLock;
        synchronized (object) {
            if (this.unacknowledgedSentMessages.containsKey(iMqttDeliveryToken.getMessageId())) {
                log.trace("Mqtt message with message id {} that was acknowledged by service was sent by this client", (Object)iMqttDeliveryToken.getMessageId());
                deliveredMessage = this.unacknowledgedSentMessages.remove(iMqttDeliveryToken.getMessageId());
            } else {
                log.warn("Mqtt message with message id {} that was acknowledged by service was not sent by this client, will be ignored", (Object)iMqttDeliveryToken.getMessageId());
            }
        }
        if (deliveredMessage instanceof IotHubTransportMessage && ((deviceOperation = ((IotHubTransportMessage)deliveredMessage).getDeviceOperationType()) == DeviceOperations.DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_REQUEST || deviceOperation == DeviceOperations.DEVICE_OPERATION_METHOD_SUBSCRIBE_REQUEST || deviceOperation == DeviceOperations.DEVICE_OPERATION_TWIN_UNSUBSCRIBE_DESIRED_PROPERTIES_REQUEST)) {
            return;
        }
        if (this.listener != null) {
            this.listener.onMessageSent(deliveredMessage, this.deviceId, null);
        } else {
            log.warn("Message sent, but no listener set");
        }
    }

    void sendMessageAcknowledgement(int messageId) throws TransportException {
        log.trace("Sending mqtt ack for received message with mqtt message id {}", (Object)messageId);
        try {
            this.mqttAsyncClient.messageArrivedComplete(messageId, 1);
        }
        catch (MqttException e) {
            throw PahoExceptionTranslator.convertToMqttException(e, "Error sending message ack");
        }
    }

    private IotHubTransportMessage constructMessage(MqttMessage mqttMessage, String topic) {
        IotHubTransportMessage message = new IotHubTransportMessage(mqttMessage.getPayload(), MessageType.DEVICE_TELEMETRY);
        message.setQualityOfService(mqttMessage.getQos());
        int propertiesStringStartingIndex = topic.indexOf(MESSAGE_SYSTEM_PROPERTY_IDENTIFIER_ENCODED);
        if (propertiesStringStartingIndex != -1) {
            String propertiesString = topic.substring(propertiesStringStartingIndex);
            this.assignPropertiesToMessage(message, propertiesString);
            String routeString = topic.substring(0, propertiesStringStartingIndex);
            String[] routeComponents = routeString.split("/");
            if (routeComponents.length > 2 && routeComponents[2].equals(MODULES_PATH_STRING)) {
                message.setConnectionModuleId(routeComponents[3]);
            }
            if (routeComponents.length > 4 && routeComponents[4].equals(INPUTS_PATH_STRING)) {
                message.setInputName(routeComponents[5]);
            }
        }
        return message;
    }

    private void assignPropertiesToMessage(Message message, String propertiesString) throws IllegalStateException, IllegalArgumentException {
        for (String propertyString : propertiesString.split(String.valueOf('&'))) {
            if (propertyString.contains("=")) {
                String key = propertyString.split("=")[0];
                String value = propertyString.split("=")[1];
                try {
                    key = URLDecoder.decode(key, StandardCharsets.UTF_8.name());
                    value = URLDecoder.decode(value, StandardCharsets.UTF_8.name());
                }
                catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
                switch (key) {
                    case "$.to": 
                    case "iothub-ack": 
                    case "$.uid": 
                    case "$.exp": {
                        break;
                    }
                    case "$.mid": {
                        message.setMessageId(value);
                        break;
                    }
                    case "$.cid": {
                        message.setCorrelationId(value);
                        break;
                    }
                    case "$.on": {
                        message.setOutputName(value);
                        break;
                    }
                    case "$.ce": {
                        message.setContentEncoding(value);
                        break;
                    }
                    case "$.ct": {
                        message.setContentType(value);
                        break;
                    }
                    default: {
                        message.setProperty(key, value);
                        break;
                    }
                }
                continue;
            }
            throw new IllegalArgumentException("Unexpected property string provided. Expected '=' symbol between key and value of the property in string: " + propertyString);
        }
    }

    void setListener(IotHubListener listener) {
        this.listener = listener;
    }

    void setConnectionId(String connectionId) {
        this.connectionId = connectionId;
    }

    void setMqttAsyncClient(MqttAsyncClient mqttAsyncClient) {
        this.mqttAsyncClient = mqttAsyncClient;
    }
}

