/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.memory;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.GarbageCollectionNotificationInfo;
import com.facebook.presto.execution.SqlTask;
import com.facebook.presto.execution.SqlTaskManager;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.memory.HighMemoryTaskKillerStrategy;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.management.JMException;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

public class HighMemoryTaskKiller {
    private static final Logger log = Logger.get(HighMemoryTaskKiller.class);
    private static final String GC_NOTIFICATION_TYPE = "com.sun.management.gc.notification";
    private static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
    private final NotificationListener gcNotificationListener = (notification, ignored) -> this.onGCNotification(notification);
    private final SqlTaskManager sqlTaskManager;
    private final HighMemoryTaskKillerStrategy taskKillerStrategy;
    private final boolean taskKillerEnabled;
    private final Duration taskKillerFrequentFullGCDurationThreshold;
    private Duration lastFullGCTimestamp;
    private long lastFullGCCollectedBytes;
    private final long reclaimMemoryThreshold;
    private final long heapMemoryThreshold;
    Ticker ticker;

    @Inject
    public HighMemoryTaskKiller(SqlTaskManager sqlTaskManager, TaskManagerConfig taskManagerConfig) {
        Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null");
        this.sqlTaskManager = Objects.requireNonNull(sqlTaskManager, "sqlTaskManager must not be null");
        this.taskKillerStrategy = taskManagerConfig.getHighMemoryTaskKillerStrategy();
        this.taskKillerEnabled = taskManagerConfig.isHighMemoryTaskKillerEnabled();
        this.taskKillerFrequentFullGCDurationThreshold = taskManagerConfig.getHighMemoryTaskKillerFrequentFullGCDurationThreshold();
        this.reclaimMemoryThreshold = (long)((double)memoryMXBean.getHeapMemoryUsage().getMax() * taskManagerConfig.getHighMemoryTaskKillerGCReclaimMemoryThreshold());
        this.heapMemoryThreshold = (long)((double)memoryMXBean.getHeapMemoryUsage().getMax() * taskManagerConfig.getHighMemoryTaskKillerHeapMemoryThreshold());
        this.ticker = Ticker.systemTicker();
    }

    @PostConstruct
    public void start() {
        if (!this.taskKillerEnabled) {
            return;
        }
        for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
            if (mbean.getName().equals("TestingMBeanServer")) continue;
            ObjectName objectName = mbean.getObjectName();
            try {
                ManagementFactory.getPlatformMBeanServer().addNotificationListener(objectName, this.gcNotificationListener, null, null);
            }
            catch (JMException e) {
                throw new RuntimeException("Unable to add listener", e);
            }
        }
    }

    @PreDestroy
    public void stop() {
        if (!this.taskKillerEnabled) {
            return;
        }
        for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
            ObjectName objectName = mbean.getObjectName();
            try {
                ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, this.gcNotificationListener);
            }
            catch (JMException ignored) {
                log.error("Error removing notification: " + ignored);
            }
        }
    }

    private void onGCNotification(Notification notification) {
        List<SqlTask> activeTasks;
        ListMultimap activeQueriesToTasksMap;
        Optional<QueryId> queryId;
        GarbageCollectionNotificationInfo info;
        if (GC_NOTIFICATION_TYPE.equals(notification.getType()) && (info = new GarbageCollectionNotificationInfo((CompositeData)notification.getUserData())).isMajorGc() && this.shouldTriggerTaskKiller(info) && (queryId = HighMemoryTaskKiller.getMaxMemoryConsumingQuery((ListMultimap<QueryId, SqlTask>)(activeQueriesToTasksMap = (ListMultimap)(activeTasks = this.getActiveTasks()).stream().collect(ImmutableListMultimap.toImmutableListMultimap(task -> task.getQueryContext().getQueryId(), Function.identity()))))).isPresent()) {
            List activeTasksToKill = activeQueriesToTasksMap.get((Object)queryId.get());
            for (SqlTask sqlTask : activeTasksToKill) {
                TaskStats taskStats = sqlTask.getTaskInfo().getStats();
                sqlTask.failed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.EXCEEDED_HEAP_MEMORY_LIMIT, String.format("Worker heap memory limit exceeded: User Memory: %d, System Memory: %d, Revocable Memory: %d", taskStats.getUserMemoryReservationInBytes(), taskStats.getSystemMemoryReservationInBytes(), taskStats.getRevocableMemoryReservationInBytes())));
            }
        }
    }

    private boolean shouldTriggerTaskKiller(GarbageCollectionNotificationInfo info) {
        boolean triggerTaskKiller = false;
        DataSize beforeGcDataSize = info.getBeforeGcTotal();
        DataSize afterGcDataSize = info.getAfterGcTotal();
        if (this.taskKillerStrategy == HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FREQUENT_FULL_GC) {
            long currentGarbageCollectedBytes = beforeGcDataSize.toBytes() - afterGcDataSize.toBytes();
            Duration currentFullGCTimestamp = new Duration((double)this.ticker.read(), TimeUnit.NANOSECONDS);
            if (this.isFrequentFullGC(this.lastFullGCTimestamp, currentFullGCTimestamp) && !this.hasFullGCFreedEnoughBytes(currentGarbageCollectedBytes)) {
                triggerTaskKiller = true;
            }
            this.lastFullGCTimestamp = currentFullGCTimestamp;
            this.lastFullGCCollectedBytes = currentGarbageCollectedBytes;
        } else if (this.taskKillerStrategy == HighMemoryTaskKillerStrategy.FREE_MEMORY_ON_FULL_GC && this.isLowMemory() && beforeGcDataSize.toBytes() - afterGcDataSize.toBytes() < this.reclaimMemoryThreshold) {
            triggerTaskKiller = true;
        }
        log.debug("Task Killer Trigger: " + triggerTaskKiller + ", Before Full GC Head Size: " + beforeGcDataSize.toBytes() + " After Full GC Heap Size: " + afterGcDataSize.toBytes());
        return triggerTaskKiller;
    }

    private List<SqlTask> getActiveTasks() {
        return (List)this.sqlTaskManager.getAllTasks().stream().filter(task -> !task.getTaskState().isDone()).collect(ImmutableList.toImmutableList());
    }

    public static Optional<QueryId> getMaxMemoryConsumingQuery(ListMultimap<QueryId, SqlTask> queryIDToSqlTaskMap) {
        if (queryIDToSqlTaskMap.isEmpty()) {
            return Optional.empty();
        }
        Comparator<Map.Entry> comparator = Comparator.comparingLong(Map.Entry::getValue);
        Optional<QueryId> maxMemoryConsumpingQueryId = queryIDToSqlTaskMap.asMap().entrySet().stream().map(entry -> new AbstractMap.SimpleEntry(entry.getKey(), ((Collection)entry.getValue()).stream().map(SqlTask::getTaskInfo).map(TaskInfo::getStats).mapToLong(stats -> stats.getUserMemoryReservationInBytes() + stats.getSystemMemoryReservationInBytes() + stats.getRevocableMemoryReservationInBytes()).sum())).max(comparator).map(Map.Entry::getKey);
        return maxMemoryConsumpingQueryId;
    }

    private boolean isFrequentFullGC(Duration lastFullGCTime, Duration currentFullGCTime) {
        long diffBetweenFullGCMilis = currentFullGCTime.toMillis() - lastFullGCTime.toMillis();
        log.debug("Time difference between last 2 full GC in miliseconds: " + diffBetweenFullGCMilis);
        if ((double)diffBetweenFullGCMilis > this.taskKillerFrequentFullGCDurationThreshold.getValue(TimeUnit.MILLISECONDS)) {
            log.debug("Skip killing tasks Due to full GCs were not happening frequently.");
            return false;
        }
        return true;
    }

    private boolean hasFullGCFreedEnoughBytes(long currentGarbageCollectedBytes) {
        if (currentGarbageCollectedBytes < this.reclaimMemoryThreshold && this.lastFullGCCollectedBytes < this.reclaimMemoryThreshold) {
            log.debug("Full GC not able to free enough memory. Current freed bytes: " + currentGarbageCollectedBytes + " previously freed bytes: " + this.lastFullGCCollectedBytes);
            return false;
        }
        log.debug("Full GC able to free enough memory. Current freed bytes: " + currentGarbageCollectedBytes + " previously freed bytes: " + this.lastFullGCCollectedBytes);
        return true;
    }

    private boolean isLowMemory() {
        MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
        return memoryUsage.getUsed() > this.heapMemoryThreshold;
    }
}

