/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KubernetesResourceList;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ListOptions;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Status;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.BaseOperation;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.OperationSupport;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.HttpClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.WebSocket;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.WebSocketHandshakeException;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse;

public class WatchConnectionManager<T extends HasMetadata, L extends KubernetesResourceList<T>>
extends AbstractWatchManager<T> {
    private final long connectTimeoutMillis;
    protected WatcherWebSocketListener<T> listener;
    private volatile CompletableFuture<WebSocket> websocketFuture;
    volatile boolean ready;

    public WatchConnectionManager(HttpClient client, BaseOperation<T, L, ?> baseOperation, ListOptions listOptions, Watcher<T> watcher, int reconnectInterval, int reconnectLimit, long websocketTimeout) throws MalformedURLException {
        super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, client);
        this.connectTimeoutMillis = websocketTimeout;
    }

    @Override
    protected void closeCurrentRequest() {
        Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> theFuture.whenComplete((w, t) -> Optional.ofNullable(w).ifPresent(ws -> ws.sendClose(1000, null))));
    }

    public CompletableFuture<WebSocket> getWebsocketFuture() {
        return this.websocketFuture;
    }

    @Override
    protected void start(URL url, Map<String, String> headers, AbstractWatchManager.WatchRequestState state) {
        this.listener = new WatcherWebSocketListener(this, state);
        WebSocket.Builder builder = this.client.newWebSocketBuilder();
        headers.forEach(builder::header);
        builder.uri(URI.create(url.toString())).connectTimeout(this.connectTimeoutMillis, TimeUnit.MILLISECONDS);
        this.websocketFuture = builder.buildAsync(this.listener).handle((w, t) -> {
            if (t != null) {
                try {
                    if (t instanceof WebSocketHandshakeException) {
                        WebSocketHandshakeException wshe = (WebSocketHandshakeException)t;
                        WebSocketUpgradeResponse response = wshe.getResponse();
                        int code = response.code();
                        Status status = OperationSupport.createStatus(response, this.baseOperation.getKubernetesSerialization());
                        t = OperationSupport.requestFailure(this.client.newHttpRequestBuilder().url(url).build(), status, "Received " + code + " on websocket");
                    }
                    throw KubernetesClientException.launderThrowable(t);
                }
                catch (Throwable throwable) {
                    if (this.ready) {
                        this.watchEnded((Throwable)t, state);
                    }
                    throw throwable;
                }
            }
            this.ready = true;
            return w;
        });
    }
}

