/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KubernetesResource;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ListOptions;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Status;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.StatusDetails;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.WatchEvent;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watch;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.WatcherException;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.BaseOperation;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.HttpClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.utils.Utils;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractWatchManager<T extends HasMetadata>
implements Watch {
    private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);
    private static final int INFO_LOG_CONNECTION_ERRORS = 10;
    final Watcher<T> watcher;
    final AtomicReference<String> resourceVersion;
    final AtomicBoolean forceClosed;
    private final int reconnectLimit;
    private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
    private Future<?> reconnectAttempt;
    protected final HttpClient client;
    protected BaseOperation<T, ?, ?> baseOperation;
    private final ListOptions listOptions;
    private final URL requestUrl;
    private final boolean receiveBookmarks;
    volatile WatchRequestState latestRequestState;
    private final Map<Class<?>, Integer> endErrors = new ConcurrentHashMap();
    private AtomicInteger retryAfterSeconds = new AtomicInteger();
    private int watchEndCheckMs = 120000;

    AbstractWatchManager(Watcher<T> watcher, BaseOperation<T, ?, ?> baseOperation, ListOptions listOptions, int reconnectLimit, int reconnectInterval, HttpClient client) throws MalformedURLException {
        this.watcher = new SerialWatcher(watcher, new SerialExecutor(baseOperation.getOperationContext().getExecutor()));
        this.reconnectLimit = reconnectLimit;
        this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, reconnectLimit);
        this.resourceVersion = new AtomicReference<String>(listOptions.getResourceVersion());
        this.forceClosed = new AtomicBoolean();
        this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks());
        if (listOptions.getAllowWatchBookmarks() == null) {
            listOptions.setAllowWatchBookmarks(true);
        }
        this.baseOperation = baseOperation;
        this.requestUrl = baseOperation.getNamespacedUrl();
        this.listOptions = listOptions;
        this.client = client;
        this.startWatch();
    }

    protected abstract void start(URL var1, Map<String, String> var2, WatchRequestState var3);

    public synchronized void closeRequest() {
        WatchRequestState state = this.latestRequestState;
        if (state != null && state.closed.compareAndSet(false, true)) {
            logger.debug("Closing the current watch");
            this.closeCurrentRequest();
            CompletableFuture<Void> future = Utils.schedule(this.baseOperation.getOperationContext().getExecutor(), () -> this.failSafeReconnect(state), this.watchEndCheckMs, TimeUnit.MILLISECONDS);
            state.ended.whenComplete((v, t) -> future.cancel(true));
        }
    }

    private synchronized void failSafeReconnect(WatchRequestState state) {
        if (state == this.latestRequestState && !this.forceClosed.get() && (this.reconnectAttempt == null || this.reconnectAttempt.isDone())) {
            logger.error("The last watch has yet to terminate as expected, will force start another watch. Please report this to the Fabric8 Kubernetes Client development team.");
            this.reconnect();
        }
    }

    public void setWatchEndCheckMs(int watchEndCheckMs) {
        this.watchEndCheckMs = watchEndCheckMs;
    }

    protected abstract void closeCurrentRequest();

    final void close(WatcherException cause) {
        if (!this.forceClosed.compareAndSet(false, true)) {
            logger.debug("Ignoring duplicate firing of onClose event");
        } else {
            this.closeRequest();
            try {
                this.watcher.onClose(cause);
            }
            finally {
                this.close();
            }
        }
    }

    final void closeEvent() {
        if (this.forceClosed.getAndSet(true)) {
            logger.debug("Ignoring duplicate firing of onClose event");
            return;
        }
        this.watcher.onClose();
    }

    final synchronized void cancelReconnect() {
        if (this.reconnectAttempt != null) {
            this.reconnectAttempt.cancel(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void scheduleReconnect(WatchRequestState state) {
        if (!state.reconnected.compareAndSet(false, true)) {
            return;
        }
        if (this.isForceClosed()) {
            logger.debug("Ignoring already closed/closing connection");
            return;
        }
        if (this.cannotReconnect()) {
            this.close(new WatcherException("Exhausted reconnects"));
            return;
        }
        long delay = this.nextReconnectInterval();
        logger.debug("Scheduling reconnect task in {} ms", (Object)delay);
        AbstractWatchManager abstractWatchManager = this;
        synchronized (abstractWatchManager) {
            this.reconnectAttempt = Utils.schedule(this.baseOperation.getOperationContext().getExecutor(), this::reconnect, delay, TimeUnit.MILLISECONDS);
            if (this.isForceClosed()) {
                this.cancelReconnect();
            }
        }
    }

    synchronized void reconnect() {
        try {
            if (this.client.isClosed()) {
                logger.debug("The client has closed, closing the watch");
                this.close();
                return;
            }
            this.startWatch();
            if (this.isForceClosed()) {
                this.closeRequest();
            }
        }
        catch (Exception e) {
            logger.error("Exception in reconnect", (Throwable)e);
            this.close(new WatcherException("Unhandled exception in reconnect attempt", e));
        }
    }

    final boolean cannotReconnect() {
        return !this.watcher.reconnecting() && this.retryIntervalCalculator.getCurrentReconnectAttempt() >= this.reconnectLimit && this.reconnectLimit >= 0;
    }

    final long nextReconnectInterval() {
        return Math.max((long)this.retryAfterSeconds.getAndSet(0) * 1000L, this.retryIntervalCalculator.nextReconnectInterval());
    }

    void resetReconnectAttempts(WatchRequestState state) {
        if (state.closed.get()) {
            return;
        }
        this.retryIntervalCalculator.resetReconnectAttempts();
    }

    boolean isForceClosed() {
        return this.forceClosed.get();
    }

    void eventReceived(Watcher.Action action, HasMetadata resource) {
        if (!this.receiveBookmarks && action == Watcher.Action.BOOKMARK) {
            return;
        }
        if (resource != null && !this.baseOperation.getType().isAssignableFrom(resource.getClass())) {
            resource = (HasMetadata)this.baseOperation.getKubernetesSerialization().convertValue(resource, this.baseOperation.getType());
        }
        HasMetadata t = resource;
        try {
            this.watcher.eventReceived(action, t);
        }
        catch (Exception e) {
            logger.error("Unhandled exception encountered in watcher event handler", (Throwable)e);
        }
    }

    void updateResourceVersion(String newResourceVersion) {
        this.resourceVersion.set(newResourceVersion);
    }

    protected void startWatch() {
        this.listOptions.setResourceVersion(this.resourceVersion.get());
        URL url = this.baseOperation.appendListOptionParams(this.requestUrl, this.listOptions);
        String origin = this.requestUrl.getProtocol() + "://" + this.requestUrl.getHost();
        if (this.requestUrl.getPort() != -1) {
            origin = origin + ":" + this.requestUrl.getPort();
        }
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("Origin", origin);
        logger.debug("Watching {}...", (Object)url);
        this.closeRequest();
        this.latestRequestState = new WatchRequestState();
        this.start(url, headers, this.latestRequestState);
    }

    @Override
    public void close() {
        logger.debug("Force closing the watch");
        this.closeEvent();
        this.closeRequest();
        this.cancelReconnect();
    }

    private WatchEvent contextAwareWatchEventDeserializer(String messageSource) throws JsonProcessingException {
        KubernetesSerialization kubernetesSerialization = this.baseOperation.getKubernetesSerialization();
        try {
            return kubernetesSerialization.unmarshal(messageSource, WatchEvent.class);
        }
        catch (Exception ex1) {
            JsonNode json = kubernetesSerialization.unmarshal(messageSource, JsonNode.class);
            JsonNode objectJson = null;
            if (json instanceof ObjectNode && json.has("object")) {
                objectJson = ((ObjectNode)json).remove("object");
            }
            WatchEvent watchEvent = kubernetesSerialization.convertValue(json, WatchEvent.class);
            KubernetesResource object = (KubernetesResource)kubernetesSerialization.convertValue(objectJson, this.baseOperation.getType());
            watchEvent.setObject(object);
            return watchEvent;
        }
    }

    protected void onMessage(String message, WatchRequestState state) {
        this.endErrors.clear();
        if (state.closed.get() || this.forceClosed.get()) {
            return;
        }
        try {
            WatchEvent event = this.contextAwareWatchEventDeserializer(message);
            KubernetesResource object = event.getObject();
            Watcher.Action action = Watcher.Action.valueOf(event.getType());
            if (action == Watcher.Action.ERROR) {
                if (object instanceof Status) {
                    Status status = (Status)object;
                    this.onStatus(status, state);
                } else {
                    logger.error("Received an error which is not a status but {} - will retry", (Object)message);
                    this.closeRequest();
                }
            } else if (object instanceof HasMetadata) {
                HasMetadata hasMetadata = (HasMetadata)object;
                this.updateResourceVersion(hasMetadata.getMetadata().getResourceVersion());
                this.eventReceived(action, hasMetadata);
            } else {
                String msg = String.format("Invalid object received: %s", message);
                this.close(new WatcherException(msg, null, message));
            }
        }
        catch (ClassCastException e) {
            String msg = "Received wrong type of object for watch";
            this.close(new WatcherException("Received wrong type of object for watch", e, message));
        }
        catch (JsonProcessingException e) {
            String msg = "Couldn't deserialize watch event: " + message;
            this.close(new WatcherException(msg, e, message));
        }
        catch (Exception e) {
            String msg = "Unexpected exception processing watch event";
            this.close(new WatcherException("Unexpected exception processing watch event", e, message));
        }
    }

    protected boolean onStatus(Status status, WatchRequestState state) {
        this.endErrors.clear();
        if (state.closed.get()) {
            return true;
        }
        if (Integer.valueOf(410).equals(status.getCode())) {
            this.close(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
            return true;
        }
        logger.error("Error received: {}, will retry", (Object)status);
        this.retryAfterSeconds.set(Optional.ofNullable(status.getDetails()).map(StatusDetails::getRetryAfterSeconds).orElse(0));
        this.closeRequest();
        return false;
    }

    void watchEnded(Throwable t, WatchRequestState state) {
        state.ended.complete(null);
        if (state != this.latestRequestState) {
            if (t != null) {
                logger.debug("Watch error received after the next watch started", t);
            }
            return;
        }
        if (t instanceof ProtocolException) {
            this.close(new WatcherException("Could not process Watch response", t));
            return;
        }
        try {
            if (t != null) {
                this.logEndError(t);
            }
        }
        finally {
            this.scheduleReconnect(state);
        }
    }

    private void logEndError(Throwable t) {
        int occurrences = this.endErrors.compute(t.getClass(), (k, v) -> v == null ? 1 : v + 1);
        if (t instanceof IOException || t instanceof KubernetesClientException) {
            if (occurrences > 10) {
                logger.info("Watch connection error recieved {} times without progress, will reconnect if possible", (Object)occurrences, (Object)t);
            } else {
                logger.debug("Watch connection error, will reconnect if possible", t);
            }
        } else if (occurrences > 1) {
            logger.info("Unknown Watch error received {} times without progress, will reconnect if possible", (Object)occurrences, (Object)t);
        } else {
            logger.debug("Unknown Watch error received, will reconnect if possible", t);
        }
    }

    public static class WatchRequestState {
        final AtomicBoolean reconnected = new AtomicBoolean();
        final AtomicBoolean closed = new AtomicBoolean();
        final CompletableFuture<Void> ended = new CompletableFuture();
    }

    private static final class SerialWatcher<T>
    implements Watcher<T> {
        private final Watcher<T> watcher;
        SerialExecutor serialExecutor;

        private SerialWatcher(Watcher<T> watcher, SerialExecutor serialExecutor) {
            this.watcher = watcher;
            this.serialExecutor = serialExecutor;
        }

        @Override
        public void eventReceived(Watcher.Action action, T resource) {
            this.serialExecutor.execute(() -> this.watcher.eventReceived(action, resource));
        }

        @Override
        public void onClose(WatcherException cause) {
            this.serialExecutor.execute(() -> {
                this.watcher.onClose(cause);
                this.serialExecutor.shutdownNow();
            });
        }

        @Override
        public void onClose() {
            this.serialExecutor.execute(() -> {
                this.watcher.onClose();
                this.serialExecutor.shutdownNow();
            });
        }

        @Override
        public boolean reconnecting() {
            return this.watcher.reconnecting();
        }
    }
}

