/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.crt.mqtt;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.crt.AsyncCallback;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.crt.mqtt.WebsocketHandshakeTransformArgs;

public class MqttClientConnection
extends CrtResource {
    private MqttConnectionConfig config;
    private AsyncCallback connectAck;

    public MqttClientConnection(MqttConnectionConfig config) throws MqttException {
        if (config.getMqttClient() == null) {
            throw new MqttException("mqttClient must not be null");
        }
        if (config.getClientId() == null) {
            throw new MqttException("clientId must not be null");
        }
        if (config.getEndpoint() == null) {
            throw new MqttException("endpoint must not be null");
        }
        if (config.getPort() <= 0 || config.getPort() > 65535) {
            throw new MqttException("port must be a positive integer between 1 and 65535");
        }
        try {
            MqttMessage message;
            this.acquireNativeHandle(MqttClientConnection.mqttClientConnectionNew(config.getMqttClient().getNativeHandle(), this));
            if (config.getUsername() != null) {
                MqttClientConnection.mqttClientConnectionSetLogin(this.getNativeHandle(), config.getUsername(), config.getPassword());
            }
            if ((message = config.getWillMessage()) != null) {
                MqttClientConnection.mqttClientConnectionSetWill(this.getNativeHandle(), message.getTopic(), config.getWillQos().getValue(), config.getWillRetain(), message.getPayload());
            }
            if (config.getUseWebsockets()) {
                MqttClientConnection.mqttClientConnectionUseWebsockets(this.getNativeHandle());
                if (config.getWebsocketProxyOptions() != null) {
                    HttpProxyOptions options = config.getWebsocketProxyOptions();
                    TlsContext proxyTlsContext = options.getTlsContext();
                    MqttClientConnection.mqttClientConnectionSetWebsocketProxyOptions(this.getNativeHandle(), options.getHost(), options.getPort(), proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0L, options.getAuthorizationType().getValue(), options.getAuthorizationUsername(), options.getAuthorizationPassword());
                }
            }
            this.addReferenceTo(config);
            this.config = config;
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
        }
    }

    @Override
    protected void releaseNativeHandle() {
        MqttClientConnection.mqttClientConnectionDestroy(this.getNativeHandle());
    }

    @Override
    protected boolean canReleaseReferencesImmediately() {
        return false;
    }

    private void onConnectionComplete(int errorCode, boolean sessionPresent) {
        if (this.connectAck != null) {
            if (errorCode == 0) {
                this.connectAck.onSuccess(sessionPresent);
            } else {
                this.connectAck.onFailure(new MqttException(errorCode));
            }
            this.connectAck = null;
        }
    }

    private void onConnectionInterrupted(int errorCode, AsyncCallback callback) {
        MqttClientConnectionEvents callbacks;
        if (callback != null) {
            if (errorCode == 0) {
                callback.onSuccess();
            } else {
                callback.onFailure(new MqttException(errorCode));
            }
        }
        if ((callbacks = this.config.getConnectionCallbacks()) != null) {
            callbacks.onConnectionInterrupted(errorCode);
        }
    }

    private void onConnectionResumed(boolean sessionPresent) {
        MqttClientConnectionEvents callbacks = this.config.getConnectionCallbacks();
        if (callbacks != null) {
            callbacks.onConnectionResumed(sessionPresent);
        }
    }

    public CompletableFuture<Boolean> connect() throws MqttException {
        TlsContext tls = this.config.getMqttClient().getTlsContext();
        short pingTimeout = (short)Math.max(0, Math.min(this.config.getPingTimeoutMs(), Short.MAX_VALUE));
        short port = (short)this.config.getPort();
        if (port > Short.MAX_VALUE || port <= 0) {
            throw new MqttException("Port must be betweeen 0 and 32767");
        }
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        this.connectAck = AsyncCallback.wrapFuture(future, null);
        SocketOptions socketOptions = this.config.getSocketOptions();
        try {
            MqttClientConnection.mqttClientConnectionConnect(this.getNativeHandle(), this.config.getEndpoint(), port, socketOptions != null ? socketOptions.getNativeHandle() : 0L, tls != null ? tls.getNativeHandle() : 0L, this.config.getClientId(), this.config.getCleanSession(), this.config.getKeepAliveMs(), pingTimeout);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
        }
        return future;
    }

    public CompletableFuture<Void> disconnect() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (this.isNull()) {
            future.complete(null);
            return future;
        }
        AsyncCallback disconnectAck = AsyncCallback.wrapFuture(future, null);
        MqttClientConnection.mqttClientConnectionDisconnect(this.getNativeHandle(), disconnectAck);
        return future;
    }

    public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during subscribe"));
            return future;
        }
        AsyncCallback subAck = AsyncCallback.wrapFuture(future, 0);
        try {
            short packetId = MqttClientConnection.mqttClientConnectionSubscribe(this.getNativeHandle(), topic, qos.getValue(), handler != null ? new MessageHandler(handler) : null, subAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos) {
        return this.subscribe(topic, qos, null);
    }

    public void onMessage(Consumer<MqttMessage> handler) {
        MqttClientConnection.mqttClientConnectionOnMessage(this.getNativeHandle(), new MessageHandler(handler));
    }

    public CompletableFuture<Integer> unsubscribe(String topic) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during unsubscribe"));
            return future;
        }
        AsyncCallback unsubAck = AsyncCallback.wrapFuture(future, 0);
        short packetId = MqttClientConnection.mqttClientConnectionUnsubscribe(this.getNativeHandle(), topic, unsubAck);
        return future.thenApply(unused -> packetId);
    }

    public CompletableFuture<Integer> publish(MqttMessage message, QualityOfService qos, boolean retain) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during publish"));
        }
        AsyncCallback pubAck = AsyncCallback.wrapFuture(future, 0);
        try {
            short packetId = MqttClientConnection.mqttClientConnectionPublish(this.getNativeHandle(), message.getTopic(), qos.getValue(), retain, message.getPayload(), pubAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) {
        CompletableFuture<HttpRequest> future = new CompletableFuture<HttpRequest>();
        future.whenComplete((x, throwable) -> MqttClientConnection.mqttClientConnectionWebsocketHandshakeComplete(this.getNativeHandle(), x.getEncodedPath(), x.getHeadersAsArray(), throwable, nativeUserData));
        WebsocketHandshakeTransformArgs args = new WebsocketHandshakeTransformArgs(this, handshakeRequest, future);
        Consumer<WebsocketHandshakeTransformArgs> transform = this.config.getWebsocketHandshakeTransform();
        if (transform != null) {
            transform.accept(args);
        } else {
            args.complete(handshakeRequest);
        }
    }

    private static native long mqttClientConnectionNew(long var0, MqttClientConnection var2) throws CrtRuntimeException;

    private static native void mqttClientConnectionDestroy(long var0);

    private static native void mqttClientConnectionConnect(long var0, String var2, short var3, long var4, long var6, String var8, boolean var9, int var10, short var11) throws CrtRuntimeException;

    private static native void mqttClientConnectionDisconnect(long var0, AsyncCallback var2);

    private static native short mqttClientConnectionSubscribe(long var0, String var2, int var3, MessageHandler var4, AsyncCallback var5) throws CrtRuntimeException;

    private static native void mqttClientConnectionOnMessage(long var0, MessageHandler var2) throws CrtRuntimeException;

    private static native short mqttClientConnectionUnsubscribe(long var0, String var2, AsyncCallback var3);

    private static native short mqttClientConnectionPublish(long var0, String var2, int var3, boolean var4, byte[] var5, AsyncCallback var6) throws CrtRuntimeException;

    private static native boolean mqttClientConnectionSetWill(long var0, String var2, int var3, boolean var4, byte[] var5) throws CrtRuntimeException;

    private static native void mqttClientConnectionSetLogin(long var0, String var2, String var3) throws CrtRuntimeException;

    private static native void mqttClientConnectionUseWebsockets(long var0) throws CrtRuntimeException;

    private static native void mqttClientConnectionWebsocketHandshakeComplete(long var0, String var2, HttpHeader[] var3, Throwable var4, long var5) throws CrtRuntimeException;

    private static native void mqttClientConnectionSetWebsocketProxyOptions(long var0, String var2, int var3, long var4, int var6, String var7, String var8) throws CrtRuntimeException;

    private class MessageHandler {
        Consumer<MqttMessage> callback;

        private MessageHandler(Consumer<MqttMessage> callback) {
            this.callback = callback;
        }

        void deliver(String topic, byte[] payload) {
            this.callback.accept(new MqttMessage(topic, payload));
        }
    }
}

