/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jspecify.annotations.Nullable;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

public class MqttPahoMessageDrivenChannelAdapter
extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectOptions>
implements MqttCallbackExtended,
MqttPahoComponent {
    /** Taken by {@code connect()}, {@code doStop()} and {@code connectionLost()} while they read or change client state. */
    private final Lock lock = new ReentrantLock();
    /** Supplies the connect options and, when no client manager is configured, the Paho client instance. */
    private final MqttPahoClientFactory clientFactory;
    /** Current Paho client; assigned only inside {@code connect()}, so it is {@code null} until a connect attempt gets that far. */
    private volatile IMqttAsyncClient client;
    /** When {@code true}, {@code doStart()} subscribes right after connecting; set by {@code connectComplete} and {@code doStop()}. */
    private volatile boolean readyToSubscribeOnStart;

    /**
     * Create an adapter that uses a new {@link DefaultMqttPahoClientFactory}.
     * Delegates to the constructor that takes an explicit client factory.
     * @param url the server URL.
     * @param clientId the client id.
     * @param topic the initial topic(s).
     */
    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String ... topic) {
        this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
    }

    /**
     * Create an adapter with an explicit server URL and client factory.
     * @param url the server URL, passed to the superclass constructor.
     * @param clientId the client id, passed to the superclass constructor.
     * @param clientFactory the factory used for connect options and client creation.
     * @param topic the initial topic(s).
     */
    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super(url, clientId, topic);
        this.clientFactory = clientFactory;
    }

    /**
     * Create an adapter without a server URL (a {@code null} URL is passed to the superclass).
     * In this mode {@code connect()} requires the factory's connect options to
     * provide server URIs, otherwise it fails its state assertion.
     * @param clientId the client id.
     * @param clientFactory the factory used for connect options and client creation.
     * @param topic the initial topic(s).
     */
    public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super((String)null, clientId, topic);
        this.clientFactory = clientFactory;
    }

    /**
     * Create an adapter that works with the client owned by the given client manager.
     * A {@link DefaultMqttPahoClientFactory} is still created, but only to carry
     * the manager's connect options for the code paths that read them.
     * @param clientManager the manager providing the client and the connect options.
     * @param topic the initial topic(s).
     */
    public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttConnectOptions> clientManager, String ... topic) {
        super(clientManager, topic);
        DefaultMqttPahoClientFactory optionsHolder = new DefaultMqttPahoClientFactory();
        MqttConnectOptions managerOptions = (MqttConnectOptions)clientManager.getConnectionInfo();
        optionsHolder.setConnectionOptions(managerOptions);
        this.clientFactory = optionsHolder;
    }

    /**
     * Return the connect options of the client factory.
     * When those options carry no server URIs and this adapter has a URL, the
     * result is instead a copy (made by {@code MqttUtils.cloneConnectOptions})
     * whose server URIs contain only that URL.
     * @return the effective connect options.
     */
    @Override
    public MqttConnectOptions getConnectionInfo() {
        MqttConnectOptions factoryOptions = this.clientFactory.getConnectionOptions();
        if (factoryOptions.getServerURIs() != null) {
            return factoryOptions;
        }
        String adapterUrl = getUrl();
        if (adapterUrl == null) {
            return factoryOptions;
        }
        MqttConnectOptions withUrl = MqttUtils.cloneConnectOptions(factoryOptions);
        withUrl.setServerURIs(new String[]{adapterUrl});
        return withUrl;
    }

    /**
     * Run the superclass initialization and, if no converter has been set,
     * install a {@link DefaultPahoMessageConverter} wired with this adapter's bean factory.
     */
    @Override
    protected void onInit() {
        super.onInit();
        if (getConverter() != null) {
            // A converter was configured explicitly; keep it.
            return;
        }
        DefaultPahoMessageConverter defaultConverter = new DefaultPahoMessageConverter();
        defaultConverter.setBeanFactory(getBeanFactory());
        setConverter(defaultConverter);
    }

    /**
     * Connect the client and, when a subscription is pending, subscribe.
     * <p>Any failure is logged and published as a {@link MqttConnectionFailedEvent};
     * it is not rethrown. If automatic reconnect is enabled and a client exists,
     * a reconnect is requested first.
     */
    protected void doStart() {
        try {
            this.connect();
            if (this.readyToSubscribeOnStart) {
                this.subscribe();
            }
        }
        catch (Exception ex) {
            // connect() can fail before it ever assigns the client (e.g. a failed state
            // assertion); without this guard the resulting NullPointerException would
            // replace 'ex' and the failure would be neither logged nor published.
            IMqttAsyncClient failedClient = this.client;
            if (this.getConnectionInfo().isAutomaticReconnect() && failedClient != null) {
                try {
                    failedClient.reconnect();
                }
                catch (MqttException re) {
                    this.logger.error((Throwable)re, (CharSequence)"MQTT client failed to connect. Never happens.");
                }
            }
            this.logger.error((Throwable)ex, (CharSequence)"Exception while connecting");
            this.getApplicationEventPublisher().publishEvent((ApplicationEvent)new MqttConnectionFailedEvent(this, ex));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resolve the client to use while holding {@code lock}.
     * <p>With a client manager, its client is adopted as is (it must already exist).
     * Otherwise a new client is obtained from the factory, this adapter is registered
     * as its callback, the connection is awaited up to the completion timeout, and
     * the manual-acks flag is applied.
     * @throws MqttException if creating or connecting the client fails.
     */
    private void connect() throws MqttException {
        this.lock.lock();
        try {
            MqttConnectOptions options = this.clientFactory.getConnectionOptions();
            ClientManager<IMqttAsyncClient, MqttConnectOptions> manager = getClientManager();
            if (manager != null) {
                IMqttAsyncClient managedClient = (IMqttAsyncClient)manager.getClient();
                Assert.state(managedClient != null, "The 'client' must not be null, consider to start the 'clientManager'.");
                this.client = managedClient;
                return;
            }
            boolean serverKnown = getUrl() != null || options.getServerURIs() != null;
            Assert.state(serverKnown, "If no 'url' provided, connectionOptions.getServerURIs() must not be null");
            this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
            this.client.setCallback(this);
            this.client.connect(options).waitForCompletion(getCompletionTimeout());
            this.client.setManualAcks(isManualAcks());
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Stop consuming: unsubscribe when the session is clean and, unless the client
     * belongs to a client manager, forcibly disconnect it.
     * <p>{@code readyToSubscribeOnStart} is set again only after a successful
     * unsubscribe on a clean session, so that the next start re-subscribes.
     * MQTT failures are logged, never rethrown.
     */
    protected void doStop() {
        this.lock.lock();
        try {
            this.readyToSubscribeOnStart = false;
            // The client is only assigned by connect(); if that never succeeded there is
            // nothing to unsubscribe or disconnect, and dereferencing it would throw a
            // NullPointerException out of stop.
            IMqttAsyncClient currentClient = this.client;
            if (currentClient == null) {
                return;
            }
            try {
                if (this.clientFactory.getConnectionOptions().isCleanSession()) {
                    currentClient.unsubscribe(this.getTopic());
                    this.readyToSubscribeOnStart = true;
                }
            }
            catch (MqttException ex1) {
                this.logger.error((Throwable)ex1, (CharSequence)"Exception while unsubscribing");
            }
            if (this.getClientManager() != null) {
                // The connection is owned by the client manager; leave it alone.
                return;
            }
            try {
                currentClient.disconnectForcibly(this.getQuiescentTimeout(), this.getDisconnectCompletionTimeout());
                if (this.getConnectionInfo().isAutomaticReconnect()) {
                    MqttUtils.stopClientReconnectCycle(currentClient);
                }
            }
            catch (MqttException ex) {
                this.logger.error((Throwable)ex, (CharSequence)"Exception while disconnecting");
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Run the superclass destruction and close the client when this adapter owns it
     * (no client manager configured). A close failure is logged, not rethrown.
     */
    @Override
    public void destroy() {
        super.destroy();
        IMqttAsyncClient ownedClient = this.client;
        // The client stays null when the adapter was never connected; closing it
        // unconditionally would throw a NullPointerException during shutdown.
        if (this.getClientManager() != null || ownedClient == null) {
            return;
        }
        try {
            ownedClient.close();
        }
        catch (MqttException e) {
            this.logger.error((Throwable)e, (CharSequence)"Could not close client");
        }
    }

    /**
     * Register a topic and, if the client is currently connected, subscribe to it
     * right away, waiting up to the completion timeout.
     * @param topic the topic to add.
     * @param qos the requested QoS.
     * @throws MessagingException if the subscription fails; the topic is removed
     * from the registration again before the exception is thrown.
     */
    @Override
    public void addTopic(String topic, int qos) {
        this.topicLock.lock();
        try {
            super.addTopic(topic, qos);
            boolean connected = this.client != null && this.client.isConnected();
            if (connected) {
                IMqttMessageListener listener = this::messageArrived;
                IMqttToken token = this.client.subscribe(topic, qos, listener);
                token.waitForCompletion(getCompletionTimeout());
            }
        }
        catch (MqttException ex) {
            // Roll back the registration so local state matches the broker.
            super.removeTopic(topic);
            String description = "Failed to subscribe to topic " + topic;
            throw new MessagingException(description, ex);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Unsubscribe from the given topic(s) when the client is connected, then drop
     * them from the registration.
     * @param topic the topic(s) to remove.
     * @throws MessagingException if unsubscribing fails; in that case the topics
     * stay registered.
     */
    @Override
    public void removeTopic(String ... topic) {
        this.topicLock.lock();
        try {
            boolean connected = this.client != null && this.client.isConnected();
            if (connected) {
                IMqttToken token = this.client.unsubscribe(topic);
                token.waitForCompletion(getCompletionTimeout());
            }
            super.removeTopic(topic);
        }
        catch (MqttException ex) {
            String description = "Failed to unsubscribe from topic(s) " + Arrays.toString(topic);
            throw new MessagingException(description, ex);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Subscribe the client to every registered topic, using this adapter's
     * {@code messageArrived} as the listener for each of them.
     * <p>A subscription failure is published as a {@link MqttConnectionFailedEvent}
     * and logged; it is not rethrown. Afterwards, if the client reports being
     * connected, a {@link MqttSubscribedEvent} is published.
     * <p>NOTE(review): the subscribed event is also published after a failed
     * subscription as long as the client is connected - confirm this is intended.
     */
    private void subscribe() {
        ApplicationEventPublisher applicationEventPublisher = this.getApplicationEventPublisher();
        final String[] topics;
        // Nothing may run between lock() and the try block: an exception there would
        // leave topicLock held forever.
        this.topicLock.lock();
        try {
            topics = this.getTopic();
            try {
                if (topics.length > 0) {
                    int[] requestedQos = this.getQos();
                    IMqttMessageListener listener = this::messageArrived;
                    // One listener entry per topic, all pointing at the same callback.
                    IMqttMessageListener[] listeners = Stream.of(topics).map(t -> listener).toArray(IMqttMessageListener[]::new);
                    IMqttToken subscribeToken = this.client.subscribe(topics, requestedQos, listeners);
                    subscribeToken.waitForCompletion(this.getCompletionTimeout());
                    int[] grantedQos = subscribeToken.getGrantedQos();
                    // 0x80 in the granted QoS is the broker's "subscription failed" return code.
                    if (grantedQos.length == 1 && grantedQos[0] == MqttException.REASON_CODE_SUBSCRIBE_FAILED) {
                        throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
                    }
                    this.warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
                }
            }
            catch (MqttException ex) {
                applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent(this, ex));
                this.logger.error((Throwable)ex, () -> MqttPahoMessageDrivenChannelAdapter.lambda$subscribe$2(topics));
            }
        }
        finally {
            this.topicLock.unlock();
        }
        if (this.client.isConnected()) {
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            this.logger.debug((CharSequence)message);
            applicationEventPublisher.publishEvent((ApplicationEvent)new MqttSubscribedEvent((Object)this, message));
        }
    }

    /**
     * Log a single warning when any granted QoS differs from the requested one
     * at the same index.
     * @param topics the subscribed topics (used for the log message only).
     * @param requestedQos the QoS values that were requested; drives the comparison length.
     * @param grantedQos the QoS values returned for the subscription.
     */
    private void warnInvalidQosForSubscription(String[] topics, int[] requestedQos, int[] grantedQos) {
        int index = 0;
        // Advance past every position where the grant matches the request.
        while (index < requestedQos.length && grantedQos[index] == requestedQos[index]) {
            index++;
        }
        if (index < requestedQos.length) {
            this.logger.warn(() -> "Granted QOS different to Requested QOS; topics: " + Arrays.toString(topics) + " requested: " + Arrays.toString(requestedQos) + " granted: " + Arrays.toString(grantedQos));
        }
    }

    /**
     * Callback for a lost connection. While the adapter is running, the loss is
     * logged and published as a {@link MqttConnectionFailedEvent}; otherwise the
     * pending-subscription flag is cleared.
     * @param cause the reason the connection was lost.
     */
    public void connectionLost(Throwable cause) {
        this.lock.lock();
        try {
            if (!isRunning()) {
                this.readyToSubscribeOnStart = false;
                return;
            }
            this.logger.error(() -> "Lost connection: " + cause.getMessage());
            MqttConnectionFailedEvent failure = new MqttConnectionFailedEvent(this, cause);
            getApplicationEventPublisher().publishEvent((ApplicationEvent)failure);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Convert an incoming MQTT message and send it downstream.
     * With manual acks enabled, an acknowledgment callback bound to the current
     * client is added as the {@code acknowledgmentCallback} header.
     * Nothing is sent when the conversion yields no builder.
     * @param topic the topic the message arrived on.
     * @param mqttMessage the received message.
     * @throws RuntimeException whatever sending the message throws, after logging it.
     */
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        AbstractIntegrationMessageBuilder<?> messageBuilder = toMessageBuilder(topic, mqttMessage);
        if (messageBuilder == null) {
            return;
        }
        if (isManualAcks()) {
            AcknowledgmentImpl ack = new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client);
            messageBuilder.setHeader("acknowledgmentCallback", (Object)ack);
        }
        Message<?> inbound = messageBuilder.build();
        try {
            sendMessage(inbound);
        }
        catch (RuntimeException ex) {
            this.logger.error((Throwable)ex, () -> "Unhandled exception for " + String.valueOf(inbound));
            throw ex;
        }
    }

    /**
     * Run the configured converter on the MQTT message.
     * <p>A converter exception, or a {@code null} result, is treated as a conversion
     * failure: it is first offered to {@code sendErrorMessageIfNecessary} together
     * with a message wrapping the raw MQTT message; only if that returns
     * {@code false} is it thrown.
     * @param topic the topic the message arrived on.
     * @param mqttMessage the received message.
     * @return the builder, or {@code null} when the failure was handed to
     * {@code sendErrorMessageIfNecessary}.
     * @throws MessageConversionException the original exception if it already has
     * this type, otherwise a new one wrapping it.
     */
    private AbstractIntegrationMessageBuilder<?> toMessageBuilder(String topic, MqttMessage mqttMessage) {
        AbstractIntegrationMessageBuilder<?> converted = null;
        Exception failure = null;
        try {
            converted = getConverter().toMessageBuilder(topic, mqttMessage);
        }
        catch (Exception ex) {
            failure = ex;
        }
        if (failure == null) {
            if (converted != null) {
                return converted;
            }
            failure = new IllegalStateException("'MqttMessageConverter' returned 'null'");
        }
        Message<MqttMessage> rawMessage = new GenericMessage<>(mqttMessage);
        if (sendErrorMessageIfNecessary(rawMessage, failure)) {
            // The failure was handed over; there is no builder to return.
            return null;
        }
        if (failure instanceof MessageConversionException) {
            throw (MessageConversionException)failure;
        }
        throw new MessageConversionException(rawMessage, "Failed to convert from MQTT Message", failure);
    }

    /**
     * Delivery-complete callback; intentionally does nothing in this adapter.
     * @param token the delivery token (ignored).
     */
    public void deliveryComplete(IMqttDeliveryToken token) {
        // No-op.
    }

    /**
     * Connect-complete callback without a server URI; forwards to
     * {@link #connectComplete(boolean, String)} using this adapter's URL.
     * @param isReconnect whether the connection is a reconnect.
     */
    @Override
    public void connectComplete(boolean isReconnect) {
        String adapterUrl = getUrl();
        connectComplete(isReconnect, adapterUrl);
    }

    /**
     * Connect-complete callback. Subscribes immediately when the adapter is active;
     * otherwise records that the next start has to subscribe.
     * Neither argument is used by this implementation.
     * @param reconnect whether the connection is a reconnect.
     * @param serverURI the server URI the connection was made to, may be {@code null}.
     */
    public void connectComplete(boolean reconnect, @Nullable String serverURI) {
        if (!isActive()) {
            this.readyToSubscribeOnStart = true;
            return;
        }
        subscribe();
    }

    /**
     * Build the log message used when subscribing to the given topics fails.
     * (Compiler-generated lambda body, referenced from {@code subscribe()}.)
     * @param topics the topics of the failed subscription.
     * @return the log message.
     */
    private static /* synthetic */ CharSequence lambda$subscribe$2(String[] topics) {
        String renderedTopics = Arrays.toString(topics);
        return "Error subscribing to ".concat(renderedTopics);
    }

    /**
     * Acknowledgment callback for a single received message: remembers the message
     * id, its QoS and the client it arrived on, and reports completion to that
     * client when acknowledged.
     */
    private static class AcknowledgmentImpl
    implements SimpleAcknowledgment {
        private final int messageId;
        private final int messageQos;
        private final IMqttAsyncClient ackClient;

        AcknowledgmentImpl(int id, int qos, IMqttAsyncClient client) {
            this.messageId = id;
            this.messageQos = qos;
            this.ackClient = client;
        }

        /**
         * Tell the client that processing of the message is complete.
         * @throws IllegalStateException if no client was captured, or wrapping the
         * {@link MqttException} raised by the client.
         */
        public void acknowledge() {
            if (this.ackClient == null) {
                throw new IllegalStateException("Client has changed");
            }
            try {
                this.ackClient.messageArrivedComplete(this.messageId, this.messageQos);
            }
            catch (MqttException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }
}

