/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

public class MqttPahoMessageDrivenChannelAdapter
extends AbstractMqttMessageDrivenChannelAdapter
implements MqttCallback {
    private static final int DEFAULT_COMPLETION_TIMEOUT = 30000;
    private final MqttPahoClientFactory clientFactory;
    private volatile MqttAsyncClient client;
    private volatile ScheduledFuture<?> reconnectFuture;
    private volatile boolean connected;
    private volatile int completionTimeout = 30000;

    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super(url, clientId, topic);
        this.clientFactory = clientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super(null, clientId, topic);
        this.clientFactory = clientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String ... topic) {
        this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
    }

    public void setCompletionTimeout(int completionTimeout) {
        this.completionTimeout = completionTimeout;
    }

    protected void doStart() {
        super.doStart();
        try {
            this.connectAndSubscribe();
        }
        catch (Exception e) {
            this.logger.error((Object)"Exception while connecting and subscribing, retrying", (Throwable)e);
            this.scheduleReconnect();
        }
    }

    protected void doStop() {
        this.cancelReconnect();
        super.doStop();
        if (this.client != null) {
            try {
                this.client.unsubscribe(this.getTopic()).waitForCompletion((long)this.completionTimeout);
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while unsubscribing", (Throwable)e);
            }
            try {
                this.client.disconnect().waitForCompletion((long)this.completionTimeout);
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while disconnecting", (Throwable)e);
            }
            try {
                this.client.close();
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while closing", (Throwable)e);
            }
            this.connected = false;
            this.client = null;
        }
    }

    @Override
    public void addTopic(String topic, int qos) {
        this.topicLock.lock();
        try {
            super.addTopic(topic, qos);
            if (this.client != null && this.client.isConnected()) {
                this.client.subscribe(topic, qos).waitForCompletion((long)this.completionTimeout);
            }
        }
        catch (MqttException e) {
            super.removeTopic(topic);
            throw new MessagingException("Failed to subscribe to topic " + topic, (Throwable)e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    @Override
    public void removeTopic(String ... topic) {
        this.topicLock.lock();
        try {
            if (this.client != null && this.client.isConnected()) {
                this.client.unsubscribe(topic).waitForCompletion((long)this.completionTimeout);
            }
            super.removeTopic(topic);
        }
        catch (MqttException e) {
            throw new MessagingException("Failed to unsubscribe from topic " + Arrays.asList(topic), (Throwable)e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    private void connectAndSubscribe() throws MqttException {
        MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
        Assert.state((this.getUrl() != null || connectionOptions.getServerURIs() != null ? 1 : 0) != 0, (String)"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
        this.client = this.clientFactory.getAsyncClientInstance(this.getUrl(), this.getClientId());
        this.client.setCallback((MqttCallback)this);
        this.topicLock.lock();
        try {
            this.client.connect(connectionOptions).waitForCompletion((long)this.completionTimeout);
            this.client.subscribe(this.getTopic(), this.getQos()).waitForCompletion((long)this.completionTimeout);
        }
        catch (MqttException e) {
            this.logger.error((Object)("Error connecting or subscribing to " + Arrays.asList(this.getTopic())), (Throwable)e);
            this.client.disconnect().waitForCompletion((long)this.completionTimeout);
            throw e;
        }
        finally {
            this.topicLock.unlock();
        }
        if (this.client.isConnected()) {
            this.connected = true;
            if (this.reconnectFuture != null) {
                this.cancelReconnect();
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Connected and subscribed to " + Arrays.asList(this.getTopic())));
            }
        }
    }

    private synchronized void cancelReconnect() {
        if (this.reconnectFuture != null) {
            this.reconnectFuture.cancel(false);
            this.reconnectFuture = null;
        }
    }

    private void scheduleReconnect() {
        try {
            this.reconnectFuture = this.getTaskScheduler().scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    try {
                        if (MqttPahoMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                            MqttPahoMessageDrivenChannelAdapter.this.logger.debug((Object)"Attempting reconnect");
                        }
                        if (!MqttPahoMessageDrivenChannelAdapter.this.connected) {
                            MqttPahoMessageDrivenChannelAdapter.this.connectAndSubscribe();
                        }
                    }
                    catch (MqttException e) {
                        MqttPahoMessageDrivenChannelAdapter.this.logger.error((Object)"Exception while connecting and subscribing", (Throwable)e);
                    }
                }
            }, 10000L);
        }
        catch (Exception e) {
            this.logger.error((Object)"Failed to schedule reconnect", (Throwable)e);
        }
    }

    public void connectionLost(Throwable cause) {
        this.logger.error((Object)("Lost connection:" + cause.getMessage() + "; retrying..."));
        this.connected = false;
        this.scheduleReconnect();
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        Message<?> message = this.getConverter().toMessage(topic, mqttMessage);
        try {
            this.sendMessage(message);
        }
        catch (RuntimeException e) {
            this.logger.error((Object)("Unhandled exception for " + message.toString()), (Throwable)e);
            throw e;
        }
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}

