/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.deps.ws.impl.WebSocketImpl;
import com.microsoft.azure.sdk.iot.device.CustomLogger;
import com.microsoft.azure.sdk.iot.device.DeviceClientConfig;
import com.microsoft.azure.sdk.iot.device.DeviceTwin.DeviceOperations;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageType;
import com.microsoft.azure.sdk.iot.device.auth.IotHubSasTokenAuthenticationProvider;
import com.microsoft.azure.sdk.iot.device.exceptions.TransportException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.ReconnectionNotifier;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpSasTokenRenewalHandler;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsConvertFromProtonReturnValue;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsConvertToProtonReturnValue;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsExceptionTranslator;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsMessage;
import com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionManager;
import com.microsoft.azure.sdk.iot.device.transport.amqps.IotHubReactor;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Received;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.engine.impl.TransportLayer;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;

public final class AmqpsIotHubConnection
extends BaseHandler
implements IotHubTransportConnection {
    // Timing constants. The decompiled methods below use the literal values directly
    // (90000L ms in open()/close(), 30L s in executorServicesCleanup(), 50 ms in
    // onReactorInit()/onTimerTask()); presumably the compiler inlined these constants.
    private static final int MAX_WAIT_TO_OPEN_CLOSE_CONNECTION = 90000;
    private static final int MAX_WAIT_TO_TERMINATE_EXECUTOR = 30;
    private static final int SEND_MESSAGES_PERIOD_MILLIS = 50;
    // Connection status as tracked by this handler; written by open()/close() and by several reactor callbacks.
    private IotHubConnectionStatus state;
    // Values passed to WebSocketImpl.configure() in onConnectionBound().
    private static final String WEB_SOCKET_PATH = "/$iothub/websocket";
    private static final String WEB_SOCKET_SUB_PROTOCOL = "AMQPWSB10";
    // Ports used for plain AMQPS and for AMQPS over WebSockets (see constructor and onReactorInit()).
    private static final int AMQP_PORT = 5671;
    private static final int AMQP_WEB_SOCKET_PORT = 443;
    // Proton connection captured in onConnectionInit().
    private Connection connection;
    // "<host>:<port>" string built in the constructor.
    private String hostName;
    private final Boolean useWebSockets;
    private DeviceClientConfig deviceClientConfig;
    // Messages handed to the session manager, keyed by the delivery tag returned for them;
    // entries are removed in onDelivery() when the service reports an outcome.
    private final Map<Integer, Message> inProgressMessages = new ConcurrentHashMap<Integer, Message>();
    // Received messages awaiting an acknowledgement via sendMessageResult(), mapped to their AMQP message.
    private final Map<Message, AmqpsMessage> sendAckMessages = new ConcurrentHashMap<Message, AmqpsMessage>();
    private IotHubListener listener;
    // Set by scheduleReconnection() so the listener is notified of a disconnect only once per open().
    private boolean reconnectionScheduled = false;
    // Executor that runs the reactor (created in openAsync(), torn down in executorServicesCleanup()).
    private ExecutorService executorService;
    // Stored by open(); not otherwise used in the visible part of this class.
    private ScheduledExecutorService scheduledExecutorService;
    private AmqpSasTokenRenewalHandler sasTokenRenewalHandler;
    // Latches that open() and close() block on; both are recreated on every open().
    private CountDownLatch openLatch;
    private CountDownLatch closeLatch;
    // Random UUID assigned on every open().
    public String connectionId;
    private Reactor reactor;
    private CustomLogger logger;
    // Last exception recorded by a reactor callback; open() rethrows it if set.
    private TransportException savedException;
    public AmqpsSessionManager amqpsSessionManager;
    // Application-property keys read from CBS authentication responses in messageReceivedFromServer().
    private static final String APPLICATION_PROPERTY_STATUS_CODE = "status-code";
    private static final String APPLICATION_PROPERTY_STATUS_DESCRIPTION = "status-description";
    // Cap on messages handled per processMessagesToSend() call (the method uses the literal 1000).
    private static final int MAX_MESSAGES_TO_SEND_PER_CALLBACK = 1000;
    // Read by openLinks() to decide which extra links to open; where they are set is not visible here.
    private boolean methodSubscribed;
    private boolean twinSubscribed;
    // Outgoing messages queued by sendMessage(Message) and drained by processMessagesToSend().
    private Queue<Message> messagesToSend = new ConcurrentLinkedQueue<Message>();
    // NOTE(review): not referenced in the visible part of this class — confirm whether it is still needed.
    int queueCount = 0;

    public AmqpsIotHubConnection(DeviceClientConfig config) {
        if (config == null) {
            throw new IllegalArgumentException("The DeviceClientConfig cannot be null.");
        }
        if (config.getIotHubHostname() == null || config.getIotHubHostname().length() == 0) {
            throw new IllegalArgumentException("hostName cannot be null or empty.");
        }
        if (config.getDeviceId() == null || config.getDeviceId().length() == 0) {
            throw new IllegalArgumentException("deviceID cannot be null or empty.");
        }
        if (config.getIotHubName() == null || config.getIotHubName().length() == 0) {
            throw new IllegalArgumentException("hubName cannot be null or empty.");
        }
        this.deviceClientConfig = config;
        this.useWebSockets = this.deviceClientConfig.isUseWebsocket();
        this.hostName = this.useWebSockets != false ? String.format("%s:%d", this.chooseHostname(), 443) : String.format("%s:%d", this.chooseHostname(), 5671);
        this.logger = new CustomLogger(this.getClass());
        this.add((Handler)new Handshaker());
        this.add((Handler)new FlowController());
        this.state = IotHubConnectionStatus.DISCONNECTED;
        this.logger.LogInfo("AmqpsIotHubConnection object is created successfully using port %s in %s method ", this.useWebSockets != false ? 443 : 5671, this.logger.getMethodName());
    }

    /**
     * Registers an additional device session with the session manager.
     *
     * @param deviceClientConfig configuration of the device to add; a null value is ignored
     * @throws TransportException if the session manager fails to add the session
     */
    public void addDeviceOperationSession(DeviceClientConfig deviceClientConfig) throws TransportException {
        if (deviceClientConfig == null) {
            return;
        }
        this.amqpsSessionManager.addDeviceOperationSession(deviceClientConfig);
    }

    /**
     * Opens the connection: resets per-connection state, starts the reactor, waits for the open
     * latch, then authenticates and opens the worker links.
     *
     * <p>The connect sequence only runs when the current state is
     * {@link IotHubConnectionStatus#DISCONNECTED}; in every case the listener is told the
     * connection is established and the state becomes {@link IotHubConnectionStatus#CONNECTED}.
     *
     * @param deviceClientConfigs queue of device configurations; when it holds more than one entry
     *        the first is removed and discarded and every remaining entry is registered as an
     *        additional device session
     * @param scheduledExecutorService executor stored on this connection
     * @throws TransportException if a reactor callback recorded an exception, if the authentication
     *         or worker links are not open after the wait (retryable), or if the calling thread is
     *         interrupted while waiting
     */
    @Override
    public void open(Queue<DeviceClientConfig> deviceClientConfigs, ScheduledExecutorService scheduledExecutorService) throws TransportException {
        // Fresh per-connection state: a new id, new latches and a new session manager on every open.
        this.reconnectionScheduled = false;
        this.connectionId = UUID.randomUUID().toString();
        this.scheduledExecutorService = scheduledExecutorService;
        this.closeLatch = new CountDownLatch(1);
        this.openLatch = new CountDownLatch(1);
        this.savedException = null;
        this.amqpsSessionManager = new AmqpsSessionManager(this.deviceClientConfig);
        this.sasTokenRenewalHandler = new AmqpSasTokenRenewalHandler(this.amqpsSessionManager, this.deviceClientConfig);
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        if (this.state == IotHubConnectionStatus.DISCONNECTED) {
            if (deviceClientConfigs.size() > 1) {
                // The head of the queue is dropped; only the remaining configs get their own session.
                deviceClientConfigs.remove();
                while (!deviceClientConfigs.isEmpty()) {
                    this.addDeviceOperationSession(deviceClientConfigs.remove());
                }
            }
            try {
                this.openAsync();
                // The await result is intentionally not checked: a timeout surfaces below as
                // "links not open" once authenticate()/openLinks() have had their chance.
                this.openLatch.await(90000L, TimeUnit.MILLISECONDS);
                this.authenticate();
                this.openLinks();
                if (this.savedException != null) {
                    throw this.savedException;
                }
                if (!this.amqpsSessionManager.isAuthenticationOpened().booleanValue()) {
                    this.close();
                    TransportException transportException = new TransportException("Timed out waiting for authentication links to open from service");
                    transportException.setRetryable(true);
                    throw transportException;
                }
                if (!this.amqpsSessionManager.areAllLinksOpen()) {
                    this.close();
                    TransportException transportException = new TransportException("Timed out waiting for worker links to open from service");
                    transportException.setRetryable(true);
                    throw transportException;
                }
            } catch (InterruptedException e) {
                // Cleanup blocks on awaitTermination, so it must run before the interrupt flag is restored.
                this.executorServicesCleanup();
                this.logger.LogError(e);
                // Restore the interrupt status for callers and keep the original exception as the cause.
                Thread.currentThread().interrupt();
                throw new TransportException("Waited too long for the connection to open.", e);
            }
        }
        this.listener.onConnectionEstablished(this.connectionId);
        this.state = IotHubConnectionStatus.CONNECTED;
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Lazily creates the reactor and the single-thread executor, then submits a runner for the
     * reactor to that executor. Returns without waiting for the connection to come up.
     *
     * @throws TransportException if the reactor cannot be created
     */
    private void openAsync() throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        if (this.reactor == null) {
            this.reactor = this.createReactor();
        }
        if (this.executorService == null) {
            this.executorService = Executors.newFixedThreadPool(1);
        }

        ReactorRunner runner = new ReactorRunner(new IotHubReactor(this.reactor), this.listener, this.connectionId);
        this.executorService.submit(runner);

        this.logger.LogInfo("Reactor is assigned to executor service, method name is %s ", this.logger.getMethodName());
    }

    /**
     * Asks the session manager to authenticate, but only when it reports that authentication is
     * opened; otherwise this is a no-op apart from logging.
     *
     * @throws TransportException if the session manager fails to authenticate
     */
    public void authenticate() throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        boolean authenticationOpened = this.amqpsSessionManager.isAuthenticationOpened().booleanValue();
        if (authenticationOpened) {
            this.amqpsSessionManager.authenticate();
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Opens the device operation links once authentication is opened: telemetry always, methods
     * and twin only when the corresponding subscription flag is set.
     *
     * @throws TransportException if the session manager fails to open a link
     */
    public void openLinks() throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        boolean authenticationOpened = this.amqpsSessionManager.isAuthenticationOpened().booleanValue();
        if (authenticationOpened) {
            this.amqpsSessionManager.openDeviceOperationLinks(MessageType.DEVICE_TELEMETRY);

            if (this.methodSubscribed) {
                this.amqpsSessionManager.openDeviceOperationLinks(MessageType.DEVICE_METHODS);
            }

            if (this.twinSubscribed) {
                this.amqpsSessionManager.openDeviceOperationLinks(MessageType.DEVICE_TWIN);
            }
        }
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Closes the connection: requests an asynchronous shutdown, waits up to 90 seconds for the
     * reactor to signal completion, shuts the executor down and marks the connection
     * {@link IotHubConnectionStatus#DISCONNECTED}.
     *
     * <p>Safe to call before {@link #open}: with no close latch to wait on, the wait is skipped.
     *
     * @throws TransportException if the calling thread is interrupted while waiting (the interrupt
     *         status is restored before throwing), or if executor cleanup fails
     */
    @Override
    public void close() throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.closeAsync();

        // The latch is only created by open(); closeAsync() already tolerates an un-opened
        // connection, so do the same here instead of failing with a NullPointerException.
        final CountDownLatch latch = this.closeLatch;
        if (latch != null) {
            try {
                latch.await(90000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                this.logger.LogError(e);
                // Preserve the interrupt status so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new TransportException("Waited too long for the connection to close.", e);
            }
        }

        this.executorServicesCleanup();
        this.state = IotHubConnectionStatus.DISCONNECTED;
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Shuts down the reactor executor, if there is one, and clears the field.
     *
     * <p>Performs an orderly {@code shutdown()}, waits up to 30 seconds, escalates to
     * {@code shutdownNow()} and waits another 30 seconds. If the calling thread is interrupted
     * while waiting, the executor is cancelled immediately, the interrupt status is restored and a
     * {@link TransportException} is thrown.
     *
     * @throws TransportException if the calling thread is interrupted while waiting for termination
     */
    private void executorServicesCleanup() throws TransportException {
        if (this.executorService == null) {
            return;
        }

        this.logger.LogInfo("Shutdown of executor service has started, method name is %s ", this.logger.getMethodName());
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                // Orderly shutdown did not finish in time: cancel running tasks and wait once more.
                this.executorService.shutdownNow();
                if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                    this.logger.LogInfo("Pool did not terminate", new Object[0]);
                }
            }
            this.executorService = null;
        } catch (InterruptedException e) {
            this.logger.LogError(e);
            this.executorService.shutdownNow();
            this.executorService = null;
            // Preserve the interrupt status, as in the ExecutorService shutdown idiom.
            Thread.currentThread().interrupt();
            throw new TransportException("Waited too long for the connection to close.", e);
        }
        this.logger.LogInfo("Shutdown of executor service completed, method name is %s ", this.logger.getMethodName());
    }

    /**
     * Starts tearing the connection down without waiting: closes the session manager, then the
     * Proton connection, then stops the reactor. Each step is skipped when its object is absent.
     */
    private void closeAsync() {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        // Order matters: sessions first, then the connection, then the reactor driving them.
        final AmqpsSessionManager sessionManager = this.amqpsSessionManager;
        if (sessionManager != null) {
            sessionManager.closeNow();
        }

        final Connection protonConnection = this.connection;
        if (protonConnection != null) {
            protonConnection.close();
        }

        final Reactor runningReactor = this.reactor;
        if (runningReactor != null) {
            runningReactor.stop();
        }

        // Logged unconditionally, even when there was no reactor to stop.
        this.logger.LogInfo("Proton reactor has been stopped, method name is %s ", this.logger.getMethodName());
        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Hands an already-converted message to the session manager for sending.
     *
     * @param protonReturnValue converted message plus its message type; may be null
     * @param deviceId id of the device the message belongs to
     * @return the delivery tag reported by the session manager, or -1 when the connection is
     *         disconnected or {@code protonReturnValue} is null
     * @throws TransportException if the session manager fails to send
     */
    private Integer sendMessage(AmqpsConvertToProtonReturnValue protonReturnValue, String deviceId) throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        boolean cannotSend = this.state == IotHubConnectionStatus.DISCONNECTED || protonReturnValue == null;
        if (cannotSend) {
            return -1;
        }

        MessageImpl payload = protonReturnValue.getMessageImpl();
        MessageType type = protonReturnValue.getMessageType();
        return this.amqpsSessionManager.sendMessage((org.apache.qpid.proton.message.Message)payload, type, deviceId);
    }

    /**
     * Reactor start-up callback: requests the connection to the chosen host, schedules the first
     * send-queue timer tick (50 ms) and, for SAS-token authentication, schedules the token
     * renewal handler.
     *
     * @param event the reactor-init event
     */
    public void onReactorInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        Reactor eventReactor = event.getReactor();

        // 443 for AMQPS over WebSockets, 5671 for plain AMQPS.
        int port = this.useWebSockets.booleanValue() ? 443 : 5671;
        eventReactor.connectionToHost(this.chooseHostname(), port, this);

        // First timer tick; onTimerTask() re-arms it every time it fires.
        eventReactor.schedule(50, this);

        boolean usesSasToken = this.deviceClientConfig.getAuthenticationProvider() instanceof IotHubSasTokenAuthenticationProvider;
        if (usesSasToken) {
            int renewalDelayMillis = this.deviceClientConfig.getSasTokenAuthentication().getMillisecondsBeforeProactiveRenewal();
            eventReactor.schedule(renewalDelayMillis, (Handler)this.sasTokenRenewalHandler);
        }

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Reactor shutdown callback: releases any thread blocked in {@code close()} or {@code open()}
     * and drops the reactor reference so the next open creates a new one.
     *
     * @param event the reactor-final event (unused)
     */
    public void onReactorFinal(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        // Release close() first, then open(), matching the order waiters are unblocked in today.
        this.closeLatch.countDown();
        this.openLatch.countDown();

        this.reactor = null;

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Connection-init callback: stores the Proton connection, sets its hostname, opens it and
     * lets the session manager initialise. The open latch is released when the session manager
     * returns true; a {@link TransportException} is recorded in {@code savedException} instead of
     * being thrown.
     *
     * @param event the connection-init event
     */
    public void onConnectionInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        this.connection = event.getConnection();
        this.connection.setHostname(this.hostName);
        this.connection.open();

        try {
            boolean releaseOpenLatch = this.amqpsSessionManager.onConnectionInit(this.connection).booleanValue();
            if (releaseOpenLatch) {
                this.openLatch.countDown();
            }
        } catch (TransportException e) {
            // Not thrown from the reactor thread; open() picks it up from savedException.
            this.savedException = e;
            this.logger.LogError(e);
        }

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Connection-bound callback: when a transport is present, adds the WebSocket layer (if
     * WebSockets are enabled) and passes the transport to the session manager. A
     * {@link TransportException} from the session manager is recorded in {@code savedException}.
     *
     * @param event the connection-bound event
     */
    public void onConnectionBound(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        Transport boundTransport = event.getConnection().getTransport();
        if (boundTransport != null) {
            if (this.useWebSockets.booleanValue()) {
                WebSocketImpl webSocketLayer = new WebSocketImpl();
                webSocketLayer.configure(this.hostName, WEB_SOCKET_PATH, 0, WEB_SOCKET_SUB_PROTOCOL, null, null);
                ((TransportInternal)boundTransport).addTransportLayer((TransportLayer)webSocketLayer);
            }

            try {
                this.amqpsSessionManager.onConnectionBound(boundTransport);
            } catch (TransportException e) {
                // Surfaced later by open() rather than thrown on the reactor thread.
                this.savedException = e;
                this.logger.LogError(this.savedException);
            }
        }

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Connection-unbound callback: marks the connection as disconnected.
     *
     * @param event the connection-unbound event (unused)
     */
    public void onConnectionUnbound(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        this.state = IotHubConnectionStatus.DISCONNECTED;

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Delivery callback.
     *
     * <p>On a sender link, walks the unsettled deliveries that carry a remote state and reports
     * each outcome to the listener: accepted messages complete without error, rejected ones carry
     * the translated error, and modified/released/received ones carry a retryable exception.
     * Deliveries on a link whose source address is {@code $cbs} are freed without notifying the
     * listener. A delivery tag that matches no in-progress message is reported through
     * {@code onMessageReceived} with an error.
     *
     * <p>On a receiver link, fetches the message from the session manager and processes it; any
     * {@link TransportException} is forwarded to the listener.
     *
     * @param event the delivery event
     */
    public void onDelivery(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        final Link link = event.getLink();

        if (link instanceof Sender) {
            this.logger.LogInfo("Reading the delivery event in Sender link, method name is %s ", this.logger.getMethodName());

            // Each pass frees the current delivery and then moves on to the link's new head.
            for (Delivery current = event.getDelivery();
                 current != null && !current.isSettled() && current.getRemoteState() != null;
                 current = link.head()) {
                final DeliveryState outcome = current.getRemoteState();
                final int tag = Integer.valueOf(new String(current.getTag()));

                // NOTE(review): this logs the connection state, not the delivery's remote state.
                this.logger.LogInfo("Is state of remote Delivery COMPLETE ? %s, method name is %s ", new Object[]{this.state, this.logger.getMethodName()});
                this.logger.LogInfo("Inform listener that a message has been sent to IoT Hub along with remote state, method name is %s ", this.logger.getMethodName());

                final boolean onCbsLink = link.getSource().getAddress().equalsIgnoreCase("$cbs");
                if (!onCbsLink) {
                    if (!this.inProgressMessages.containsKey(tag)) {
                        this.listener.onMessageReceived(null, new TransportException("Received response from service about a message that this client did not send"));
                    } else if (outcome instanceof Accepted) {
                        this.listener.onMessageSent(this.inProgressMessages.remove(tag), null);
                    } else if (outcome instanceof Rejected) {
                        final ErrorCondition rejection = ((Rejected)outcome).getError();
                        final TransportException failure;
                        if (rejection == null || rejection.getCondition() == null) {
                            failure = new TransportException("IotHub rejected the message");
                        } else {
                            String code = rejection.getCondition().toString();
                            String description = rejection.getDescription() == null ? "" : rejection.getDescription();
                            failure = AmqpsExceptionTranslator.convertToAmqpException(code, description);
                        }
                        this.listener.onMessageSent(this.inProgressMessages.remove(tag), failure);
                    } else if (outcome instanceof Modified || outcome instanceof Released || outcome instanceof Received) {
                        TransportException redeliver = new TransportException("IotHub responded to message with Modified, Received or Released; message needs to be re-delivered");
                        redeliver.setRetryable(true);
                        this.listener.onMessageSent(this.inProgressMessages.remove(tag), redeliver);
                    }
                    // Any other remote state leaves the message in inProgressMessages untouched.
                }

                current.free();
            }
        } else if (link instanceof Receiver) {
            try {
                AmqpsMessage incoming = this.amqpsSessionManager.getMessageFromReceiverLink(link.getName());
                if (incoming != null) {
                    this.messageReceivedFromServer(incoming);
                }
            } catch (TransportException e) {
                this.listener.onMessageReceived(null, e);
            }
        }

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Link-init callback: forwards the link to the session manager. A {@link TransportException}
     * is recorded in {@code savedException} and logged rather than thrown.
     *
     * @param event the link-init event
     */
    public void onLinkInit(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        final Link initializedLink = event.getLink();
        try {
            this.amqpsSessionManager.onLinkInit(initializedLink);
        } catch (TransportException e) {
            this.savedException = e;
            this.logger.LogError(this.savedException);
        }

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Link-flow callback: forwards the event to the session manager, then tries to send queued
     * messages.
     *
     * @param event the link-flow event
     */
    public void onLinkFlow(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        this.amqpsSessionManager.onLinkFlow(event);
        // Take the opportunity to drain the outgoing queue right away.
        this.processMessagesToSend();

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Drains the outgoing queue: converts and sends up to 1000 messages per call, stopping early
     * when a send reports a negative delivery tag.
     *
     * <p>A sent message is tracked in {@code inProgressMessages} under its delivery tag. A message
     * whose send returned -1 goes back onto the queue. When conversion or sending throws, the
     * message is re-queued only if the exception is retryable, and the method returns
     * immediately. A message that was polled but not processed is put back on the queue.
     */
    private void processMessagesToSend() {
        int tagOfLastSend = 0;
        Message pending = this.messagesToSend.poll();

        for (int attempts = 0; pending != null && attempts < 1000 && tagOfLastSend >= 0; attempts++) {
            final AmqpsConvertToProtonReturnValue converted;
            try {
                converted = this.convertToProton(pending);
            } catch (TransportException e) {
                if (e.isRetryable()) {
                    this.logger.LogError("Encountered exception while converting message to proton message, retrying", e);
                    this.messagesToSend.add(pending);
                } else {
                    this.logger.LogError("Encountered non-retryable exception while converting message to proton message, not retryable so discarding message", e);
                }
                return;
            }

            if (converted == null) {
                this.logger.LogError("No handler found for message conversion! Abandoning message", new Object[0]);
                return;
            }

            try {
                tagOfLastSend = this.sendMessage(converted, pending.getConnectionDeviceId());
            } catch (TransportException e) {
                if (e.isRetryable()) {
                    this.logger.LogError("Encountered exception while sending amqp message, retrying", e);
                    this.messagesToSend.add(pending);
                } else {
                    this.logger.LogError("Encountered non-retryable exception while sending amqp message, abandoning message", e);
                }
                return;
            }

            if (tagOfLastSend == -1) {
                // Not sent: back onto the queue for a later attempt.
                this.messagesToSend.add(pending);
            } else {
                this.inProgressMessages.put(tagOfLastSend, pending);
            }

            pending = this.messagesToSend.poll();
        }

        // The loop may stop with a message already polled; do not lose it.
        if (pending != null) {
            this.messagesToSend.add(pending);
        }
    }

    /**
     * Link-remote-open callback: forwards the event to the session manager and releases the open
     * latch when the session manager returns true.
     *
     * @param event the link-remote-open event
     */
    public void onLinkRemoteOpen(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        boolean releaseOpenLatch = this.amqpsSessionManager.onLinkRemoteOpen(event);
        if (releaseOpenLatch) {
            this.openLatch.countDown();
        }

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Link-remote-close callback: marks the connection disconnected, records the exception
     * derived from the event and schedules a reconnection with it.
     *
     * @param event the link-remote-close event
     */
    public void onLinkRemoteClose(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        this.state = IotHubConnectionStatus.DISCONNECTED;

        TransportException cause = this.getTransportExceptionFromEvent(event);
        this.savedException = cause;
        this.scheduleReconnection(cause);

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Transport-error callback: marks the connection disconnected, records the exception derived
     * from the event and schedules a reconnection with it.
     *
     * @param event the transport-error event
     */
    public void onTransportError(Event event) {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());

        this.state = IotHubConnectionStatus.DISCONNECTED;

        TransportException cause = this.getTransportExceptionFromEvent(event);
        this.savedException = cause;
        this.scheduleReconnection(cause);

        this.logger.LogDebug("Exited from method %s", this.logger.getMethodName());
    }

    /**
     * Converts an outgoing message to its Proton form by delegating to the session manager.
     *
     * @param message the message to convert
     * @return whatever the session manager returns for this message
     * @throws TransportException if the session manager fails to convert the message
     */
    protected AmqpsConvertToProtonReturnValue convertToProton(Message message) throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        AmqpsConvertToProtonReturnValue converted = this.amqpsSessionManager.convertToProton(message);
        return converted;
    }

    /**
     * Converts a received AMQP message by delegating to the session manager.
     *
     * @param amqpsMessage the received message
     * @param deviceClientConfig configuration passed through to the session manager
     * @return whatever the session manager returns for this message
     * @throws TransportException if the session manager fails to convert the message
     */
    protected AmqpsConvertFromProtonReturnValue convertFromProton(AmqpsMessage amqpsMessage, DeviceClientConfig deviceClientConfig) throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        AmqpsConvertFromProtonReturnValue converted = this.amqpsSessionManager.convertFromProton(amqpsMessage, deviceClientConfig);
        return converted;
    }

    /**
     * Creates a Proton reactor with this object as its handler. For X.509 certificate
     * authentication the reactor is created with SASL disabled by default.
     *
     * @return the new reactor
     * @throws TransportException if Proton fails to create the reactor
     */
    private Reactor createReactor() throws TransportException {
        try {
            boolean usesX509 = this.deviceClientConfig.getAuthenticationType() == DeviceClientConfig.AuthType.X509_CERTIFICATE;
            if (!usesX509) {
                return Proton.reactor(this);
            }

            ReactorOptions saslDisabled = new ReactorOptions();
            saslDisabled.setEnableSaslByDefault(false);
            return Proton.reactor(saslDisabled, this);
        } catch (IOException e) {
            throw new TransportException("Could not create Proton reactor", e);
        }
    }

    /**
     * Timer callback: tries to send queued messages, then re-arms the timer for another 50 ms.
     *
     * @param event the timer event
     */
    public void onTimerTask(Event event) {
        this.processMessagesToSend();

        // Re-arm so the queue keeps being polled periodically.
        Reactor timerReactor = event.getReactor();
        timerReactor.schedule(50, this);
    }

    /**
     * Handles a message received from the service.
     *
     * <p>When the session manager cannot convert the message and it is a CBS authentication
     * message, its {@code status-code} application property is inspected: any status other than
     * OK / OK_EMPTY, or an unparsable status, is recorded in {@code savedException}. Other
     * unconvertible messages are only logged.
     *
     * <p>Convertible messages are wrapped as an {@link IotHubTransportMessage} if necessary, given
     * the callback and context from the conversion, remembered for acknowledgement and handed to
     * the listener.
     *
     * @param amqpsMessage the received AMQP message
     * @throws TransportException if conversion fails or the converted message has no callback
     */
    private void messageReceivedFromServer(AmqpsMessage amqpsMessage) throws TransportException {
        this.logger.LogDebug("Entered in method %s", this.logger.getMethodName());
        this.logger.LogInfo("All the listeners are informed that a message has been received, method name is %s ", this.logger.getMethodName());
        AmqpsConvertFromProtonReturnValue amqpsHandleMessageReturnValue = this.convertFromProton(amqpsMessage, amqpsMessage.getDeviceClientConfig());
        if (amqpsHandleMessageReturnValue == null) {
            if (amqpsMessage.getAmqpsMessageType() != MessageType.CBS_AUTHENTICATION) {
                this.logger.LogError("No handler found for received message, method name is %s ", this.logger.getMethodName());
                return;
            }
            if (amqpsMessage.getApplicationProperties() == null || amqpsMessage.getApplicationProperties().getValue() == null) {
                return;
            }
            Map<?, ?> properties = amqpsMessage.getApplicationProperties().getValue();
            if (!properties.containsKey(APPLICATION_PROPERTY_STATUS_CODE)) {
                return;
            }
            try {
                int statusCode = Integer.valueOf(properties.get(APPLICATION_PROPERTY_STATUS_CODE).toString());
                IotHubStatusCode iotHubStatusCode = IotHubStatusCode.getIotHubStatusCode(statusCode);
                if (iotHubStatusCode != IotHubStatusCode.OK && iotHubStatusCode != IotHubStatusCode.OK_EMPTY) {
                    String statusDescription = "";
                    if (properties.containsKey(APPLICATION_PROPERTY_STATUS_DESCRIPTION)) {
                        statusDescription = (String)properties.get(APPLICATION_PROPERTY_STATUS_DESCRIPTION);
                    }
                    // Not thrown here: open() rethrows savedException on the caller's thread.
                    this.savedException = IotHubStatusCode.getConnectionStatusException(iotHubStatusCode, statusDescription);
                    this.logger.LogError(this.savedException);
                }
            } catch (NumberFormatException nfe) {
                // Keep the parse failure as the cause so the offending value is not lost.
                this.savedException = new TransportException("Encountered message from service with invalid status code value", nfe);
                this.logger.LogInfo("status code received from service could not be parsed to integer, method name is %s ", this.logger.getMethodName());
            }
            return;
        }
        if (amqpsHandleMessageReturnValue.getMessageCallback() == null) {
            this.logger.LogError("Callback is not defined therefore response to IoT Hub cannot be generated. All received messages will be removed from receive message queue, method name is %s ", this.logger.getMethodName());
            throw new TransportException("callback is not defined");
        }
        Message message = amqpsHandleMessageReturnValue.getMessage();
        IotHubTransportMessage transportMessage;
        if (message instanceof IotHubTransportMessage) {
            transportMessage = (IotHubTransportMessage)message;
        } else {
            // Re-wrap a plain Message so the callback and its context can be attached.
            transportMessage = new IotHubTransportMessage(message.getBytes(), message.getMessageType(), message.getMessageId(), message.getCorrelationId(), message.getProperties());
            transportMessage.setIotHubConnectionString(message.getIotHubConnectionString());
        }
        transportMessage.setMessageCallback(amqpsHandleMessageReturnValue.getMessageCallback());
        transportMessage.setMessageCallbackContext(amqpsHandleMessageReturnValue.getMessageContext());
        // Remembered so sendMessageResult() can acknowledge the underlying AMQP message later.
        this.sendAckMessages.put(transportMessage, amqpsMessage);
        this.listener.onMessageReceived(transportMessage, null);
    }

    /**
     * Sets the listener that receives connection and message callbacks.
     *
     * @param listener the listener to notify; must not be null
     * @throws IllegalArgumentException if {@code listener} is null
     */
    @Override
    public void setListener(IotHubListener listener) throws IllegalArgumentException {
        if (listener != null) {
            this.listener = listener;
            return;
        }
        throw new IllegalArgumentException("listener cannot be null");
    }

    /**
     * Queues a message for sending unless the subscription-change handler reports that it handled
     * the message itself. The actual send happens later in {@code processMessagesToSend()}.
     *
     * @param message the message to send
     * @return always {@link IotHubStatusCode#OK}
     * @throws TransportException if the subscription-change handler fails
     */
    @Override
    public IotHubStatusCode sendMessage(Message message) throws TransportException {
        boolean handledAsSubscriptionChange = this.subscriptionChangeHandler(message);
        if (!handledAsSubscriptionChange) {
            this.messagesToSend.add(message);
        }
        return IotHubStatusCode.OK;
    }

    /**
     * Acknowledges a previously received message with the given result.
     *
     * @param message the message handed to the listener earlier
     * @param result how the message should be acknowledged
     * @return true if the acknowledgement was issued; false if the connection is not connected,
     *         the message is not awaiting acknowledgement, or {@code result} is not a known value
     */
    @Override
    public boolean sendMessageResult(Message message, IotHubMessageResult result) {
        if (this.state != IotHubConnectionStatus.CONNECTED) {
            return false;
        }

        // One lookup instead of containsKey() followed by get(): the map is concurrent, so an
        // entry removed between the two calls would have produced a null AmqpsMessage. The map
        // never stores null values, so null here simply means "not awaiting acknowledgement".
        final AmqpsMessage amqpsMessage = this.sendAckMessages.get(message);
        if (amqpsMessage == null) {
            return false;
        }

        switch (result) {
            case ABANDON: {
                amqpsMessage.acknowledge(AmqpsMessage.ACK_TYPE.ABANDON);
                break;
            }
            case REJECT: {
                amqpsMessage.acknowledge(AmqpsMessage.ACK_TYPE.REJECT);
                break;
            }
            case COMPLETE: {
                amqpsMessage.acknowledge(AmqpsMessage.ACK_TYPE.COMPLETE);
                break;
            }
            default: {
                // Unknown result: leave the message registered so it can still be acknowledged.
                this.logger.LogError("Invalid IoT Hub message result (%s), method name is %s ", result.name(), this.logger.getMethodName());
                return false;
            }
        }

        this.sendAckMessages.remove(message);
        return true;
    }

    /**
     * Returns the connection id currently held by this connection object.
     *
     * @return the value of the {@code connectionId} field
     */
    @Override
    public String getConnectionId() {
        return connectionId;
    }

    /**
     * Reports a disconnect to the listener unless one has already been reported.
     *
     * <p>Does nothing when {@code reconnectionScheduled} is already set. Otherwise sets the flag
     * and hands the throwable, the listener and the connection id to
     * {@code ReconnectionNotifier.notifyDisconnectAsync}.
     *
     * @param throwable the cause passed on to the notifier
     */
    private void scheduleReconnection(Throwable throwable) {
        if (this.reconnectionScheduled) {
            return;
        }
        this.reconnectionScheduled = true;
        ReconnectionNotifier.notifyDisconnectAsync(throwable, this.listener, this.connectionId);
    }

    /**
     * Picks the hostname from the device client configuration.
     *
     * @return the gateway hostname when it is non-null and non-empty, otherwise the IoT Hub
     *         hostname
     */
    private String chooseHostname() {
        String gateway = this.deviceClientConfig.getGatewayHostname();
        boolean gatewayConfigured = gateway != null && !gateway.isEmpty();
        return gatewayConfigured ? gateway : this.deviceClientConfig.getIotHubHostname();
    }

    /**
     * Returns the string form of the inner condition of an error condition.
     *
     * @param condition the error condition to read; may be {@code null}
     * @return {@code condition.getCondition().toString()}, or {@code null} when either
     *         {@code condition} or its inner condition is {@code null}
     */
    private String getErrorCondition(ErrorCondition condition) {
        if (condition == null || condition.getCondition() == null) {
            return null;
        }
        return condition.getCondition().toString();
    }

    /**
     * Returns the description of an error condition.
     *
     * @param condition the error condition to read; may be {@code null}
     * @return the condition's description, or {@code null} when {@code condition} is {@code null}
     */
    private String getErrorDescription(ErrorCondition condition) {
        return condition == null ? null : condition.getDescription();
    }

    /**
     * Selects which error condition of an endpoint to use.
     *
     * @param endpoint the endpoint to inspect; must not be {@code null}
     * @return the result of {@code endpoint.getCondition()} when it is non-null and has a
     *         non-null inner condition, otherwise the result of
     *         {@code endpoint.getRemoteCondition()}
     */
    private ErrorCondition getErrorConditionFromEndpoint(Endpoint endpoint) {
        if (endpoint.getCondition() == null || endpoint.getCondition().getCondition() == null) {
            return endpoint.getRemoteCondition();
        }
        return endpoint.getCondition();
    }

    /**
     * Builds a transport exception from the first endpoint that carries a usable error condition.
     *
     * <p>Endpoints are checked in the order given. An endpoint is skipped when it is
     * {@code null}, when {@code getErrorConditionFromEndpoint} yields {@code null} for it, or
     * when the yielded error condition has a {@code null} inner condition.
     *
     * @param endpoints the endpoints to inspect, in priority order
     * @return the exception produced by {@code AmqpsExceptionTranslator.convertToAmqpException}
     *         for the first usable error condition, or {@code null} when no endpoint has one
     */
    private TransportException getTransportExceptionFromProtonEndpoints(Endpoint ... endpoints) {
        for (Endpoint candidate : endpoints) {
            if (candidate == null) {
                continue;
            }
            ErrorCondition found = this.getErrorConditionFromEndpoint(candidate);
            if (found == null || found.getCondition() == null) {
                continue;
            }
            return AmqpsExceptionTranslator.convertToAmqpException(found.getCondition().toString(), found.getDescription());
        }
        return null;
    }

    /**
     * Derives a transport exception from the endpoints attached to an event.
     *
     * <p>The event's sender, receiver, connection, transport, session and link are passed, in
     * that order, to {@code getTransportExceptionFromProtonEndpoints}.
     *
     * @param event the event whose endpoints are inspected
     * @return the exception derived from the endpoints, or, when none could be derived, a new
     *         {@link TransportException} marked retryable
     */
    private TransportException getTransportExceptionFromEvent(Event event) {
        TransportException translated = this.getTransportExceptionFromProtonEndpoints(
                event.getSender(),
                event.getReceiver(),
                event.getConnection(),
                event.getTransport(),
                event.getSession(),
                event.getLink());
        if (translated != null) {
            return translated;
        }
        // No endpoint carried an error condition: fall back to a generic, retryable exception.
        TransportException fallback = new TransportException("Unknown transport exception occurred");
        fallback.setRetryable(true);
        return fallback;
    }

    /**
     * Reacts to device-method and device-twin messages that affect subscriptions.
     *
     * <ul>
     *   <li>DEVICE_METHODS with a method-subscribe operation: opens the device-method links,
     *       sets {@code methodSubscribed} and reports the message as handled.</li>
     *   <li>DEVICE_TWIN with any operation other than unsubscribe-desired-properties: opens the
     *       device-twin links and sets {@code twinSubscribed}; the message is reported as
     *       handled only when the operation is subscribe-desired-properties.</li>
     *   <li>Anything else, including a {@code null} message type: no action.</li>
     * </ul>
     *
     * @param message the outgoing message to inspect; for the two message types above it is
     *                cast to {@link IotHubTransportMessage}
     * @return {@code true} when the message was handled here, {@code false} otherwise
     * @throws TransportException declared for the session-manager calls made here
     */
    private boolean subscriptionChangeHandler(Message message) throws TransportException {
        if (message.getMessageType() == null) {
            return false;
        }
        switch (message.getMessageType()) {
            case DEVICE_METHODS: {
                IotHubTransportMessage methodMessage = (IotHubTransportMessage)message;
                if (methodMessage.getDeviceOperationType() == DeviceOperations.DEVICE_OPERATION_METHOD_SUBSCRIBE_REQUEST) {
                    this.amqpsSessionManager.openDeviceOperationLinks(MessageType.DEVICE_METHODS);
                    this.methodSubscribed = true;
                    return true;
                }
                return false;
            }
            case DEVICE_TWIN: {
                IotHubTransportMessage twinMessage = (IotHubTransportMessage)message;
                if (twinMessage.getDeviceOperationType() == DeviceOperations.DEVICE_OPERATION_TWIN_UNSUBSCRIBE_DESIRED_PROPERTIES_REQUEST) {
                    // Unsubscribe requests open no links and are left unhandled.
                    return false;
                }
                this.amqpsSessionManager.openDeviceOperationLinks(MessageType.DEVICE_TWIN);
                this.twinSubscribed = true;
                return twinMessage.getDeviceOperationType() == DeviceOperations.DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_REQUEST;
            }
            default: {
                return false;
            }
        }
    }

    /**
     * Task that runs an {@link IotHubReactor} and reports a {@code HandlerException} thrown by
     * it to the listener as a lost connection.
     */
    private class ReactorRunner
    implements Callable {
        /** Name given to whichever thread executes {@link #call()}. */
        private static final String THREAD_NAME = "azure-iot-sdk-ReactorRunner";
        private final IotHubReactor iotHubReactor;
        private final IotHubListener listener;
        private String connectionId;

        /**
         * @param iotHubReactor the reactor to run
         * @param listener the listener notified when the reactor throws a HandlerException
         * @param connectionId the connection id passed to the listener with that notification
         */
        ReactorRunner(IotHubReactor iotHubReactor, IotHubListener listener, String connectionId) {
            this.iotHubReactor = iotHubReactor;
            this.connectionId = connectionId;
            this.listener = listener;
        }

        /**
         * Renames the current thread and runs the reactor.
         *
         * <p>A {@code HandlerException} from the reactor is wrapped in a
         * {@link TransportException} and passed to {@code listener.onConnectionLost} together
         * with the connection id.
         *
         * @return always {@code null}
         */
        @Override
        public Object call() {
            Thread.currentThread().setName(THREAD_NAME);
            try {
                this.iotHubReactor.run();
            }
            catch (HandlerException reactorFailure) {
                this.listener.onConnectionLost(new TransportException(reactorFailure), this.connectionId);
            }
            return null;
        }
    }
}

