/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl;

import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.eclipse.hono.client.command.CommandRoutingUtil;
import org.eclipse.hono.commandrouter.AdapterInstanceStatusService;
import org.eclipse.hono.util.AdapterInstanceStatus;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A service that determines the status of adapter instances by listing and watching
 * the pods of a Kubernetes namespace.
 * <p>
 * Only pods whose name contains {@code "adapter"} and, within these pods, only containers
 * whose name contains {@code "adapter"} are taken into account.
 * <p>
 * The service keeps track of
 * <ul>
 * <li>the currently known (non-terminated) containers, mapped to their pod names,</li>
 * <li>pods that have been added but for which no container id is known yet,</li>
 * <li>a bounded set of container ids known to be terminated and</li>
 * <li>a bounded map of container ids that were queried but are unknown ("suspected"),
 * along with the instant at which they were first queried.</li>
 * </ul>
 */
public class KubernetesBasedAdapterInstanceStatusService
implements AdapterInstanceStatusService {
    /**
     * The minimum time an unknown container id has to be in the "suspected" state before a
     * refresh of the pod list may move it to the set of terminated containers.
     */
    static final Duration MIN_TIME_IN_SUSPECTED_STATE = Duration.ofMinutes(5L);
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesBasedAdapterInstanceStatusService.class);
    /** The string that pod and container names have to contain in order to be considered. */
    private static final String ADAPTER_NAME_MATCH = "adapter";
    /** The time to wait before trying to re-create a closed watch. */
    private static final long WATCH_RECREATION_DELAY_MILLIS = 100L;
    /** The maximum number of entries kept in the terminated/suspected container id collections. */
    private static final int MAX_LRU_MAP_ENTRIES = 200;
    /** The container id value registered for a pod for which no container id is known yet. */
    private static final String STARTING_POD_UNKNOWN_CONTAINER_ID_PLACEHOLDER = "";
    /** The number of leading characters of the full container id that make up the short id. */
    private static final int SHORT_CONTAINER_ID_LENGTH = 12;
    /** Up to this number of containers, the complete container map is written to the log. */
    private static final int MAX_CONTAINERS_TO_LOG = 20;
    /** Value of {@link #active}: the service has been stopped. */
    private static final int STATE_STOPPED = -1;
    /** Value of {@link #active}: no pod list and watch have been established (yet, or anymore). */
    private static final int STATE_INACTIVE = 0;
    /** Value of {@link #active}: pod list and watch have been established. */
    private static final int STATE_ACTIVE = 1;

    private final KubernetesClient client;
    private final String namespace;
    /** The service state, one of the {@code STATE_*} constants. */
    private final AtomicInteger active = new AtomicInteger();
    private final Map<String, String> containerIdToPodNameMap = new ConcurrentHashMap<>();
    private final Map<String, String> podNameToContainerIdMap = new ConcurrentHashMap<>();
    private final Set<String> terminatedContainerIds = Collections
            .newSetFromMap(Collections.synchronizedMap(new LRUMap<Boolean>(MAX_LRU_MAP_ENTRIES)));
    private final Map<String, Instant> suspectedContainerIds = Collections
            .synchronizedMap(new LRUMap<Instant>(MAX_LRU_MAP_ENTRIES));
    // volatile: assigned by whichever thread (re-)creates the watch, read by the thread invoking stop()
    private volatile Watch watch;
    private Clock clock = Clock.systemUTC();

    /**
     * Creates a new service using a Kubernetes client created by means of a
     * {@code KubernetesClientBuilder} with its default settings.
     *
     * @throws KubernetesClientException if listing or watching the pods fails.
     */
    private KubernetesBasedAdapterInstanceStatusService() throws KubernetesClientException {
        this(new KubernetesClientBuilder().build());
    }

    /**
     * Creates a new service for the given client and immediately lists and watches the pods
     * in the client's namespace (or in the namespace {@code "default"} if the client has none).
     *
     * @param client The Kubernetes client to use.
     * @throws NullPointerException if client is {@code null}.
     * @throws KubernetesClientException if listing or watching the pods fails.
     */
    KubernetesBasedAdapterInstanceStatusService(KubernetesClient client) throws KubernetesClientException {
        this.client = Objects.requireNonNull(client);
        this.namespace = Optional.ofNullable(client.getNamespace()).orElse("default");
        this.initAdaptersListAndWatch();
    }

    /**
     * Creates a new service instance if the {@code KUBERNETES_SERVICE_HOST} environment
     * variable is set.
     *
     * @return The new service or {@code null} if the environment variable is not set or if
     *         creating the service failed (the failure is logged in that case).
     */
    public static KubernetesBasedAdapterInstanceStatusService create() {
        if (runningInKubernetes()) {
            try {
                return new KubernetesBasedAdapterInstanceStatusService();
            } catch (Exception e) {
                LOG.error("error creating KubernetesClient or pod watch: {}", e.toString());
            }
        }
        return null;
    }

    /**
     * Sets the clock used for determining the time spent in the "suspected" state.
     *
     * @param clock The clock to use.
     * @throws NullPointerException if clock is {@code null}.
     */
    void setClock(Clock clock) {
        this.clock = Objects.requireNonNull(clock);
    }

    private static boolean runningInKubernetes() {
        return System.getenv("KUBERNETES_SERVICE_HOST") != null;
    }

    /**
     * Fetches the current pod list, rebuilds the container maps from it and registers a pod
     * watch. Sets the service state to "active" if it was "inactive" before.
     * <p>
     * Does nothing if the service has been stopped.
     *
     * @throws KubernetesClientException if listing or watching the pods fails.
     */
    private void initAdaptersListAndWatch() throws KubernetesClientException {
        if (this.active.get() == STATE_STOPPED) {
            return;
        }
        final NonNamespaceOperation<Pod, PodList, ?> podListResource = this.client.pods().inNamespace(this.namespace);
        this.refreshContainerLists(podListResource.list().getItems());
        this.watch = podListResource.watch(new Watcher<Pod>() {

            public void eventReceived(Watcher.Action watchAction, Pod pod) {
                LOG.trace("event received: {}, pod: {}", watchAction, pod != null ? pod.getMetadata().getName() : null);
                if (pod != null && isPodNameMatch(pod)) {
                    applyPodStatus(pod, watchAction);
                }
            }

            public void onClose(WatcherException e) {
                onWatcherClosed(e);
            }
        });
        this.active.compareAndExchange(STATE_INACTIVE, STATE_ACTIVE);
        LOG.info("initialized list of active adapter containers: {}",
                this.containerIdToPodNameMap.size() <= MAX_CONTAINERS_TO_LOG
                        ? this.containerIdToPodNameMap
                        : this.containerIdToPodNameMap.size() + " containers");
    }

    private static boolean isPodNameMatch(Pod pod) {
        return pod.getMetadata().getName().contains(ADAPTER_NAME_MATCH);
    }

    /**
     * Gets the container statuses of the given pod.
     *
     * @param pod The pod.
     * @return The container statuses or an empty list if the pod has no status (yet).
     */
    private static List<ContainerStatus> getContainerStatuses(Pod pod) {
        if (pod.getStatus() == null || pod.getStatus().getContainerStatuses() == null) {
            return Collections.emptyList();
        }
        return pod.getStatus().getContainerStatuses();
    }

    /**
     * Rebuilds the container maps from the given pod list and re-evaluates the suspected
     * container ids.
     * <p>
     * A suspected container id that is still not among the known containers is moved to the
     * terminated containers if it has been suspected for longer than
     * {@link #MIN_TIME_IN_SUSPECTED_STATE}; otherwise it stays suspected. Suspected ids that
     * are among the known containers are dropped.
     *
     * @param podList The current list of pods.
     */
    private synchronized void refreshContainerLists(List<Pod> podList) {
        this.containerIdToPodNameMap.clear();
        this.podNameToContainerIdMap.clear();
        LOG.info("refresh container status list");
        for (final Pod pod : podList) {
            if (isPodNameMatch(pod)) {
                LOG.trace("handle pod list result entry: {}", pod.getMetadata().getName());
                this.applyPodStatus(pod, null);
            }
        }
        final Map<String, Instant> oldSuspectedIds;
        // Iterating a synchronizedMap (as done by the copy constructor) is only safe while holding
        // the map's monitor; getStatus() may add entries concurrently. Doing copy and clear in one
        // critical section also makes sure that no entry added in between gets lost.
        synchronized (this.suspectedContainerIds) {
            oldSuspectedIds = new HashMap<>(this.suspectedContainerIds);
            this.suspectedContainerIds.clear();
        }
        final Instant now = Instant.now(this.clock);
        oldSuspectedIds.forEach((suspectedId, timeAdded) -> {
            if (this.containerIdToPodNameMap.containsKey(suspectedId)) {
                return; // container is known to exist, no longer suspected
            }
            if (timeAdded.plus(MIN_TIME_IN_SUSPECTED_STATE).isBefore(now)) {
                this.terminatedContainerIds.add(suspectedId);
            } else {
                this.suspectedContainerIds.put(suspectedId, timeAdded);
            }
        });
    }

    /**
     * Updates the container maps according to the given pod and watch action.
     *
     * @param pod The pod the event or list entry refers to.
     * @param watchAction The watch action or {@code null} if the pod stems from a pod list result.
     */
    private synchronized void applyPodStatus(Pod pod, Watcher.Action watchAction) {
        if (watchAction == Watcher.Action.DELETED) {
            final String podName = pod.getMetadata().getName();
            final String shortContainerId = this.podNameToContainerIdMap.remove(podName);
            if (STARTING_POD_UNKNOWN_CONTAINER_ID_PLACEHOLDER.equals(shortContainerId)) {
                // pod got deleted before a container id became known
                this.onAdapterContainerRemoved(podName, null);
            } else if (shortContainerId != null && this.containerIdToPodNameMap.remove(shortContainerId) != null) {
                LOG.info("removed entry for deleted pod [{}], container [{}]; active adapter containers now: {}",
                        podName, shortContainerId, this.containerIdToPodNameMap.size());
                this.terminatedContainerIds.add(shortContainerId);
                this.onAdapterContainerRemoved(podName, shortContainerId);
            }
        } else if (watchAction == Watcher.Action.ERROR) {
            LOG.error("got ERROR watch action event");
        } else {
            // a pod without a status is handled like a pod without any container statuses
            final List<ContainerStatus> containerStatuses = getContainerStatuses(pod);
            containerStatuses.forEach(containerStatus -> {
                if (containerStatus.getName().contains(ADAPTER_NAME_MATCH)) {
                    this.applyContainerStatus(pod, containerStatus, watchAction);
                }
            });
            if (containerStatuses.isEmpty() && watchAction == Watcher.Action.ADDED) {
                this.registerAddedPodWithoutStartedContainer(pod.getMetadata().getName(), "pod ADDED");
            }
        }
    }

    /**
     * Checks whether the given container status is in the "waiting" state without any record
     * of the container having run or terminated before.
     *
     * @param containerStatus The container status.
     * @return {@code true} if the container is waiting and has no previous running/terminated state.
     */
    private static boolean isWaitingForFirstStart(ContainerStatus containerStatus) {
        if (containerStatus.getState() == null || containerStatus.getState().getWaiting() == null) {
            return false;
        }
        // a missing last state means that there is no record of an earlier run
        return containerStatus.getLastState() == null
                || (containerStatus.getLastState().getRunning() == null
                        && containerStatus.getLastState().getTerminated() == null);
    }

    /**
     * Updates the container maps according to the given container status.
     * <p>
     * Must only be invoked while holding the monitor of this object (as done by
     * {@link #applyPodStatus(Pod, Watcher.Action)}).
     *
     * @param pod The pod the container belongs to.
     * @param containerStatus The container status.
     * @param watchAction The watch action or {@code null} if the pod stems from a pod list result.
     */
    private void applyContainerStatus(Pod pod, ContainerStatus containerStatus, Watcher.Action watchAction) {
        final String podName = pod.getMetadata().getName();
        if (containerStatus.getContainerID() == null) {
            if (isWaitingForFirstStart(containerStatus)) {
                this.registerAddedPodWithoutStartedContainer(podName, containerStatus.getState().getWaiting().getReason());
            } else {
                this.podNameToContainerIdMap.remove(podName, STARTING_POD_UNKNOWN_CONTAINER_ID_PLACEHOLDER);
            }
            return;
        }
        // a container id is known now, the placeholder entry (if any) is obsolete
        this.podNameToContainerIdMap.remove(podName, STARTING_POD_UNKNOWN_CONTAINER_ID_PLACEHOLDER);
        final String shortContainerId = getShortContainerId(containerStatus.getContainerID());
        if (shortContainerId == null) {
            LOG.warn("unexpected format of container id [{}] in pod [{}]", containerStatus.getContainerID(), podName);
            return;
        }
        if (containerStatus.getState() != null && containerStatus.getState().getTerminated() != null) {
            this.podNameToContainerIdMap.remove(podName, shortContainerId);
            if (this.containerIdToPodNameMap.remove(shortContainerId) != null) {
                LOG.info("removed entry for pod [{}] and terminated container [{}] (reason: '{}'); active adapter containers now: {}",
                        podName, shortContainerId, containerStatus.getState().getTerminated().getReason(),
                        this.containerIdToPodNameMap.size());
                this.terminatedContainerIds.add(shortContainerId);
                this.onAdapterContainerRemoved(podName, shortContainerId);
            }
            return;
        }
        if (watchAction == Watcher.Action.MODIFIED) {
            // the pod may have got a new container, replacing the one registered so far
            final String oldContainerId = this.podNameToContainerIdMap.get(podName);
            if (oldContainerId != null && !oldContainerId.equals(shortContainerId)) {
                this.podNameToContainerIdMap.remove(podName, oldContainerId);
                if (this.containerIdToPodNameMap.remove(oldContainerId) != null) {
                    LOG.info("removed obsolete entry for pod [{}], container [{}]; active adapter containers now: {}",
                            podName, oldContainerId, this.containerIdToPodNameMap.size());
                    this.terminatedContainerIds.add(oldContainerId);
                    this.onAdapterContainerRemoved(podName, oldContainerId);
                }
            }
        }
        this.podNameToContainerIdMap.put(podName, shortContainerId);
        if (this.containerIdToPodNameMap.put(shortContainerId, podName) == null) {
            LOG.info("added entry for pod [{}], container [{}]; active adapter containers now: {}",
                    podName, shortContainerId, this.containerIdToPodNameMap.size());
            this.suspectedContainerIds.remove(shortContainerId);
            this.onAdapterContainerAdded(podName, shortContainerId);
        }
    }

    /**
     * Registers the given pod with a placeholder container id, unless the pod is already registered.
     *
     * @param podName The name of the pod.
     * @param statusInfo Information about the pod/container state, used for logging only.
     */
    private void registerAddedPodWithoutStartedContainer(String podName, String statusInfo) {
        if (!this.podNameToContainerIdMap.containsKey(podName)) {
            LOG.debug("new pod [{}] found [state: {}]", podName, statusInfo);
            this.podNameToContainerIdMap.put(podName, STARTING_POD_UNKNOWN_CONTAINER_ID_PLACEHOLDER);
            this.onAdapterContainerAdded(podName, null);
        }
    }

    /**
     * Invoked when an adapter container or an adapter pod without a known container id has been added.
     * <p>
     * This default implementation does nothing.
     *
     * @param podName The name of the pod.
     * @param containerId The short container id or {@code null} if no container id is known yet.
     */
    protected void onAdapterContainerAdded(String podName, String containerId) {
    }

    /**
     * Invoked when an adapter container or an adapter pod without a known container id has been removed.
     * <p>
     * This default implementation does nothing.
     *
     * @param podName The name of the pod.
     * @param containerId The short container id or {@code null} if no container id was known for the pod.
     */
    protected void onAdapterContainerRemoved(String podName, String containerId) {
    }

    /**
     * Handles the watch having been closed by marking the service as inactive, clearing the
     * container maps and trying to re-create pod list and watch until that succeeds, the
     * service gets stopped or the current thread gets interrupted.
     * <p>
     * Does nothing if the service is not in the "active" state.
     *
     * @param cause The error that caused the watch to be closed.
     */
    private void onWatcherClosed(WatcherException cause) {
        if (this.active.compareAndExchange(STATE_ACTIVE, STATE_INACTIVE) != STATE_ACTIVE) {
            return;
        }
        this.containerIdToPodNameMap.clear();
        this.podNameToContainerIdMap.clear();
        LOG.error("Watcher closed with error", cause);
        while (this.active.get() == STATE_INACTIVE) {
            try {
                Thread.sleep(WATCH_RECREATION_DELAY_MILLIS);
                LOG.info("Recreating watch");
                this.initAdaptersListAndWatch();
            } catch (InterruptedException ex) {
                // With the interrupt flag restored, every further sleep() would fail immediately,
                // turning this loop into a busy spin that never reaches the re-initialization.
                Thread.currentThread().interrupt();
                LOG.warn("interrupted while trying to recreate pod watch; giving up");
                return;
            } catch (Exception ex) {
                LOG.error("error re-initializing adapter list and pod watch", ex);
            }
        }
    }

    /**
     * Does nothing; pod list and watch are already established on instance creation.
     *
     * @return A succeeded future.
     */
    public Future<Void> start() {
        return Future.succeededFuture();
    }

    /**
     * Stops this service by closing the pod watch and the Kubernetes client.
     * <p>
     * Invoking this method again has no effect.
     *
     * @return A succeeded future.
     */
    public Future<Void> stop() {
        LOG.trace("stopping status service");
        if (this.active.getAndSet(STATE_STOPPED) == STATE_STOPPED) {
            return Future.succeededFuture();
        }
        try {
            final Watch w = this.watch;
            if (w != null) {
                w.close();
            }
        } finally {
            // release the client even if closing the watch failed
            this.client.close();
        }
        return Future.succeededFuture();
    }

    /**
     * Gets the status of the adapter instance with the given identifier.
     * <p>
     * If the container id derived from the identifier is neither known as alive nor as
     * terminated, the id is recorded as "suspected" (if not recorded already).
     *
     * @param adapterInstanceId The adapter instance identifier.
     * @return {@code UNKNOWN} if the service is not active, if no pod name and container id can
     *         be derived from the identifier or if the corresponding pod is registered without a
     *         known container id; {@code ALIVE} if the container is registered; {@code DEAD} if
     *         the container is known to be terminated; {@code SUSPECTED_DEAD} otherwise.
     */
    public AdapterInstanceStatus getStatus(String adapterInstanceId) {
        if (this.active.get() != STATE_ACTIVE) {
            LOG.debug("no status info available for adapter instance id [{}]; service not active", adapterInstanceId);
            return AdapterInstanceStatus.UNKNOWN;
        }
        final Pair<String, String> podNameAndContainerId = CommandRoutingUtil
                .getK8sPodNameAndContainerIdFromAdapterInstanceId(adapterInstanceId);
        if (podNameAndContainerId == null) {
            return AdapterInstanceStatus.UNKNOWN;
        }
        final String shortContainerId = podNameAndContainerId.two();
        final String registeredPodName = this.containerIdToPodNameMap.get(shortContainerId);
        if (registeredPodName != null) {
            LOG.trace("found alive container in pod [{}] for adapter instance id [{}]", registeredPodName, adapterInstanceId);
            return AdapterInstanceStatus.ALIVE;
        }
        if (STARTING_POD_UNKNOWN_CONTAINER_ID_PLACEHOLDER.equals(this.podNameToContainerIdMap.get(podNameAndContainerId.one()))) {
            LOG.debug("returning status UNKNOWN for adapter instance id [{}] - container id not known but found corresponding pod (with no known started container there yet)", adapterInstanceId);
            return AdapterInstanceStatus.UNKNOWN;
        }
        if (this.terminatedContainerIds.contains(shortContainerId)) {
            LOG.debug("container already terminated for adapter instance id [{}]", adapterInstanceId);
            return AdapterInstanceStatus.DEAD;
        }
        LOG.debug("no container found for adapter instance id [{}]", adapterInstanceId);
        // keep the instant of the first query so that the time spent in the suspected state can be determined
        this.suspectedContainerIds.putIfAbsent(shortContainerId, Instant.now(this.clock));
        return AdapterInstanceStatus.SUSPECTED_DEAD;
    }

    /**
     * Determines which of the given adapter instances are dead.
     * <p>
     * If the status of at least one instance is {@code SUSPECTED_DEAD}, the pod list is
     * fetched anew and the status of all given instances is determined again. This is done via
     * {@code executeBlocking} on the current Vert.x context if there is one, otherwise directly
     * on the calling thread.
     *
     * @param adapterInstanceIds The identifiers of the adapter instances to check.
     * @return A future containing the identifiers of the dead instances. The future is failed
     *         if the service is not active or if fetching the pod list failed.
     * @throws NullPointerException if adapterInstanceIds is {@code null}.
     */
    public Future<Set<String>> getDeadAdapterInstances(Collection<String> adapterInstanceIds) {
        Objects.requireNonNull(adapterInstanceIds);
        if (this.active.get() != STATE_ACTIVE) {
            return Future.failedFuture("service not active");
        }
        boolean needPodListRefresh = false;
        final Set<String> resultSet = new HashSet<>();
        for (final String adapterInstanceId : adapterInstanceIds) {
            final AdapterInstanceStatus status = this.getStatus(adapterInstanceId);
            if (status == AdapterInstanceStatus.DEAD) {
                resultSet.add(adapterInstanceId);
            } else if (status == AdapterInstanceStatus.SUSPECTED_DEAD) {
                needPodListRefresh = true;
            }
        }
        if (!needPodListRefresh) {
            return Future.succeededFuture(resultSet);
        }
        final Promise<Set<String>> resultPromise = Promise.promise();
        final Handler<Promise<Set<String>>> resultProvider = promise -> {
            try {
                this.refreshContainerLists(this.client.pods().inNamespace(this.namespace).list().getItems());
                final Set<String> deadAdapterInstances = adapterInstanceIds.stream()
                        .filter(id -> this.getStatus(id) == AdapterInstanceStatus.DEAD)
                        .collect(Collectors.toSet());
                promise.complete(deadAdapterInstances);
            } catch (Exception e) {
                promise.fail(e);
            }
        };
        // listing the pods is a blocking operation
        Optional.ofNullable(Vertx.currentContext()).ifPresentOrElse(
                ctx -> ctx.executeBlocking(resultProvider, false, resultPromise),
                () -> resultProvider.handle(resultPromise));
        return resultPromise.future();
    }

    /**
     * Gets the short ids of the currently registered adapter containers.
     *
     * @return A copy of the set of container ids or an empty Optional if the service is not active.
     */
    Optional<Set<String>> getActiveAdapterInstanceContainerIds() {
        if (this.active.get() != STATE_ACTIVE) {
            return Optional.empty();
        }
        return Optional.of(new HashSet<>(this.containerIdToPodNameMap.keySet()));
    }

    /**
     * Gets the short form of the given container id, i.e. the first 12 characters of the
     * part following the last slash.
     *
     * @param containerId The full container id, e.g. {@code docker://<id>}.
     * @return The short id or {@code null} if the given id contains no slash or if the part
     *         following the last slash has less than 12 characters.
     * @throws NullPointerException if containerId is {@code null}.
     */
    static String getShortContainerId(String containerId) {
        final int lastSlashIndex = containerId.lastIndexOf('/');
        if (lastSlashIndex == -1) {
            return null;
        }
        final String fullId = containerId.substring(lastSlashIndex + 1);
        if (fullId.length() < SHORT_CONTAINER_ID_LENGTH) {
            return null;
        }
        return fullId.substring(0, SHORT_CONTAINER_ID_LENGTH);
    }

    /**
     * A map with access ordering that drops its eldest (least recently accessed) entry once
     * the given maximum number of entries is exceeded.
     * <p>
     * Not thread-safe on its own; instances are wrapped via {@code Collections.synchronizedMap} above.
     *
     * @param <T> The type of the map values.
     */
    private static class LRUMap<T>
    extends LinkedHashMap<String, T> {
        private static final long serialVersionUID = 1L;
        private final int maxEntries;

        LRUMap(int maxEntries) {
            // third argument 'true' selects access order instead of insertion order
            super(16, 0.75f, true);
            this.maxEntries = maxEntries;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
            return this.size() > this.maxEntries;
        }
    }
}

