/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.crt.mqtt;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.crt.AsyncCallback;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.mqtt.MqttClient;
import software.amazon.awssdk.crt.mqtt.MqttConnectionEvents;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;

/**
 * A single MQTT connection created from an {@link MqttClient}.
 *
 * <p>The connection owns a native handle (acquired in the constructor and released in
 * {@link #close()}) and forwards connect/disconnect/subscribe/unsubscribe/publish requests to
 * native code. Asynchronous results are reported through {@link CompletableFuture}s.
 *
 * <p>NOTE(review): the private {@code onConnection*} methods have no Java callers in this file;
 * they are presumably invoked from native code via the instance passed to
 * {@code mqttConnectionNew}. Their names and signatures must therefore stay unchanged.
 */
public class MqttConnection
extends CrtResource {
    private final MqttClient client;
    private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
    private MqttConnectionEvents userConnectionCallbacks;
    /** Pending acknowledgement for the in-flight connect request, or {@code null}. */
    private AsyncCallback connectAck;
    /** Pending acknowledgement for the in-flight disconnect request, or {@code null}. */
    private AsyncCallback disconnectAck;

    /**
     * Creates a connection without user connection-event callbacks.
     *
     * @param mqttClient the client this connection belongs to; must not be {@code null}
     * @throws MqttException if {@code mqttClient} is {@code null} or the native connection
     *                       cannot be created
     */
    public MqttConnection(MqttClient mqttClient) throws MqttException {
        this(mqttClient, null);
    }

    /**
     * Creates a connection and registers optional connection-event callbacks.
     *
     * @param mqttClient the client this connection belongs to; must not be {@code null}
     * @param callbacks  notified on connection interruption/resumption; may be {@code null}
     * @throws MqttException if {@code mqttClient} is {@code null} or the native connection
     *                       cannot be created
     */
    public MqttConnection(MqttClient mqttClient, MqttConnectionEvents callbacks) throws MqttException {
        if (mqttClient == null) {
            throw new MqttException("MqttClient must not be null");
        }
        this.client = mqttClient;
        this.userConnectionCallbacks = callbacks;
        try {
            this.acquire(MqttConnection.mqttConnectionNew(this.client.native_ptr(), this));
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("Exception during mqttConnectionNew: " + ex.getMessage());
        }
    }

    /**
     * Requests a disconnect, destroys the native connection and closes the resource.
     *
     * <p>The native teardown is skipped when the handle is already gone, so calling
     * {@code close()} more than once does not hand a released handle to native code.
     */
    @Override
    public void close() {
        if (!this.isNull()) {
            this.disconnect();
            MqttConnection.mqttConnectionDestroy(this.release());
        }
        super.close();
    }

    /**
     * @return the most recently recorded connection state
     */
    public ConnectionState getState() {
        return this.connectionState;
    }

    /**
     * Sets the user name and password passed to the native connection.
     *
     * @param user the user name
     * @param pass the password
     * @throws MqttException if the connection has no native handle or the native call fails
     */
    public void setLogin(String user, String pass) throws MqttException {
        // Same guard the other entry points use before touching the native handle.
        if (this.isNull()) {
            throw new MqttException("Invalid connection during setLogin");
        }
        try {
            MqttConnection.mqttConnectionSetLogin(this.native_ptr(), user, pass);
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("Failed to set login: " + ex.getMessage());
        }
    }

    /**
     * Records the outcome of a connect attempt and completes the pending connect future.
     *
     * @param errorCode      0 on success, otherwise an error code wrapped in {@link MqttException}
     * @param sessionPresent value handed to the pending acknowledgement on success
     */
    private void onConnectionComplete(int errorCode, boolean sessionPresent) {
        // Detach the pending ack BEFORE invoking it: completing the future can run user
        // continuations synchronously, and a continuation that calls connect() again installs
        // a fresh ack which must not be wiped out afterwards.
        AsyncCallback pending = this.connectAck;
        this.connectAck = null;
        if (errorCode == 0) {
            this.connectionState = ConnectionState.CONNECTED;
            if (pending != null) {
                pending.onSuccess(sessionPresent);
            }
        } else {
            this.connectionState = ConnectionState.DISCONNECTED;
            if (pending != null) {
                pending.onFailure(new MqttException(errorCode));
            }
        }
    }

    /**
     * Marks the connection as disconnected, completes any pending disconnect future and then
     * notifies the user callbacks, if any.
     *
     * @param errorCode 0 for a clean disconnect, otherwise an error code
     */
    private void onConnectionInterrupted(int errorCode) {
        this.connectionState = ConnectionState.DISCONNECTED;
        // Detach before invoking, for the same re-entrancy reason as in onConnectionComplete.
        AsyncCallback pending = this.disconnectAck;
        this.disconnectAck = null;
        if (pending != null) {
            if (errorCode == 0) {
                pending.onSuccess();
            } else {
                pending.onFailure(new MqttException(errorCode));
            }
        }
        if (this.userConnectionCallbacks != null) {
            this.userConnectionCallbacks.onConnectionInterrupted(errorCode);
        }
    }

    /**
     * Marks the connection as connected again and notifies the user callbacks, if any.
     *
     * @param sessionPresent value forwarded to the user callback
     */
    private void onConnectionResumed(boolean sessionPresent) {
        this.connectionState = ConnectionState.CONNECTED;
        if (this.userConnectionCallbacks != null) {
            this.userConnectionCallbacks.onConnectionResumed(sessionPresent);
        }
    }

    /**
     * Connects with no socket options, no TLS, a clean session, and 0 for both the keep-alive
     * and ping-timeout values.
     *
     * @param clientId the MQTT client id
     * @param endpoint the host to connect to
     * @param port     the port to connect to; must be in the range 1..32767
     * @return a future completed with the session-present flag, or exceptionally on failure
     */
    public CompletableFuture<Boolean> connect(String clientId, String endpoint, int port) {
        return this.connect(clientId, endpoint, port, null, null, true, 0, 0);
    }

    /**
     * Starts an asynchronous connect.
     *
     * @param clientId      the MQTT client id
     * @param endpoint      the host to connect to
     * @param port          the port to connect to; must be in the range 1..32767
     * @param socketOptions socket options, or {@code null} to pass no options to native code
     * @param tls           TLS context, or {@code null} to pass no TLS context to native code
     * @param cleanSession  forwarded to the native connect call
     * @param keepAliveMs   forwarded to the native connect call
     * @param pingTimeoutMs clamped to the range 0..32767 before being forwarded
     * @return a future completed with the session-present flag, or exceptionally on failure
     * @throws MqttException if {@code port} is out of range
     */
    public CompletableFuture<Boolean> connect(String clientId, String endpoint, int port, SocketOptions socketOptions, TlsContext tls, boolean cleanSession, int keepAliveMs, int pingTimeoutMs) throws MqttException {
        // The native signature takes shorts, so the port is range-checked and the ping
        // timeout is clamped rather than silently truncated by the casts below.
        if (port > Short.MAX_VALUE || port <= 0) {
            throw new MqttException("Port must be between 1 and 32767");
        }
        short pingTimeout = (short)Math.max(0, Math.min(pingTimeoutMs, Short.MAX_VALUE));
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during connect"));
            return future;
        }
        ConnectionState previousState = this.connectionState;
        this.connectAck = AsyncCallback.wrapFuture(future, null);
        try {
            this.connectionState = ConnectionState.CONNECTING;
            MqttConnection.mqttConnectionConnect(this.native_ptr(), endpoint, (short)port, socketOptions != null ? socketOptions.native_ptr() : 0L, tls != null ? tls.native_ptr() : 0L, clientId, cleanSession, keepAliveMs, pingTimeout);
        }
        catch (CrtRuntimeException ex) {
            // The attempt never started: drop the stale ack and do not leave the state
            // stuck at CONNECTING.
            this.connectAck = null;
            this.connectionState = previousState;
            future.completeExceptionally(ex);
        }
        return future;
    }

    /**
     * Starts an asynchronous disconnect.
     *
     * @return a future completed when the disconnect is acknowledged; already completed when
     *         the connection has no native handle
     */
    public CompletableFuture<Void> disconnect() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (this.isNull()) {
            future.complete(null);
            return future;
        }
        this.disconnectAck = AsyncCallback.wrapFuture(future, null);
        this.connectionState = ConnectionState.DISCONNECTING;
        MqttConnection.mqttConnectionDisconnect(this.native_ptr());
        return future;
    }

    /**
     * Subscribes to a topic.
     *
     * @param topic   the topic to subscribe to
     * @param qos     the requested quality of service
     * @param handler receives each message delivered for this subscription
     * @return a future completed with the packet id returned by the native subscribe call, or
     *         exceptionally on failure
     */
    public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during subscribe"));
            return future;
        }
        AsyncCallback subAck = AsyncCallback.wrapFuture(future, 0);
        try {
            // Widen to int here: a short cannot be boxed to Integer by the lambda below.
            final int packetId = MqttConnection.mqttConnectionSubscribe(this.native_ptr(), topic, qos.getValue(), new MessageHandler(topic, handler), subAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param topic the topic to unsubscribe from
     * @return a future completed with the packet id returned by the native unsubscribe call,
     *         or exceptionally if the connection has no native handle
     */
    public CompletableFuture<Integer> unsubscribe(String topic) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during unsubscribe"));
            return future;
        }
        AsyncCallback unsubAck = AsyncCallback.wrapFuture(future, 0);
        // Widen to int here: a short cannot be boxed to Integer by the lambda below.
        final int packetId = MqttConnection.mqttConnectionUnsubscribe(this.native_ptr(), topic, unsubAck);
        return future.thenApply(unused -> packetId);
    }

    /**
     * Publishes a message.
     *
     * @param message the message whose topic and payload are published
     * @param qos     the quality of service to publish with
     * @param retain  forwarded to the native publish call
     * @return a future completed with the packet id returned by the native publish call, or
     *         exceptionally on failure
     */
    public CompletableFuture<Integer> publish(MqttMessage message, QualityOfService qos, boolean retain) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during publish"));
            // Must stop here; otherwise the native call below runs on a released handle.
            return future;
        }
        AsyncCallback pubAck = AsyncCallback.wrapFuture(future, 0);
        try {
            // Widen to int here: a short cannot be boxed to Integer by the lambda below.
            final int packetId = MqttConnection.mqttConnectionPublish(this.native_ptr(), message.getTopic(), qos.getValue(), retain, message.getPayloadDirect(), pubAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    /**
     * Sets the will message passed to the native connection.
     *
     * @param message the will message (topic and payload)
     * @param qos     the quality of service for the will message
     * @param retain  forwarded to the native call
     * @throws MqttException if the connection has no native handle or the native call fails
     */
    public void setWill(MqttMessage message, QualityOfService qos, boolean retain) throws MqttException {
        if (this.isNull()) {
            throw new MqttException("Invalid connection during setWill");
        }
        try {
            MqttConnection.mqttConnectionSetWill(this.native_ptr(), message.getTopic(), qos.getValue(), retain, message.getPayloadDirect());
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("AWS CRT exception: " + ex.toString());
        }
    }

    private static native long mqttConnectionNew(long var0, MqttConnection var2) throws CrtRuntimeException;

    private static native void mqttConnectionDestroy(long var0);

    private static native void mqttConnectionConnect(long var0, String var2, short var3, long var4, long var6, String var8, boolean var9, int var10, short var11) throws CrtRuntimeException;

    private static native void mqttConnectionDisconnect(long var0);

    private static native short mqttConnectionSubscribe(long var0, String var2, int var3, MessageHandler var4, AsyncCallback var5) throws CrtRuntimeException;

    private static native short mqttConnectionUnsubscribe(long var0, String var2, AsyncCallback var3);

    private static native short mqttConnectionPublish(long var0, String var2, int var3, boolean var4, ByteBuffer var5, AsyncCallback var6) throws CrtRuntimeException;

    private static native boolean mqttConnectionSetWill(long var0, String var2, int var3, boolean var4, ByteBuffer var5) throws CrtRuntimeException;

    private static native void mqttConnectionSetLogin(long var0, String var2, String var3) throws CrtRuntimeException;

    /**
     * Pairs a subscribed topic with the user's message consumer. An instance is handed to the
     * native subscribe call.
     *
     * <p>NOTE(review): {@code deliver} has no Java callers in this file; presumably native code
     * invokes it, so its name and signature must stay unchanged.
     */
    private class MessageHandler {
        String topic;
        Consumer<MqttMessage> callback;

        private MessageHandler(String topic, Consumer<MqttMessage> callback) {
            this.callback = callback;
            this.topic = topic;
        }

        /**
         * Wraps the payload in an {@link MqttMessage} for this handler's topic and passes it
         * to the user's consumer.
         *
         * @param payload the raw message bytes
         */
        void deliver(byte[] payload) {
            this.callback.accept(new MqttMessage(this.topic, ByteBuffer.wrap(payload)));
        }
    }

    /** Connection lifecycle states tracked by {@link MqttConnection}. */
    public static enum ConnectionState {
        DISCONNECTED,
        CONNECTING,
        CONNECTED,
        DISCONNECTING;

    }
}

