/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.airlift.stats.GcMonitor;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.SqlTask;
import com.facebook.presto.execution.SqlTaskExecutionFactory;
import com.facebook.presto.execution.SqlTaskIoStats;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagementExecutor;
import com.facebook.presto.execution.TaskManager;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskMetadataContext;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.SpoolingOutputBufferFactory;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.memory.MemoryPool;
import com.facebook.presto.memory.MemoryPoolAssignment;
import com.facebook.presto.memory.MemoryPoolAssignmentsRequest;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.memory.QueryContext;
import com.facebook.presto.metadata.MetadataUpdates;
import com.facebook.presto.operator.ExchangeClientSupplier;
import com.facebook.presto.operator.FragmentResultCacheManager;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorMetadataUpdater;
import com.facebook.presto.spiller.LocalSpillManager;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.sql.gen.OrderingCompiler;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.PlanFragment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

public class SqlTaskManager
implements TaskManager,
Closeable {
    private static final Logger log = Logger.get(SqlTaskManager.class);
    private final ExecutorService taskNotificationExecutor;
    private final ThreadPoolExecutorMBean taskNotificationExecutorMBean;
    private final ScheduledExecutorService taskManagementExecutor;
    private final ScheduledExecutorService driverYieldExecutor;
    private final Duration infoCacheTime;
    private final Duration clientTimeout;
    private final LocalMemoryManager localMemoryManager;
    private final LoadingCache<QueryId, QueryContext> queryContexts;
    private final LoadingCache<TaskId, SqlTask> tasks;
    private final SqlTaskIoStats cachedStats = new SqlTaskIoStats();
    private final SqlTaskIoStats finishedTaskStats = new SqlTaskIoStats();
    @GuardedBy(value="this")
    private final Map<String, Long> currentMemoryPoolAssignmentVersions = new Object2LongOpenHashMap();
    private final CounterStat failedTasks = new CounterStat();

    @Inject
    public SqlTaskManager(LocalExecutionPlanner planner, LocationFactory locationFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, NodeInfo nodeInfo, LocalMemoryManager localMemoryManager, TaskManagementExecutor taskManagementExecutor, TaskManagerConfig config, NodeMemoryConfig nodeMemoryConfig, LocalSpillManager localSpillManager, ExchangeClientSupplier exchangeClientSupplier, NodeSpillConfig nodeSpillConfig, GcMonitor gcMonitor, BlockEncodingSerde blockEncodingSerde, OrderingCompiler orderingCompiler, FragmentResultCacheManager fragmentResultCacheManager, ObjectMapper objectMapper, SpoolingOutputBufferFactory spoolingOutputBufferFactory) {
        Objects.requireNonNull(nodeInfo, "nodeInfo is null");
        Objects.requireNonNull(config, "config is null");
        this.infoCacheTime = config.getInfoMaxAge();
        this.clientTimeout = config.getClientTimeout();
        DataSize maxBufferSize = config.getSinkMaxBufferSize();
        this.taskNotificationExecutor = Executors.newFixedThreadPool(config.getTaskNotificationThreads(), Threads.threadsNamed((String)"task-notification-%s"));
        this.taskNotificationExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor)this.taskNotificationExecutor);
        this.taskManagementExecutor = Objects.requireNonNull(taskManagementExecutor, "taskManagementExecutor cannot be null").getExecutor();
        this.driverYieldExecutor = Executors.newScheduledThreadPool(config.getTaskYieldThreads(), Threads.threadsNamed((String)"task-yield-%s"));
        SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(this.taskNotificationExecutor, taskExecutor, planner, blockEncodingSerde, orderingCompiler, splitMonitor, config);
        this.localMemoryManager = Objects.requireNonNull(localMemoryManager, "localMemoryManager is null");
        DataSize maxQueryUserMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode();
        DataSize maxQueryTotalMemoryPerNode = nodeMemoryConfig.getMaxQueryTotalMemoryPerNode();
        DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();
        DataSize maxRevocableMemoryPerNode = nodeSpillConfig.getMaxRevocableMemoryPerNode();
        DataSize maxQueryBroadcastMemory = nodeMemoryConfig.getMaxQueryBroadcastMemory();
        this.queryContexts = CacheBuilder.newBuilder().weakValues().build(CacheLoader.from(queryId -> this.createQueryContext((QueryId)queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxRevocableMemoryPerNode, maxQuerySpillPerNode, maxQueryBroadcastMemory)));
        Objects.requireNonNull(spoolingOutputBufferFactory, "spoolingOutputBufferFactory is null");
        this.tasks = CacheBuilder.newBuilder().build(CacheLoader.from(taskId -> SqlTask.createSqlTask(taskId, locationFactory.createLocalTaskLocation((TaskId)taskId), nodeInfo.getNodeId(), (QueryContext)this.queryContexts.getUnchecked((Object)taskId.getQueryId()), sqlTaskExecutionFactory, exchangeClientSupplier, this.taskNotificationExecutor, sqlTask -> {
            this.finishedTaskStats.merge(sqlTask.getIoStats());
            return null;
        }, maxBufferSize, this.failedTasks, spoolingOutputBufferFactory)));
    }

    private QueryContext createQueryContext(QueryId queryId, LocalMemoryManager localMemoryManager, LocalSpillManager localSpillManager, GcMonitor gcMonitor, DataSize maxQueryUserMemoryPerNode, DataSize maxQueryTotalMemoryPerNode, DataSize maxRevocableMemoryPerNode, DataSize maxQuerySpillPerNode, DataSize maxQueryBroadcastMemory) {
        return new QueryContext(queryId, maxQueryUserMemoryPerNode, maxQueryTotalMemoryPerNode, maxQueryBroadcastMemory, maxRevocableMemoryPerNode, localMemoryManager.getGeneralPool(), gcMonitor, this.taskNotificationExecutor, this.driverYieldExecutor, maxQuerySpillPerNode, localSpillManager.getSpillSpaceTracker());
    }

    @Override
    public synchronized void updateMemoryPoolAssignments(MemoryPoolAssignmentsRequest assignments) {
        String assignmentCoordinatorId = assignments.getCoordinatorId();
        long assignmentVersion = assignments.getVersion();
        if (assignmentVersion <= this.currentMemoryPoolAssignmentVersions.getOrDefault(assignmentCoordinatorId, Long.MIN_VALUE)) {
            return;
        }
        this.currentMemoryPoolAssignmentVersions.put(assignmentCoordinatorId, assignmentVersion);
        for (MemoryPoolAssignment assignment : assignments.getAssignments()) {
            if (assignment.getPoolId().equals((Object)LocalMemoryManager.GENERAL_POOL)) {
                ((QueryContext)this.queryContexts.getUnchecked((Object)assignment.getQueryId())).setMemoryPool(this.localMemoryManager.getGeneralPool());
                continue;
            }
            if (assignment.getPoolId().equals((Object)LocalMemoryManager.RESERVED_POOL)) {
                MemoryPool reservedPool = this.localMemoryManager.getReservedPool().orElseThrow(() -> new IllegalArgumentException(String.format("Cannot move %s to the reserved pool as the reserved pool is not enabled", assignment.getQueryId())));
                ((QueryContext)this.queryContexts.getUnchecked((Object)assignment.getQueryId())).setMemoryPool(reservedPool);
                continue;
            }
            throw new IllegalArgumentException(String.format("Cannot move %s to %s as the target memory pool id is invalid", assignment.getQueryId(), assignment.getPoolId()));
        }
    }

    @PostConstruct
    public void start() {
        this.taskManagementExecutor.scheduleWithFixedDelay(() -> {
            try {
                this.removeOldTasks();
            }
            catch (Throwable e) {
                log.error(e, "Error removing old tasks");
            }
            try {
                this.failAbandonedTasks();
            }
            catch (Throwable e) {
                log.error(e, "Error canceling abandoned tasks");
            }
        }, 200L, 200L, TimeUnit.MILLISECONDS);
        this.taskManagementExecutor.scheduleWithFixedDelay(() -> {
            try {
                this.updateStats();
            }
            catch (Throwable e) {
                log.error(e, "Error updating stats");
            }
        }, 0L, 1L, TimeUnit.SECONDS);
    }

    @Override
    @PreDestroy
    public void close() {
        boolean taskCanceled = false;
        for (SqlTask task : this.tasks.asMap().values()) {
            if (task.getTaskStatus().getState().isDone()) continue;
            task.failed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.SERVER_SHUTTING_DOWN, String.format("Server is shutting down. Task %s has been canceled", task.getTaskId())));
            taskCanceled = true;
        }
        if (taskCanceled) {
            try {
                TimeUnit.SECONDS.sleep(5L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.taskNotificationExecutor.shutdownNow();
    }

    @Managed
    @Flatten
    public SqlTaskIoStats getIoStats() {
        return this.cachedStats;
    }

    @Managed(description="Task notification executor")
    @Nested
    public ThreadPoolExecutorMBean getTaskNotificationExecutor() {
        return this.taskNotificationExecutorMBean;
    }

    @Managed(description="Failed tasks counter")
    @Nested
    public CounterStat getFailedTasks() {
        return this.failedTasks;
    }

    public List<SqlTask> getAllTasks() {
        return ImmutableList.copyOf(this.tasks.asMap().values());
    }

    public SqlTask getTask(TaskId taskId) {
        return (SqlTask)this.tasks.getUnchecked((Object)taskId);
    }

    @Override
    public List<TaskInfo> getAllTaskInfo() {
        return ImmutableList.copyOf((Iterable)Iterables.transform(this.tasks.asMap().values(), SqlTask::getTaskInfo));
    }

    @Override
    public TaskInfo getTaskInfo(TaskId taskId) {
        Objects.requireNonNull(taskId, "taskId is null");
        SqlTask sqlTask = (SqlTask)this.tasks.getUnchecked((Object)taskId);
        sqlTask.recordHeartbeat();
        return sqlTask.getTaskInfo();
    }

    @Override
    public TaskStatus getTaskStatus(TaskId taskId) {
        Objects.requireNonNull(taskId, "taskId is null");
        SqlTask sqlTask = (SqlTask)this.tasks.getUnchecked((Object)taskId);
        sqlTask.recordHeartbeat();
        return sqlTask.getTaskStatus();
    }

    @Override
    public ListenableFuture<TaskInfo> getTaskInfo(TaskId taskId, TaskState currentState) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(currentState, "currentState is null");
        SqlTask sqlTask = (SqlTask)this.tasks.getUnchecked((Object)taskId);
        sqlTask.recordHeartbeat();
        return sqlTask.getTaskInfo(currentState);
    }

    @Override
    public String getTaskInstanceId(TaskId taskId) {
        SqlTask sqlTask = (SqlTask)this.tasks.getUnchecked((Object)taskId);
        sqlTask.recordHeartbeat();
        return sqlTask.getTaskInstanceId();
    }

    @Override
    public ListenableFuture<TaskStatus> getTaskStatus(TaskId taskId, TaskState currentState) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(currentState, "currentState is null");
        SqlTask sqlTask = (SqlTask)this.tasks.getUnchecked((Object)taskId);
        sqlTask.recordHeartbeat();
        return sqlTask.getTaskStatus(currentState);
    }

    @Override
    public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, Optional<TableWriteInfo> tableWriteInfo) {
        Objects.requireNonNull(session, "session is null");
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(fragment, "fragment is null");
        Objects.requireNonNull(sources, "sources is null");
        Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        SqlTask sqlTask = (SqlTask)this.tasks.getUnchecked((Object)taskId);
        QueryContext queryContext = sqlTask.getQueryContext();
        if (!queryContext.isMemoryLimitsInitialized()) {
            if (SystemSessionProperties.resourceOvercommit(session)) {
                queryContext.setResourceOvercommit();
            } else {
                queryContext.setMemoryLimits(SystemSessionProperties.getQueryMaxMemoryPerNode(session), SystemSessionProperties.getQueryMaxTotalMemoryPerNode(session), SystemSessionProperties.getQueryMaxBroadcastMemory(session));
            }
        }
        sqlTask.recordHeartbeat();
        return sqlTask.updateTask(session, fragment, sources, outputBuffers, tableWriteInfo);
    }

    @Override
    public void updateMetadataResults(TaskId taskId, MetadataUpdates metadataUpdates) {
        TaskMetadataContext metadataContext = ((SqlTask)this.tasks.getUnchecked((Object)taskId)).getTaskMetadataContext();
        for (ConnectorMetadataUpdater metadataUpdater : metadataContext.getMetadataUpdaters()) {
            metadataUpdater.setMetadataUpdateResults(metadataUpdates.getMetadataUpdates());
        }
    }

    @Override
    public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(bufferId, "bufferId is null");
        Preconditions.checkArgument((startingSequenceId >= 0L ? 1 : 0) != 0, (Object)"startingSequenceId is negative");
        Objects.requireNonNull(maxSize, "maxSize is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).getTaskResults(bufferId, startingSequenceId, maxSize);
    }

    @Override
    public void acknowledgeTaskResults(TaskId taskId, OutputBuffers.OutputBufferId bufferId, long sequenceId) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(bufferId, "bufferId is null");
        Preconditions.checkArgument((sequenceId >= 0L ? 1 : 0) != 0, (Object)"sequenceId is negative");
        ((SqlTask)this.tasks.getUnchecked((Object)taskId)).acknowledgeTaskResults(bufferId, sequenceId);
    }

    @Override
    public TaskInfo abortTaskResults(TaskId taskId, OutputBuffers.OutputBufferId bufferId) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(bufferId, "bufferId is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).abortTaskResults(bufferId);
    }

    @Override
    public void removeRemoteSource(TaskId taskId, TaskId remoteSourceTaskId) {
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(remoteSourceTaskId, "remoteSourceTaskId is null");
        ((SqlTask)this.tasks.getUnchecked((Object)taskId)).removeRemoteSource(remoteSourceTaskId);
    }

    @Override
    public TaskInfo cancelTask(TaskId taskId) {
        Objects.requireNonNull(taskId, "taskId is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).cancel();
    }

    @Override
    public TaskInfo abortTask(TaskId taskId) {
        Objects.requireNonNull(taskId, "taskId is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).abort();
    }

    public void removeOldTasks() {
        DateTime oldestAllowedTask = DateTime.now().minus(this.infoCacheTime.toMillis());
        for (TaskInfo taskInfo : Iterables.filter((Iterable)Iterables.transform(this.tasks.asMap().values(), SqlTask::getTaskInfo), (Predicate)Predicates.notNull())) {
            TaskId taskId = taskInfo.getTaskId();
            try {
                DateTime endTime = taskInfo.getStats().getEndTime();
                if (endTime == null || !endTime.isBefore((ReadableInstant)oldestAllowedTask)) continue;
                this.tasks.asMap().remove(taskId);
            }
            catch (RuntimeException e) {
                log.warn((Throwable)e, "Error while inspecting age of complete task %s", new Object[]{taskId});
            }
        }
    }

    public void failAbandonedTasks() {
        DateTime now = DateTime.now();
        DateTime oldestAllowedHeartbeat = now.minus(this.clientTimeout.toMillis());
        for (SqlTask sqlTask : this.tasks.asMap().values()) {
            try {
                DateTime lastHeartbeat;
                TaskInfo taskInfo = sqlTask.getTaskInfo();
                TaskStatus taskStatus = taskInfo.getTaskStatus();
                if (taskStatus.getState().isDone() || (lastHeartbeat = taskInfo.getLastHeartbeat()) == null || !lastHeartbeat.isBefore((ReadableInstant)oldestAllowedHeartbeat)) continue;
                log.info("Failing abandoned task %s", new Object[]{taskInfo.getTaskId()});
                sqlTask.failed(new PrestoException((ErrorCodeSupplier)StandardErrorCode.ABANDONED_TASK, String.format("Task %s has not been accessed since %s: currentTime %s", taskInfo.getTaskId(), lastHeartbeat, now)));
            }
            catch (RuntimeException e) {
                log.warn((Throwable)e, "Error while inspecting age of task %s", new Object[]{sqlTask.getTaskId()});
            }
        }
    }

    private void updateStats() {
        SqlTaskIoStats tempIoStats = new SqlTaskIoStats();
        tempIoStats.merge(this.finishedTaskStats);
        this.tasks.asMap().values().stream().filter(task -> !task.getTaskState().isDone()).forEach(task -> tempIoStats.merge(task.getIoStats()));
        this.cachedStats.resetTo(tempIoStats);
    }

    @Override
    public void addStateChangeListener(TaskId taskId, StateMachine.StateChangeListener<TaskState> stateChangeListener) {
        Objects.requireNonNull(taskId, "taskId is null");
        ((SqlTask)this.tasks.getUnchecked((Object)taskId)).addStateChangeListener(stateChangeListener);
    }

    public QueryContext getQueryContext(QueryId queryId) {
        return (QueryContext)this.queryContexts.getUnchecked((Object)queryId);
    }
}

