/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.ttl.nodettlfetchermanagers;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.spi.ttl.NodeInfo;
import com.facebook.presto.spi.ttl.NodeTtl;
import com.facebook.presto.spi.ttl.NodeTtlFetcher;
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
import com.facebook.presto.ttl.nodettlfetchermanagers.NodeTtlFetcherManager;
import com.facebook.presto.ttl.nodettlfetchermanagers.NodeTtlFetcherManagerConfig;
import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PropertiesUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.units.Duration;
import java.io.File;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

public class ConfidenceBasedNodeTtlFetcherManager
implements NodeTtlFetcherManager {
    private static final Logger log = Logger.get(ConfidenceBasedNodeTtlFetcherManager.class);
    private static final File TTL_FETCHER_CONFIG = new File("etc/node-ttl-fetcher.properties");
    private static final String TTL_FETCHER_PROPERTY_NAME = "node-ttl-fetcher.factory";
    private final AtomicReference<NodeTtlFetcher> ttlFetcher = new AtomicReference();
    private final InternalNodeManager nodeManager;
    private final ConcurrentHashMap<InternalNode, NodeTtl> nodeTtlMap = new ConcurrentHashMap();
    private final boolean isWorkScheduledOnCoordinator;
    private final Map<String, NodeTtlFetcherFactory> ttlFetcherFactories = new ConcurrentHashMap<String, NodeTtlFetcherFactory>();
    private final NodeTtlFetcherManagerConfig nodeTtlFetcherManagerConfig;
    private final AtomicLong lastRefreshEpochMillis = new AtomicLong(Long.MAX_VALUE);
    private final CounterStat refreshFailures = new CounterStat();
    private final Consumer<AllNodes> nodeChangeListener = this::refreshTtlInfo;
    private PeriodicTaskExecutor periodicTtlRefresher;

    @Inject
    public ConfidenceBasedNodeTtlFetcherManager(InternalNodeManager nodeManager, NodeSchedulerConfig schedulerConfig, NodeTtlFetcherManagerConfig nodeTtlFetcherManagerConfig) {
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.isWorkScheduledOnCoordinator = Objects.requireNonNull(schedulerConfig, "nodeSchedulerConfig is null").isIncludeCoordinator();
        this.nodeTtlFetcherManagerConfig = Objects.requireNonNull(nodeTtlFetcherManagerConfig, "nodeTtlFetcherManagerConfig is null");
    }

    private static long jitterForPeriodicRefresh(long delayMillis) {
        double maxJitter = (double)delayMillis * 0.1;
        return Math.round((double)delayMillis + maxJitter * (2.0 * ThreadLocalRandom.current().nextDouble() - 1.0));
    }

    public void scheduleRefresh() {
        this.periodicTtlRefresher = new PeriodicTaskExecutor(this.ttlFetcher.get().getRefreshInterval().toMillis(), this.nodeTtlFetcherManagerConfig.getInitialDelayBeforeRefresh().toMillis(), Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed((String)"refresh-node-ttl-executor-%s")), this::refreshTtlInfo, ConfidenceBasedNodeTtlFetcherManager::jitterForPeriodicRefresh);
        this.periodicTtlRefresher.start();
    }

    @PreDestroy
    public void stop() {
        this.nodeManager.removeNodeChangeListener(this.nodeChangeListener);
        if (this.periodicTtlRefresher != null) {
            this.periodicTtlRefresher.stop();
        }
    }

    @VisibleForTesting
    public synchronized void refreshTtlInfo() {
        AllNodes allNodes = this.nodeManager.getAllNodes();
        this.refreshTtlInfo(allNodes);
    }

    private synchronized void refreshTtlInfo(AllNodes allNodes) {
        try {
            Sets.SetView activeWorkers = Sets.difference(allNodes.getActiveNodes(), allNodes.getActiveResourceManagers());
            if (!this.isWorkScheduledOnCoordinator) {
                activeWorkers = Sets.difference((Set)activeWorkers, allNodes.getActiveCoordinators());
            }
            Map internalNodeMap = (Map)activeWorkers.stream().collect(ImmutableMap.toImmutableMap(node -> new NodeInfo(node.getNodeIdentifier(), node.getHost()), Function.identity()));
            Map ttlInfo = this.ttlFetcher.get().getTtlInfo((Set)ImmutableSet.copyOf(internalNodeMap.keySet()));
            this.nodeTtlMap.putAll((Map)ttlInfo.entrySet().stream().collect(ImmutableMap.toImmutableMap(e -> (InternalNode)internalNodeMap.get(e.getKey()), Map.Entry::getValue)));
            ImmutableSet deadNodes = Sets.difference((Set)this.nodeTtlMap.keySet(), (Set)activeWorkers).immutableCopy();
            ((ConcurrentHashMap.KeySetView)this.nodeTtlMap.keySet()).removeAll((Collection)deadNodes);
            log.info("Node ttls refreshed, nodeTtlMap: %s", new Object[]{this.nodeTtlMap});
            this.lastRefreshEpochMillis.set(System.currentTimeMillis());
        }
        catch (Throwable e2) {
            this.refreshFailures.update(1L);
            log.error(e2, "Error loading node ttls");
        }
    }

    @Override
    public Optional<NodeTtl> getTtlInfo(InternalNode node) {
        return this.nodeTtlMap.containsKey(node) ? Optional.of(this.nodeTtlMap.get(node)) : Optional.empty();
    }

    @Override
    public Map<InternalNode, NodeTtl> getAllTtls() {
        return ImmutableMap.copyOf(this.nodeTtlMap);
    }

    @Override
    public void addNodeTtlFetcherFactory(NodeTtlFetcherFactory nodeTtlFetcherFactory) {
        Objects.requireNonNull(nodeTtlFetcherFactory, "nodeTtlFetcherFactory is null");
        if (this.ttlFetcherFactories.putIfAbsent(nodeTtlFetcherFactory.getName(), nodeTtlFetcherFactory) != null) {
            throw new IllegalArgumentException(String.format("Node ttl fetcher factory '%s' is already registered", nodeTtlFetcherFactory.getName()));
        }
    }

    @Override
    public void loadNodeTtlFetcher() throws Exception {
        String factoryName = "infinite";
        Object properties = ImmutableMap.of();
        if (TTL_FETCHER_CONFIG.exists()) {
            properties = new HashMap<String, String>(PropertiesUtil.loadProperties(TTL_FETCHER_CONFIG));
            factoryName = (String)properties.remove(TTL_FETCHER_PROPERTY_NAME);
            Preconditions.checkArgument((!Strings.isNullOrEmpty((String)factoryName) ? 1 : 0) != 0, (String)"Node ttl fetcher configuration %s does not contain %s", (Object)TTL_FETCHER_CONFIG.getAbsoluteFile(), (Object)TTL_FETCHER_PROPERTY_NAME);
        }
        this.load(factoryName, (Map<String, String>)properties);
        if (this.ttlFetcher.get().needsPeriodicRefresh()) {
            this.scheduleRefresh();
        } else {
            this.refreshTtlInfo();
        }
        this.nodeManager.addNodeChangeListener(this.nodeChangeListener);
    }

    @VisibleForTesting
    public void load(String factoryName, Map<String, String> properties) {
        log.info("-- Loading node ttl fetcher factory --");
        NodeTtlFetcherFactory nodeTtlFetcherFactory = this.ttlFetcherFactories.get(factoryName);
        Preconditions.checkState((nodeTtlFetcherFactory != null ? 1 : 0) != 0, (String)"Node ttl fetcher factory %s is not registered", (Object)factoryName);
        NodeTtlFetcher nodeTtlFetcher = nodeTtlFetcherFactory.create(properties);
        Preconditions.checkState((boolean)this.ttlFetcher.compareAndSet(null, nodeTtlFetcher), (Object)"Node ttl fetcher has already been set!");
        log.info("-- Loaded node ttl fetcher %s --", new Object[]{factoryName});
    }

    @Managed
    public long getTimeInMillisSinceLastTtlRefresh() {
        if (this.lastRefreshEpochMillis.get() == Long.MAX_VALUE) {
            return -1L;
        }
        return System.currentTimeMillis() - this.lastRefreshEpochMillis.get();
    }

    @Managed
    public long getStaleTtlWorkerCount() {
        Duration staleDuration = this.nodeTtlFetcherManagerConfig.getStaleTtlThreshold();
        Instant staleInstant = Instant.now().minus(staleDuration.toMillis(), ChronoUnit.MILLIS);
        return this.nodeTtlMap.values().stream().filter(nodeTtl -> nodeTtl.getTtlPredictionInstant().isBefore(staleInstant)).count();
    }

    @Managed
    @Nested
    public CounterStat getRefreshFailures() {
        return this.refreshFailures;
    }
}

