/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.memory;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.execution.QueryTracker;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.memory.ClusterMemoryLeakDetector;
import com.facebook.presto.memory.ClusterMemoryPool;
import com.facebook.presto.memory.ForMemoryManager;
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.memory.LowMemoryKiller;
import com.facebook.presto.memory.MemoryInfo;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.MemoryPoolAssignment;
import com.facebook.presto.memory.MemoryPoolAssignmentsRequest;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.memory.NoneLowMemoryKiller;
import com.facebook.presto.memory.RemoteNodeMemory;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.InternalCommunicationConfig;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.server.codec.Codec;
import com.facebook.presto.server.smile.JsonCodecWrapper;
import com.facebook.presto.server.smile.SmileCodec;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.memory.MemoryPoolInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MoreCollectors;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import org.weakref.jmx.JmxException;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed;
import org.weakref.jmx.ObjectNames;

public class ClusterMemoryManager
implements ClusterMemoryPoolManager {
    private static final Logger log = Logger.get(ClusterMemoryManager.class);
    private final ExecutorService listenerExecutor = Executors.newSingleThreadExecutor();
    private final ClusterMemoryLeakDetector memoryLeakDetector = new ClusterMemoryLeakDetector();
    private final InternalNodeManager nodeManager;
    private final LocationFactory locationFactory;
    private final HttpClient httpClient;
    private final MBeanExporter exporter;
    private final Codec<MemoryInfo> memoryInfoCodec;
    private final Codec<MemoryPoolAssignmentsRequest> assignmentsRequestCodec;
    private final DataSize maxQueryMemory;
    private final DataSize maxQueryTotalMemory;
    private final boolean enabled;
    private final LowMemoryKiller lowMemoryKiller;
    private final Duration killOnOutOfMemoryDelay;
    private final String coordinatorId;
    private final AtomicLong memoryPoolAssignmentsVersion = new AtomicLong();
    private final AtomicLong clusterUserMemoryReservation = new AtomicLong();
    private final AtomicLong clusterTotalMemoryReservation = new AtomicLong();
    private final AtomicLong clusterMemoryBytes = new AtomicLong();
    private final AtomicLong queriesKilledDueToOutOfMemory = new AtomicLong();
    private final boolean isWorkScheduledOnCoordinator;
    private final boolean isBinaryTransportEnabled;
    @GuardedBy(value="this")
    private final Map<String, RemoteNodeMemory> nodes = new HashMap<String, RemoteNodeMemory>();
    @GuardedBy(value="this")
    private final Map<MemoryPoolId, List<Consumer<MemoryPoolInfo>>> changeListeners = new HashMap<MemoryPoolId, List<Consumer<MemoryPoolInfo>>>();
    @GuardedBy(value="this")
    private final Map<MemoryPoolId, ClusterMemoryPool> pools;
    @GuardedBy(value="this")
    private long lastTimeNotOutOfMemory = System.nanoTime();
    @GuardedBy(value="this")
    private QueryId lastKilledQuery;

    @Inject
    public ClusterMemoryManager(@ForMemoryManager HttpClient httpClient, InternalNodeManager nodeManager, LocationFactory locationFactory, MBeanExporter exporter, JsonCodec<MemoryInfo> memoryInfoJsonCodec, SmileCodec<MemoryInfo> memoryInfoSmileCodec, JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec, SmileCodec<MemoryPoolAssignmentsRequest> assignmentsRequestSmileCodec, QueryIdGenerator queryIdGenerator, LowMemoryKiller lowMemoryKiller, ServerConfig serverConfig, MemoryManagerConfig config, NodeMemoryConfig nodeMemoryConfig, NodeSchedulerConfig schedulerConfig, InternalCommunicationConfig communicationConfig) {
        Objects.requireNonNull(config, "config is null");
        Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null");
        Objects.requireNonNull(serverConfig, "serverConfig is null");
        Objects.requireNonNull(schedulerConfig, "schedulerConfig is null");
        Objects.requireNonNull(communicationConfig, "communicationConfig is null");
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager is null");
        this.locationFactory = Objects.requireNonNull(locationFactory, "locationFactory is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.exporter = Objects.requireNonNull(exporter, "exporter is null");
        this.lowMemoryKiller = Objects.requireNonNull(lowMemoryKiller, "lowMemoryKiller is null");
        this.maxQueryMemory = config.getMaxQueryMemory();
        this.maxQueryTotalMemory = config.getMaxQueryTotalMemory();
        this.coordinatorId = queryIdGenerator.getCoordinatorId();
        this.enabled = serverConfig.isCoordinator();
        this.killOnOutOfMemoryDelay = config.getKillOnOutOfMemoryDelay();
        this.isWorkScheduledOnCoordinator = schedulerConfig.isIncludeCoordinator();
        this.isBinaryTransportEnabled = communicationConfig.isBinaryTransportEnabled();
        if (this.isBinaryTransportEnabled) {
            this.memoryInfoCodec = Objects.requireNonNull(memoryInfoSmileCodec, "memoryInfoSmileCodec is null");
            this.assignmentsRequestCodec = Objects.requireNonNull(assignmentsRequestSmileCodec, "assignmentsRequestSmileCodec is null");
        } else {
            this.memoryInfoCodec = JsonCodecWrapper.wrapJsonCodec(Objects.requireNonNull(memoryInfoJsonCodec, "memoryInfoJsonCodec is null"));
            this.assignmentsRequestCodec = JsonCodecWrapper.wrapJsonCodec(Objects.requireNonNull(assignmentsRequestJsonCodec, "assignmentsRequestJsonCodec is null"));
        }
        Verify.verify((this.maxQueryMemory.toBytes() <= this.maxQueryTotalMemory.toBytes() ? 1 : 0) != 0, (String)"maxQueryMemory cannot be greater than maxQueryTotalMemory", (Object[])new Object[0]);
        Verify.verify((config.getSoftMaxQueryMemory().toBytes() <= this.maxQueryMemory.toBytes() ? 1 : 0) != 0, (String)"Soft max query memory cannot be greater than hard limit", (Object[])new Object[0]);
        Verify.verify((config.getSoftMaxQueryTotalMemory().toBytes() <= this.maxQueryTotalMemory.toBytes() ? 1 : 0) != 0, (String)"Soft max query total memory cannot be greater than hard limit", (Object[])new Object[0]);
        this.pools = this.createClusterMemoryPools(nodeMemoryConfig.isReservedPoolEnabled());
    }

    private Map<MemoryPoolId, ClusterMemoryPool> createClusterMemoryPools(boolean reservedPoolEnabled) {
        HashSet<MemoryPoolId> memoryPools = new HashSet<MemoryPoolId>();
        memoryPools.add(LocalMemoryManager.GENERAL_POOL);
        if (reservedPoolEnabled) {
            memoryPools.add(LocalMemoryManager.RESERVED_POOL);
        }
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (MemoryPoolId poolId : memoryPools) {
            ClusterMemoryPool pool = new ClusterMemoryPool(poolId);
            builder.put((Object)poolId, (Object)pool);
            try {
                this.exporter.export(ObjectNames.generatedNameOf(ClusterMemoryPool.class, (String)poolId.toString()), (Object)pool);
            }
            catch (JmxException e) {
                log.error((Throwable)e, "Error exporting memory pool %s", new Object[]{poolId});
            }
        }
        return builder.build();
    }

    public synchronized void addChangeListener(MemoryPoolId poolId, Consumer<MemoryPoolInfo> listener) {
        Verify.verify((boolean)this.memoryPoolExists(poolId), (String)"Memory pool does not exist: %s", (Object)poolId);
        this.changeListeners.computeIfAbsent(poolId, id -> new ArrayList()).add(listener);
    }

    public synchronized boolean memoryPoolExists(MemoryPoolId poolId) {
        return this.pools.containsKey(poolId);
    }

    public synchronized void process(Iterable<QueryExecution> runningQueries, Supplier<List<BasicQueryInfo>> allQueryInfoSupplier) {
        boolean shouldCallOomKiller;
        if (!this.enabled) {
            return;
        }
        this.memoryLeakDetector.checkForMemoryLeaks(allQueryInfoSupplier, this.pools.get(LocalMemoryManager.GENERAL_POOL).getQueryMemoryReservations());
        boolean outOfMemory = this.isClusterOutOfMemory();
        if (!outOfMemory) {
            this.lastTimeNotOutOfMemory = System.nanoTime();
        }
        boolean queryKilled = false;
        long totalUserMemoryBytes = 0L;
        long totalMemoryBytes = 0L;
        for (QueryExecution query : runningQueries) {
            boolean resourceOvercommit = SystemSessionProperties.resourceOvercommit(query.getSession());
            long userMemoryReservation = query.getUserMemoryReservation().toBytes();
            long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
            if (resourceOvercommit && outOfMemory) {
                DataSize memory = DataSize.succinctBytes((long)this.getQueryMemoryReservation(query));
                query.fail(new PrestoException((ErrorCodeSupplier)StandardErrorCode.CLUSTER_OUT_OF_MEMORY, String.format("The cluster is out of memory and %s=true, so this query was killed. It was using %s of memory", "resource_overcommit", memory)));
                queryKilled = true;
            }
            if (!resourceOvercommit) {
                long totalMemoryLimit;
                long userMemoryLimit = Math.min(this.maxQueryMemory.toBytes(), SystemSessionProperties.getQueryMaxMemory(query.getSession()).toBytes());
                if (userMemoryReservation > userMemoryLimit) {
                    query.fail((Throwable)((Object)ExceededMemoryLimitException.exceededGlobalUserLimit(DataSize.succinctBytes((long)userMemoryLimit))));
                    queryKilled = true;
                }
                if (totalMemoryReservation > (totalMemoryLimit = Math.min(this.maxQueryTotalMemory.toBytes(), SystemSessionProperties.getQueryMaxTotalMemory(query.getSession()).toBytes()))) {
                    query.fail((Throwable)((Object)ExceededMemoryLimitException.exceededGlobalTotalLimit(DataSize.succinctBytes((long)totalMemoryLimit))));
                    queryKilled = true;
                }
            }
            totalUserMemoryBytes += userMemoryReservation;
            totalMemoryBytes += totalMemoryReservation;
        }
        this.clusterUserMemoryReservation.set(totalUserMemoryBytes);
        this.clusterTotalMemoryReservation.set(totalMemoryBytes);
        boolean killOnOomDelayPassed = Duration.nanosSince((long)this.lastTimeNotOutOfMemory).compareTo(this.killOnOutOfMemoryDelay) > 0;
        boolean lastKilledQueryGone = this.isLastKilledQueryGone();
        boolean bl = shouldCallOomKiller = !(this.lowMemoryKiller instanceof NoneLowMemoryKiller) && outOfMemory && !queryKilled && killOnOomDelayPassed && lastKilledQueryGone;
        if (shouldCallOomKiller) {
            this.callOomKiller(runningQueries);
        } else if (outOfMemory) {
            log.debug("The cluster is out of memory and the OOM killer is not called (query killed: %s, kill on OOM delay passed: %s, last killed query gone: %s).", new Object[]{queryKilled, killOnOomDelayPassed, lastKilledQueryGone});
        }
        HashMap<MemoryPoolId, Integer> countByPool = new HashMap<MemoryPoolId, Integer>();
        for (QueryExecution query : runningQueries) {
            MemoryPoolId id = query.getMemoryPool().getId();
            countByPool.put(id, countByPool.getOrDefault(id, 0) + 1);
        }
        this.updatePools(countByPool);
        MemoryPoolAssignmentsRequest assignmentsRequest = this.pools.containsKey(LocalMemoryManager.RESERVED_POOL) ? this.updateAssignments(runningQueries) : new MemoryPoolAssignmentsRequest(this.coordinatorId, Long.MIN_VALUE, (List<MemoryPoolAssignment>)ImmutableList.of());
        this.updateNodes(assignmentsRequest);
    }

    private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries) {
        List nodeMemoryInfos;
        List queryMemoryInfoList = (List)Streams.stream(runningQueries).map(this::createQueryMemoryInfo).collect(ImmutableList.toImmutableList());
        Optional<QueryId> chosenQueryId = this.lowMemoryKiller.chooseQueryToKill(queryMemoryInfoList, nodeMemoryInfos = (List)this.nodes.values().stream().map(RemoteNodeMemory::getInfo).filter(Optional::isPresent).map(Optional::get).collect(ImmutableList.toImmutableList()));
        if (chosenQueryId.isPresent()) {
            log.debug("Low memory killer chose %s", new Object[]{chosenQueryId.get()});
            Optional chosenQuery = (Optional)Streams.stream(runningQueries).filter(query -> ((QueryId)chosenQueryId.get()).equals((Object)query.getQueryId())).collect(MoreCollectors.toOptional());
            if (chosenQuery.isPresent()) {
                ((QueryExecution)chosenQuery.get()).fail(new PrestoException((ErrorCodeSupplier)StandardErrorCode.CLUSTER_OUT_OF_MEMORY, "Query killed because the cluster is out of memory. Please try again in a few minutes."));
                this.queriesKilledDueToOutOfMemory.incrementAndGet();
                this.lastKilledQuery = chosenQueryId.get();
                this.logQueryKill(chosenQueryId.get(), nodeMemoryInfos);
            }
        }
    }

    @GuardedBy(value="this")
    private boolean isLastKilledQueryGone() {
        if (this.lastKilledQuery == null) {
            return true;
        }
        if (this.memoryLeakDetector.wasQueryPossiblyLeaked(this.lastKilledQuery)) {
            this.lastKilledQuery = null;
            return true;
        }
        return !this.pools.get(LocalMemoryManager.GENERAL_POOL).getQueryMemoryReservations().containsKey(this.lastKilledQuery);
    }

    private void logQueryKill(QueryId killedQueryId, List<MemoryInfo> nodes) {
        if (!log.isInfoEnabled()) {
            return;
        }
        StringBuilder nodeDescription = new StringBuilder();
        nodeDescription.append("Query Kill Decision: Killed ").append(killedQueryId).append("\n");
        Comparator<Map.Entry> nodeMemoryComparator = Comparator.comparingLong(Map.Entry::getValue);
        nodes.stream().filter(node -> node.getPools().get(LocalMemoryManager.GENERAL_POOL) != null).map(node -> new AbstractMap.SimpleEntry<MemoryPoolInfo, Long>(node.getPools().get(LocalMemoryManager.GENERAL_POOL), node.getPools().get(LocalMemoryManager.GENERAL_POOL).getQueryMemoryReservations().values().stream().mapToLong(l -> l).sum())).sorted(nodeMemoryComparator.reversed()).map(Map.Entry::getKey).forEachOrdered(memoryPoolInfo -> {
            nodeDescription.append("Query Kill Scenario: ");
            nodeDescription.append("MaxBytes ").append(memoryPoolInfo.getMaxBytes()).append(' ');
            nodeDescription.append("FreeBytes ").append(memoryPoolInfo.getFreeBytes() + memoryPoolInfo.getReservedRevocableBytes()).append(' ');
            nodeDescription.append("Queries ");
            Comparator<Map.Entry> queryMemoryComparator = Comparator.comparingLong(Map.Entry::getValue);
            Stream<Map.Entry> sortedMemoryReservations = memoryPoolInfo.getQueryMemoryReservations().entrySet().stream().sorted(queryMemoryComparator.reversed());
            Joiner.on((String)",").withKeyValueSeparator("=").appendTo(nodeDescription, sortedMemoryReservations::iterator);
            nodeDescription.append('\n');
        });
        log.info(nodeDescription.toString());
    }

    @VisibleForTesting
    synchronized Map<MemoryPoolId, ClusterMemoryPool> getPools() {
        return ImmutableMap.copyOf(this.pools);
    }

    public synchronized Map<MemoryPoolId, MemoryPoolInfo> getMemoryPoolInfo() {
        ImmutableMap.Builder builder = new ImmutableMap.Builder();
        this.pools.forEach((poolId, memoryPool) -> builder.put(poolId, (Object)memoryPool.getInfo()));
        return builder.build();
    }

    private synchronized boolean isClusterOutOfMemory() {
        ClusterMemoryPool reservedPool = this.pools.get(LocalMemoryManager.RESERVED_POOL);
        ClusterMemoryPool generalPool = this.pools.get(LocalMemoryManager.GENERAL_POOL);
        if (reservedPool == null) {
            return generalPool.getBlockedNodes() > 0;
        }
        return reservedPool.getAssignedQueries() > 0 && generalPool.getBlockedNodes() > 0;
    }

    private synchronized MemoryPoolAssignmentsRequest updateAssignments(Iterable<QueryExecution> queries) {
        ClusterMemoryPool reservedPool = this.pools.get(LocalMemoryManager.RESERVED_POOL);
        ClusterMemoryPool generalPool = this.pools.get(LocalMemoryManager.GENERAL_POOL);
        Verify.verify((generalPool != null ? 1 : 0) != 0, (String)"generalPool is null", (Object[])new Object[0]);
        Verify.verify((reservedPool != null ? 1 : 0) != 0, (String)"reservedPool is null", (Object[])new Object[0]);
        long version = this.memoryPoolAssignmentsVersion.incrementAndGet();
        if (this.allAssignmentsHavePropagated(queries) && reservedPool.getAssignedQueries() == 0 && generalPool.getBlockedNodes() > 0) {
            QueryTracker.TrackedQuery biggestQuery = null;
            long maxMemory = -1L;
            for (QueryExecution queryExecution : queries) {
                long bytesUsed;
                if (SystemSessionProperties.resourceOvercommit(queryExecution.getSession()) || (bytesUsed = this.getQueryMemoryReservation(queryExecution)) <= maxMemory) continue;
                biggestQuery = queryExecution;
                maxMemory = bytesUsed;
            }
            if (biggestQuery != null) {
                log.info("Moving query %s to the reserved pool", new Object[]{biggestQuery.getQueryId()});
                biggestQuery.setMemoryPool(new VersionedMemoryPoolId(LocalMemoryManager.RESERVED_POOL, version));
            }
        }
        ImmutableList.Builder assignments = ImmutableList.builder();
        for (QueryExecution queryExecution : queries) {
            assignments.add((Object)new MemoryPoolAssignment(queryExecution.getQueryId(), queryExecution.getMemoryPool().getId()));
        }
        return new MemoryPoolAssignmentsRequest(this.coordinatorId, version, (List<MemoryPoolAssignment>)assignments.build());
    }

    private LowMemoryKiller.QueryMemoryInfo createQueryMemoryInfo(QueryExecution query) {
        return new LowMemoryKiller.QueryMemoryInfo(query.getQueryId(), query.getMemoryPool().getId(), query.getTotalMemoryReservation().toBytes());
    }

    private long getQueryMemoryReservation(QueryExecution query) {
        return query.getTotalMemoryReservation().toBytes();
    }

    private synchronized boolean allAssignmentsHavePropagated(Iterable<QueryExecution> queries) {
        long mostOutOfDateNode;
        if (this.nodes.isEmpty()) {
            return false;
        }
        long newestAssignment = ImmutableList.copyOf(queries).stream().map(QueryExecution::getMemoryPool).mapToLong(VersionedMemoryPoolId::getVersion).min().orElse(-1L);
        return newestAssignment <= (mostOutOfDateNode = this.nodes.values().stream().mapToLong(RemoteNodeMemory::getCurrentAssignmentVersion).min().orElse(Long.MAX_VALUE));
    }

    private synchronized void updateNodes(MemoryPoolAssignmentsRequest assignments) {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        ImmutableSet aliveNodes = builder.addAll(this.nodeManager.getNodes(NodeState.ACTIVE)).addAll(this.nodeManager.getNodes(NodeState.SHUTTING_DOWN)).build();
        ImmutableSet aliveNodeIds = (ImmutableSet)aliveNodes.stream().map(InternalNode::getNodeIdentifier).collect(ImmutableSet.toImmutableSet());
        ImmutableSet deadNodes = ImmutableSet.copyOf((Collection)Sets.difference(this.nodes.keySet(), (Set)aliveNodeIds));
        this.nodes.keySet().removeAll((Collection<?>)deadNodes);
        for (InternalNode internalNode : aliveNodes) {
            if (this.nodes.containsKey(internalNode.getNodeIdentifier())) continue;
            this.nodes.put(internalNode.getNodeIdentifier(), new RemoteNodeMemory(internalNode, this.httpClient, this.memoryInfoCodec, this.assignmentsRequestCodec, this.locationFactory.createMemoryInfoLocation(internalNode), this.isBinaryTransportEnabled));
        }
        if (!this.isWorkScheduledOnCoordinator) {
            this.nodes.remove(this.nodeManager.getCurrentNode().getNodeIdentifier());
        }
        for (RemoteNodeMemory remoteNodeMemory : this.nodes.values()) {
            remoteNodeMemory.asyncRefresh(assignments);
        }
    }

    private synchronized void updatePools(Map<MemoryPoolId, Integer> queryCounts) {
        List nodeMemoryInfos = (List)this.nodes.values().stream().map(RemoteNodeMemory::getInfo).filter(Optional::isPresent).map(Optional::get).collect(ImmutableList.toImmutableList());
        long totalClusterMemory = nodeMemoryInfos.stream().map(MemoryInfo::getTotalNodeMemory).mapToLong(DataSize::toBytes).sum();
        this.clusterMemoryBytes.set(totalClusterMemory);
        for (ClusterMemoryPool pool : this.pools.values()) {
            pool.update(nodeMemoryInfos, queryCounts.getOrDefault(pool.getId(), 0));
            if (!this.changeListeners.containsKey(pool.getId())) continue;
            MemoryPoolInfo info = pool.getInfo();
            for (Consumer<MemoryPoolInfo> listener : this.changeListeners.get(pool.getId())) {
                this.listenerExecutor.execute(() -> listener.accept(info));
            }
        }
    }

    public synchronized Map<String, Optional<MemoryInfo>> getWorkerMemoryInfo() {
        HashMap<String, Optional<MemoryInfo>> memoryInfo = new HashMap<String, Optional<MemoryInfo>>();
        for (Map.Entry<String, RemoteNodeMemory> entry : this.nodes.entrySet()) {
            String workerId = entry.getKey() + " [" + entry.getValue().getNode().getHost() + "]";
            memoryInfo.put(workerId, entry.getValue().getInfo());
        }
        return memoryInfo;
    }

    @PreDestroy
    public synchronized void destroy() throws IOException {
        try (Closer closer = Closer.create();){
            for (ClusterMemoryPool pool : this.pools.values()) {
                closer.register(() -> this.exporter.unexport(ObjectNames.generatedNameOf(ClusterMemoryPool.class, (String)pool.getId().toString())));
            }
            closer.register(this.listenerExecutor::shutdownNow);
        }
    }

    @Managed
    public int getNumberOfLeakedQueries() {
        return this.memoryLeakDetector.getNumberOfLeakedQueries();
    }

    @Managed
    public long getClusterUserMemoryReservation() {
        return this.clusterUserMemoryReservation.get();
    }

    @Managed
    public long getClusterTotalMemoryReservation() {
        return this.clusterTotalMemoryReservation.get();
    }

    @Managed
    public long getClusterMemoryBytes() {
        return this.clusterMemoryBytes.get();
    }

    @Managed
    public long getQueriesKilledDueToOutOfMemory() {
        return this.queriesKilledDueToOutOfMemory.get();
    }
}

