/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.memory;

import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.memory.ClusterMemoryPool;
import com.facebook.presto.memory.ForMemoryManager;
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.memory.MemoryInfo;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.MemoryPoolAssignment;
import com.facebook.presto.memory.MemoryPoolAssignmentsRequest;
import com.facebook.presto.memory.MemoryPoolId;
import com.facebook.presto.memory.RemoteNodeMemory;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.util.ImmutableCollectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.http.client.HttpClient;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import org.weakref.jmx.JmxException;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed;
import org.weakref.jmx.ObjectNames;

public class ClusterMemoryManager {
    private static final Logger log = Logger.get(ClusterMemoryManager.class);
    private final NodeManager nodeManager;
    private final LocationFactory locationFactory;
    private final HttpClient httpClient;
    private final MBeanExporter exporter;
    private final JsonCodec<MemoryInfo> memoryInfoCodec;
    private final JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec;
    private final DataSize maxQueryMemory;
    private final boolean enabled;
    private final String coordinatorId;
    private final AtomicLong memoryPoolAssignmentsVersion = new AtomicLong();
    private final AtomicLong clusterMemoryUsageBytes = new AtomicLong();
    private final AtomicLong clusterMemoryBytes = new AtomicLong();
    private final Map<String, RemoteNodeMemory> nodes = new HashMap<String, RemoteNodeMemory>();
    @GuardedBy(value="this")
    private final Map<MemoryPoolId, ClusterMemoryPool> pools = new HashMap<MemoryPoolId, ClusterMemoryPool>();

    @Inject
    public ClusterMemoryManager(@ForMemoryManager HttpClient httpClient, NodeManager nodeManager, LocationFactory locationFactory, MBeanExporter exporter, JsonCodec<MemoryInfo> memoryInfoCodec, JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec, QueryIdGenerator queryIdGenerator, ServerConfig serverConfig, MemoryManagerConfig config) {
        Objects.requireNonNull(config, "config is null");
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.locationFactory = Objects.requireNonNull(locationFactory, "locationFactory is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.exporter = Objects.requireNonNull(exporter, "exporter is null");
        this.memoryInfoCodec = Objects.requireNonNull(memoryInfoCodec, "memoryInfoCodec is null");
        this.assignmentsRequestJsonCodec = Objects.requireNonNull(assignmentsRequestJsonCodec, "assignmentsRequestJsonCodec is null");
        this.maxQueryMemory = config.getMaxQueryMemory();
        this.coordinatorId = queryIdGenerator.getCoordinatorId();
        this.enabled = config.isClusterMemoryManagerEnabled() && serverConfig.isCoordinator();
    }

    public void process(Iterable<QueryExecution> queries) {
        if (!this.enabled) {
            return;
        }
        long totalBytes = 0L;
        for (QueryExecution query : queries) {
            long bytes = query.getTotalMemoryReservation();
            DataSize sessionMaxQueryMemory = SystemSessionProperties.getQueryMaxMemory(query.getSession());
            long queryMemoryLimit = Math.min(this.maxQueryMemory.toBytes(), sessionMaxQueryMemory.toBytes());
            totalBytes += bytes;
            if (bytes <= queryMemoryLimit) continue;
            query.fail((Throwable)((Object)new ExceededMemoryLimitException("Query", DataSize.succinctDataSize((double)queryMemoryLimit, (DataSize.Unit)DataSize.Unit.BYTE))));
        }
        this.clusterMemoryUsageBytes.set(totalBytes);
        HashMap<MemoryPoolId, Integer> countByPool = new HashMap<MemoryPoolId, Integer>();
        for (QueryExecution query : queries) {
            MemoryPoolId id = query.getMemoryPool().getId();
            countByPool.put(id, countByPool.getOrDefault(id, 0) + 1);
        }
        this.updatePools(countByPool);
        this.updateNodes(this.updateAssignments(queries));
    }

    @VisibleForTesting
    synchronized Map<MemoryPoolId, ClusterMemoryPool> getPools() {
        return ImmutableMap.copyOf(this.pools);
    }

    private MemoryPoolAssignmentsRequest updateAssignments(Iterable<QueryExecution> queries) {
        ClusterMemoryPool reservedPool = this.pools.get(LocalMemoryManager.RESERVED_POOL);
        ClusterMemoryPool generalPool = this.pools.get(LocalMemoryManager.GENERAL_POOL);
        long version = this.memoryPoolAssignmentsVersion.incrementAndGet();
        if (reservedPool != null && generalPool != null && this.allAssignmentsHavePropagated(queries) && reservedPool.getQueries() == 0 && generalPool.getBlockedNodes() > 0) {
            QueryExecution biggestQuery = null;
            long maxMemory = -1L;
            for (QueryExecution queryExecution : queries) {
                long bytesUsed = queryExecution.getTotalMemoryReservation();
                if (bytesUsed <= maxMemory) continue;
                biggestQuery = queryExecution;
                maxMemory = bytesUsed;
            }
            for (QueryExecution queryExecution : queries) {
                if (!queryExecution.getQueryId().equals(biggestQuery.getQueryId())) continue;
                queryExecution.setMemoryPool(new VersionedMemoryPoolId(LocalMemoryManager.RESERVED_POOL, version));
            }
        }
        ImmutableList.Builder assignments = ImmutableList.builder();
        for (QueryExecution queryExecution : queries) {
            assignments.add((Object)new MemoryPoolAssignment(queryExecution.getQueryId(), queryExecution.getMemoryPool().getId()));
        }
        return new MemoryPoolAssignmentsRequest(this.coordinatorId, version, (List<MemoryPoolAssignment>)assignments.build());
    }

    private boolean allAssignmentsHavePropagated(Iterable<QueryExecution> queries) {
        long mostOutOfDateNode;
        if (this.nodes.isEmpty()) {
            return false;
        }
        long newestAssignment = ImmutableList.copyOf(queries).stream().map(QueryExecution::getMemoryPool).mapToLong(VersionedMemoryPoolId::getVersion).min().orElse(-1L);
        return newestAssignment <= (mostOutOfDateNode = this.nodes.values().stream().mapToLong(RemoteNodeMemory::getCurrentAssignmentVersion).min().orElse(Long.MAX_VALUE));
    }

    private void updateNodes(MemoryPoolAssignmentsRequest assignments) {
        Set activeNodes = this.nodeManager.getActiveNodes();
        ImmutableSet activeNodeIds = activeNodes.stream().map(Node::getNodeIdentifier).collect(ImmutableCollectors.toImmutableSet());
        ImmutableSet deadNodes = ImmutableSet.copyOf((Collection)Sets.difference(this.nodes.keySet(), activeNodeIds));
        this.nodes.keySet().removeAll((Collection<?>)deadNodes);
        for (Node node : activeNodes) {
            if (this.nodes.containsKey(node.getNodeIdentifier())) continue;
            this.nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(this.httpClient, this.memoryInfoCodec, this.assignmentsRequestJsonCodec, this.locationFactory.createMemoryInfoLocation(node)));
        }
        for (RemoteNodeMemory remoteNodeMemory : this.nodes.values()) {
            remoteNodeMemory.asyncRefresh(assignments);
        }
    }

    private synchronized void updatePools(Map<MemoryPoolId, Integer> queryCounts) {
        List nodeMemoryInfos = (List)this.nodes.values().stream().map(RemoteNodeMemory::getInfo).filter(Optional::isPresent).map(Optional::get).collect(ImmutableCollectors.toImmutableList());
        long totalClusterMemory = nodeMemoryInfos.stream().map(MemoryInfo::getTotalNodeMemory).mapToLong(DataSize::toBytes).sum();
        this.clusterMemoryBytes.set(totalClusterMemory);
        Set activePoolIds = (Set)nodeMemoryInfos.stream().flatMap(info -> info.getPools().keySet().stream()).collect(ImmutableCollectors.toImmutableSet());
        ImmutableSet removedPools = ImmutableSet.copyOf((Collection)Sets.difference(this.pools.keySet(), (Set)activePoolIds));
        for (MemoryPoolId removed : removedPools) {
            this.unexport(this.pools.get(removed));
            this.pools.remove(removed);
        }
        for (MemoryPoolId id : activePoolIds) {
            ClusterMemoryPool pool = this.pools.computeIfAbsent(id, poolId -> {
                ClusterMemoryPool newPool = new ClusterMemoryPool((MemoryPoolId)poolId);
                String objectName = ObjectNames.builder(ClusterMemoryPool.class, (String)newPool.getId().toString()).build();
                try {
                    this.exporter.export(objectName, (Object)newPool);
                }
                catch (JmxException e) {
                    log.error((Throwable)e, "Error exporting memory pool %s", new Object[]{poolId});
                }
                return newPool;
            });
            pool.update(nodeMemoryInfos, queryCounts.getOrDefault(pool.getId(), 0));
        }
    }

    @PreDestroy
    public synchronized void destroy() {
        for (ClusterMemoryPool pool : this.pools.values()) {
            this.unexport(pool);
        }
        this.pools.clear();
    }

    private void unexport(ClusterMemoryPool pool) {
        try {
            String objectName = ObjectNames.builder(ClusterMemoryPool.class, (String)pool.getId().toString()).build();
            this.exporter.unexport(objectName);
        }
        catch (JmxException e) {
            log.error((Throwable)e, "Failed to unexport pool %s", new Object[]{pool.getId()});
        }
    }

    @Managed
    public long getClusterMemoryUsageBytes() {
        return this.clusterMemoryUsageBytes.get();
    }

    @Managed
    public long getClusterMemoryBytes() {
        return this.clusterMemoryBytes.get();
    }
}

