/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.crt.mqtt;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.crt.AsyncCallback;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.mqtt.MqttClient;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;

public class MqttClientConnection
extends CrtResource {
    private final MqttClient client;
    private MqttClientConnectionEvents userConnectionCallbacks;
    private AsyncCallback connectAck;

    public MqttClientConnection(MqttClient mqttClient) throws MqttException {
        this(mqttClient, null);
    }

    public MqttClientConnection(MqttClient mqttClient, MqttClientConnectionEvents callbacks) throws MqttException {
        if (mqttClient == null) {
            throw new MqttException("MqttClient must not be null");
        }
        try {
            this.acquireNativeHandle(MqttClientConnection.mqttClientConnectionNew(mqttClient.getNativeHandle(), this));
            this.addReferenceTo(mqttClient);
            this.client = mqttClient;
            this.userConnectionCallbacks = callbacks;
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
        }
    }

    @Override
    protected void releaseNativeHandle() {
        MqttClientConnection.mqttClientConnectionDestroy(this.getNativeHandle());
    }

    @Override
    protected boolean canReleaseReferencesImmediately() {
        return false;
    }

    public void setLogin(String user, String pass) throws MqttException {
        try {
            MqttClientConnection.mqttClientConnectionSetLogin(this.getNativeHandle(), user, pass);
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("Failed to set login: " + ex.getMessage());
        }
    }

    private void onConnectionComplete(int errorCode, boolean sessionPresent) {
        if (this.connectAck != null) {
            if (errorCode == 0) {
                this.connectAck.onSuccess(sessionPresent);
            } else {
                this.connectAck.onFailure(new MqttException(errorCode));
            }
            this.connectAck = null;
        }
    }

    private void onConnectionInterrupted(int errorCode, AsyncCallback callback) {
        if (callback != null) {
            if (errorCode == 0) {
                callback.onSuccess();
            } else {
                callback.onFailure(new MqttException(errorCode));
            }
        }
        if (this.userConnectionCallbacks != null) {
            this.userConnectionCallbacks.onConnectionInterrupted(errorCode);
        }
    }

    private void onConnectionResumed(boolean sessionPresent) {
        if (this.userConnectionCallbacks != null) {
            this.userConnectionCallbacks.onConnectionResumed(sessionPresent);
        }
    }

    public CompletableFuture<Boolean> connect(String clientId, String endpoint, int port) {
        return this.connect(clientId, endpoint, port, null, true, 0, 0);
    }

    public CompletableFuture<Boolean> connect(String clientId, String endpoint, int port, SocketOptions socketOptions, boolean cleanSession, int keepAliveMs, int pingTimeoutMs) throws MqttException {
        TlsContext tls = this.client.getTlsContext();
        short pingTimeout = (short)Math.max(0, Math.min(pingTimeoutMs, Short.MAX_VALUE));
        if (port > Short.MAX_VALUE || port <= 0) {
            throw new MqttException("Port must be betweeen 0 and 32767");
        }
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        this.connectAck = AsyncCallback.wrapFuture(future, null);
        try {
            MqttClientConnection.mqttClientConnectionConnect(this.getNativeHandle(), endpoint, (short)port, socketOptions != null ? socketOptions.getNativeHandle() : 0L, tls != null ? tls.getNativeHandle() : 0L, clientId, cleanSession, keepAliveMs, pingTimeout);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
        }
        return future;
    }

    public CompletableFuture<Void> disconnect() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (this.isNull()) {
            future.complete(null);
            return future;
        }
        AsyncCallback disconnectAck = AsyncCallback.wrapFuture(future, null);
        MqttClientConnection.mqttClientConnectionDisconnect(this.getNativeHandle(), disconnectAck);
        return future;
    }

    public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during subscribe"));
            return future;
        }
        AsyncCallback subAck = AsyncCallback.wrapFuture(future, 0);
        try {
            short packetId = MqttClientConnection.mqttClientConnectionSubscribe(this.getNativeHandle(), topic, qos.getValue(), handler != null ? new MessageHandler(handler) : null, subAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos) {
        return this.subscribe(topic, qos, null);
    }

    public void onMessage(Consumer<MqttMessage> handler) {
        MqttClientConnection.mqttClientConnectionOnMessage(this.getNativeHandle(), new MessageHandler(handler));
    }

    public CompletableFuture<Integer> unsubscribe(String topic) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during unsubscribe"));
            return future;
        }
        AsyncCallback unsubAck = AsyncCallback.wrapFuture(future, 0);
        short packetId = MqttClientConnection.mqttClientConnectionUnsubscribe(this.getNativeHandle(), topic, unsubAck);
        return future.thenApply(unused -> packetId);
    }

    public CompletableFuture<Integer> publish(MqttMessage message, QualityOfService qos, boolean retain) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during publish"));
        }
        AsyncCallback pubAck = AsyncCallback.wrapFuture(future, 0);
        try {
            short packetId = MqttClientConnection.mqttClientConnectionPublish(this.getNativeHandle(), message.getTopic(), qos.getValue(), retain, message.getPayload(), pubAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    public void setWill(MqttMessage message, QualityOfService qos, boolean retain) throws MqttException {
        if (this.isNull()) {
            throw new MqttException("Invalid connection during setWill");
        }
        try {
            MqttClientConnection.mqttClientConnectionSetWill(this.getNativeHandle(), message.getTopic(), qos.getValue(), retain, message.getPayload());
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("AWS CRT exception: " + ex.toString());
        }
    }

    private static native long mqttClientConnectionNew(long var0, MqttClientConnection var2) throws CrtRuntimeException;

    private static native void mqttClientConnectionDestroy(long var0);

    private static native void mqttClientConnectionConnect(long var0, String var2, short var3, long var4, long var6, String var8, boolean var9, int var10, short var11) throws CrtRuntimeException;

    private static native void mqttClientConnectionDisconnect(long var0, AsyncCallback var2);

    private static native short mqttClientConnectionSubscribe(long var0, String var2, int var3, MessageHandler var4, AsyncCallback var5) throws CrtRuntimeException;

    private static native void mqttClientConnectionOnMessage(long var0, MessageHandler var2) throws CrtRuntimeException;

    private static native short mqttClientConnectionUnsubscribe(long var0, String var2, AsyncCallback var3);

    private static native short mqttClientConnectionPublish(long var0, String var2, int var3, boolean var4, byte[] var5, AsyncCallback var6) throws CrtRuntimeException;

    private static native boolean mqttClientConnectionSetWill(long var0, String var2, int var3, boolean var4, byte[] var5) throws CrtRuntimeException;

    private static native void mqttClientConnectionSetLogin(long var0, String var2, String var3) throws CrtRuntimeException;

    private class MessageHandler {
        Consumer<MqttMessage> callback;

        private MessageHandler(Consumer<MqttMessage> callback) {
            this.callback = callback;
        }

        void deliver(String topic, byte[] payload) {
            this.callback.accept(new MqttMessage(topic, payload));
        }
    }
}

