/*
 * Decompiled with CFR 0.152.
 */
package io.yupiik.bundlebee.operator.launcher;

import io.yupiik.bundlebee.core.kube.HttpKubeClient;
import io.yupiik.bundlebee.core.qualifier.BundleBee;
import io.yupiik.bundlebee.operator.BundlebeeOperator;
import io.yupiik.bundlebee.operator.handler.ActionHandler;
import io.yupiik.bundlebee.operator.model.Event;
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.json.bind.Jsonb;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
 * Main loop of the Bundlebee operator.
 *
 * <p>It opens a Kubernetes watch on the {@code alveoli} custom resources of the
 * namespace exposed by the injected {@link HttpKubeClient}, forwards every
 * received event to the {@link ActionHandler} on a dedicated thread pool and
 * persists the last seen resource version in a file named
 * {@code lastResourceVersion} under {@code bundlebee.operator.storage}, which is
 * read back each time a watch request is (re)started.
 */
@ApplicationScoped
public class OperatorLoop {
    private final Logger logger = Logger.getLogger(this.getClass().getName());

    @Inject
    private HttpKubeClient client;

    @Inject
    private ActionHandler actionHandler;

    @Inject
    @BundleBee
    private Jsonb jsonb;

    @Inject
    @ConfigProperty(name="bundlebee.operator.storage", defaultValue="/opt/yupiik/state/bundlebee-operator")
    private String stateLocation;

    /**
     * Runs the watch loop until {@code running} becomes {@code false} or the
     * calling thread is interrupted.
     *
     * <p>A JVM shutdown hook flipping {@code running} to {@code false} is
     * registered for the duration of the call. Each iteration issues one watch
     * request; when the request ends, fails or is rejected, a new one is issued
     * (after a short pause for failures and non-200 responses).
     *
     * @param running flag controlling the loop, set to {@code false} to stop it.
     * @throws IllegalStateException if the initial permission check fails.
     */
    public void run(AtomicBoolean running) {
        this.logger.info("Started Bundlebee Operator");
        Thread hook = new Thread(() -> running.set(false), BundlebeeOperator.class.getName() + "-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        ExecutorService pool = this.createThreadPool();
        Path lastResourceVersionLocation = Path.of(this.stateLocation).resolve("lastResourceVersion");
        try {
            HttpClient httpClient = this.client.getClient();
            this.checkPerms();
            while (running.get()) {
                try {
                    String lastResourceVersion = this.readLastResourceVersion(lastResourceVersionLocation);
                    this.logger.info("Starting to watch alveoli (resource version=" + lastResourceVersion + ")");
                    HttpRequest request = this.client.prepareRequest(
                            HttpRequest.newBuilder().header("Accept", "application/json"),
                            this.watchPath(lastResourceVersion));
                    // send() blocks until the watch response body has been fully consumed
                    HttpResponse<Void> response = httpClient.send(request, info -> {
                        if (info.statusCode() != 200) {
                            this.logger.info(() -> "Got watch response: " + info.statusCode() + ", skipping");
                            return HttpResponse.BodySubscribers.discarding();
                        }
                        this.logger.info(() -> "Got watch response: " + info.statusCode() + ", starting to watch");
                        return HttpResponse.BodySubscribers.fromLineSubscriber(
                                this.newWatchSubscriber(lastResourceVersionLocation, pool));
                    });
                    // a rejected watch would otherwise be retried in a hot loop against the API server
                    if (response.statusCode() != 200 && !this.pauseBeforeRetry()) {
                        break;
                    }
                } catch (IOException | RuntimeException ex) {
                    this.logger.log(Level.WARNING, ex, ex::getMessage);
                    if (!this.pauseBeforeRetry()) {
                        break;
                    }
                } catch (InterruptedException ie) {
                    this.logger.info("Application interrupted");
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            try {
                Runtime.getRuntime().removeShutdownHook(hook);
            } catch (IllegalStateException ignored) {
                // the JVM is already shutting down, hooks can't be unregistered anymore
            }
            this.stopPool(pool);
        }
    }

    /**
     * Builds the path and query string of the watch request.
     *
     * @param lastResourceVersion resource version to resume from, blank to omit the parameter.
     * @return the request path relative to the Kubernetes API base.
     */
    private String watchPath(String lastResourceVersion) {
        String base = "/apis/bundlebee.yupiik.io/v1/namespaces/" + this.client.getNamespace()
                + "/alveoli?watch=true&includeUninitialized=false&allowWatchBookmarks=true";
        return lastResourceVersion.isBlank() ? base : base + "&resourceVersion=" + lastResourceVersion;
    }

    /**
     * Creates the subscriber receiving the watch response, one line per call to {@code onNext}.
     *
     * @param lastResourceVersionLocation file where the resource version is persisted.
     * @param pool executor running the event handling.
     * @return a subscriber requesting an unbounded number of lines.
     */
    private Flow.Subscriber<String> newWatchSubscriber(Path lastResourceVersionLocation, ExecutorService pool) {
        return new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                if (this.subscription != null) {
                    this.subscription.cancel();
                }
                this.subscription = subscription;
                this.subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String line) {
                OperatorLoop.this.onWatchLine(line, lastResourceVersionLocation, pool);
            }

            @Override
            public void onError(Throwable throwable) {
                OperatorLoop.this.logger.log(Level.SEVERE, throwable, throwable::getMessage);
            }

            @Override
            public void onComplete() {
                OperatorLoop.this.logger.finest(() -> "Ending watching current request");
            }
        };
    }

    /**
     * Handles one line of the watch response.
     *
     * <p>{@code BOOKMARK} events only update the persisted resource version,
     * {@code ERROR} events are logged, any other typed event updates the
     * persisted resource version and is then submitted to {@code pool} to be
     * processed by the {@link ActionHandler}. Events without a type are ignored.
     *
     * @param line JSON serialization of the event.
     * @param lastResourceVersionLocation file where the resource version is persisted.
     * @param pool executor running the event handling.
     */
    private void onWatchLine(String line, Path lastResourceVersionLocation, ExecutorService pool) {
        Event event = this.jsonb.fromJson(line, Event.class);
        String type = event.getType();
        if ("BOOKMARK".equalsIgnoreCase(type)) {
            String bookmarkVersion = this.resourceVersionOf(event);
            if (bookmarkVersion != null) {
                this.syncResourceVersion(bookmarkVersion, lastResourceVersionLocation);
            }
        } else if ("ERROR".equalsIgnoreCase(type)) {
            this.logger.log(Level.SEVERE, () -> "Error event: '" + line + "'");
        } else if (type != null) {
            String eventVersion = this.resourceVersionOf(event);
            String max;
            try {
                max = this.max(this.readLastResourceVersion(lastResourceVersionLocation), eventVersion);
            } catch (IOException e) {
                // can't read the stored value, fall back on the version of the event
                max = eventVersion;
            }
            if (max != null && !max.isBlank()) {
                this.syncResourceVersion(max, lastResourceVersionLocation);
            }
            pool.submit(() -> {
                try {
                    this.actionHandler.onEvent(type, event.getObject());
                } catch (RuntimeException re) {
                    this.logger.log(Level.SEVERE, re, re::getMessage);
                }
            });
        }
    }

    /**
     * Extracts the resource version of an event without failing on missing parts.
     *
     * @param event the watch event.
     * @return the resource version or {@code null} if the object, its metadata or the version is missing.
     */
    private String resourceVersionOf(Event event) {
        if (event.getObject() == null || event.getObject().getMetadata() == null) {
            return null;
        }
        return event.getObject().getMetadata().getResourceVersion();
    }

    /**
     * Sleeps 500ms before the next watch attempt.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored), {@code true} otherwise.
     */
    private boolean pauseBeforeRetry() {
        try {
            Thread.sleep(500L);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Reads the persisted resource version.
     *
     * @param lastResourceVersionLocation file where the resource version is persisted.
     * @return the stripped file content or an empty string if the file does not exist.
     * @throws IOException if the file can't be read.
     */
    private String readLastResourceVersion(Path lastResourceVersionLocation) throws IOException {
        return Files.exists(lastResourceVersionLocation) ? Files.readString(lastResourceVersionLocation).strip() : "";
    }

    /**
     * Writes the resource version to the state file, write failures are logged and not propagated.
     *
     * @param resourceVersion value to persist.
     * @param lastResourceVersionLocation file where the resource version is persisted, also used as write lock.
     */
    private void syncResourceVersion(String resourceVersion, Path lastResourceVersionLocation) {
        synchronized (lastResourceVersionLocation) {
            try {
                Files.writeString(lastResourceVersionLocation, resourceVersion);
            } catch (IOException e) {
                this.logger.log(Level.WARNING, e, e::getMessage);
            }
        }
    }

    /**
     * Selects the highest of two resource versions, compared as 64-bit integers.
     *
     * <p>A {@code null} or blank value is considered absent and the other one is
     * returned. If one of the values is not a number, {@code current} is kept.
     * NOTE(review): Kubernetes documents resource versions as opaque strings,
     * the numeric comparison relies on the current server behavior.
     *
     * @param current the stored resource version.
     * @param newOne the resource version of the incoming event.
     * @return the value to persist, {@code current} when both are equal.
     */
    private String max(String current, String newOne) {
        if (current == null || current.isBlank()) {
            return newOne;
        }
        if (newOne == null || newOne.isBlank()) {
            return current;
        }
        try {
            // long + direct comparison: no overflow on big versions nor on the difference
            return Long.parseLong(current) < Long.parseLong(newOne) ? newOne : current;
        } catch (NumberFormatException nfe) {
            return current;
        }
    }

    /**
     * Checks that alveoli can be listed with the current credentials.
     *
     * @throws IllegalStateException if the API does not answer with a 200 or if the call fails.
     */
    private void checkPerms() {
        try {
            HttpResponse<?> response = (HttpResponse<?>) this.client.execute(
                    HttpRequest.newBuilder().header("Accept", "application/json"),
                    "/apis/bundlebee.yupiik.io/v1/namespaces/" + this.client.getNamespace() + "/alveoli?limit=1")
                    .toCompletableFuture().get();
            if (response.statusCode() != 200) {
                throw new IllegalStateException("Can't call Kubernetes API to get alveoli, check your role setup.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Stops the pool immediately and waits up to one minute for its termination.
     *
     * @param pool the executor to stop.
     */
    private void stopPool(ExecutorService pool) {
        pool.shutdownNow();
        try {
            if (!pool.awaitTermination(1L, TimeUnit.MINUTES)) {
                this.logger.warning("Didn't stop properly in 1mn, giving up");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the pool handling the events: a {@link ForkJoinPool} in async mode
     * with one thread per available processor, named after
     * {@link BundlebeeOperator} and using its classloader as context classloader.
     * Uncaught errors are logged.
     *
     * @return the executor to use to process events.
     */
    private ExecutorService createThreadPool() {
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors());
        return new ForkJoinPool(threads, new ForkJoinPool.ForkJoinWorkerThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                String name = BundlebeeOperator.class.getName() + "-" + this.counter.incrementAndGet();
                // the constructor is protected, hence the anonymous subclass
                ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) {};
                thread.setName(name);
                thread.setContextClassLoader(BundlebeeOperator.class.getClassLoader());
                return thread;
            }
        }, (t, e) -> this.logger.log(Level.SEVERE, e, e::getMessage), true);
    }
}

