/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.Session;
import com.facebook.presto.TaskSource;
import com.facebook.presto.event.query.QueryMonitor;
import com.facebook.presto.execution.AbandonedException;
import com.facebook.presto.execution.BufferResult;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.SqlTask;
import com.facebook.presto.execution.SqlTaskExecutionFactory;
import com.facebook.presto.execution.SqlTaskIoStats;
import com.facebook.presto.execution.TaskExecutor;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManager;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

public class SqlTaskManager
implements TaskManager,
Closeable {
    private static final Logger log = Logger.get(SqlTaskManager.class);
    private final ExecutorService taskNotificationExecutor;
    private final ThreadPoolExecutorMBean taskNotificationExecutorMBean;
    private final ScheduledExecutorService taskManagementExecutor;
    private final ThreadPoolExecutorMBean taskManagementExecutorMBean;
    private final Duration infoCacheTime;
    private final Duration clientTimeout;
    private final LoadingCache<TaskId, SqlTask> tasks;
    private final SqlTaskIoStats cachedStats = new SqlTaskIoStats();
    private final SqlTaskIoStats finishedTaskStats = new SqlTaskIoStats();

    @Inject
    public SqlTaskManager(LocalExecutionPlanner planner, final LocationFactory locationFactory, TaskExecutor taskExecutor, QueryMonitor queryMonitor, TaskManagerConfig config) {
        Preconditions.checkNotNull((Object)config, (Object)"config is null");
        this.infoCacheTime = config.getInfoMaxAge();
        this.clientTimeout = config.getClientTimeout();
        final DataSize maxBufferSize = config.getSinkMaxBufferSize();
        this.taskNotificationExecutor = Executors.newCachedThreadPool(Threads.threadsNamed((String)"task-notification-%d"));
        this.taskNotificationExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor)this.taskNotificationExecutor);
        this.taskManagementExecutor = Executors.newScheduledThreadPool(5, Threads.threadsNamed((String)"task-management-%d"));
        this.taskManagementExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor)((Object)this.taskManagementExecutor));
        final SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(this.taskNotificationExecutor, taskExecutor, planner, queryMonitor, config);
        this.tasks = CacheBuilder.newBuilder().build((CacheLoader)new CacheLoader<TaskId, SqlTask>(){

            public SqlTask load(TaskId taskId) throws Exception {
                return new SqlTask(taskId, locationFactory.createLocalTaskLocation(taskId), sqlTaskExecutionFactory, SqlTaskManager.this.taskNotificationExecutor, sqlTask -> {
                    SqlTaskManager.this.finishedTaskStats.merge(sqlTask.getIoStats());
                    return null;
                }, maxBufferSize);
            }
        });
    }

    @PostConstruct
    public void start() {
        this.taskManagementExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    SqlTaskManager.this.removeOldTasks();
                }
                catch (Throwable e) {
                    log.warn(e, "Error removing old tasks");
                }
                try {
                    SqlTaskManager.this.failAbandonedTasks();
                }
                catch (Throwable e) {
                    log.warn(e, "Error canceling abandoned tasks");
                }
            }
        }, 200L, 200L, TimeUnit.MILLISECONDS);
        this.taskManagementExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    SqlTaskManager.this.updateStats();
                }
                catch (Throwable e) {
                    log.warn(e, "Error updating stats");
                }
            }
        }, 0L, 1L, TimeUnit.SECONDS);
    }

    @Override
    @PreDestroy
    public void close() {
        this.taskNotificationExecutor.shutdownNow();
        this.taskManagementExecutor.shutdownNow();
    }

    @Managed
    @Flatten
    public SqlTaskIoStats getIoStats() {
        return this.cachedStats;
    }

    @Managed(description="Task notification executor")
    @Nested
    public ThreadPoolExecutorMBean getTaskNotificationExecutor() {
        return this.taskNotificationExecutorMBean;
    }

    @Managed(description="Task garbage collector executor")
    @Nested
    public ThreadPoolExecutorMBean getTaskManagementExecutor() {
        return this.taskManagementExecutorMBean;
    }

    @Override
    public List<TaskInfo> getAllTaskInfo() {
        return ImmutableList.copyOf((Iterable)Iterables.transform(this.tasks.asMap().values(), SqlTask::getTaskInfo));
    }

    @Override
    public TaskInfo getTaskInfo(TaskId taskId) {
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).getTaskInfo();
    }

    @Override
    public ListenableFuture<TaskInfo> getTaskInfo(TaskId taskId, TaskState currentState) {
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        Preconditions.checkNotNull((Object)((Object)currentState), (Object)"currentState is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).getTaskInfo(currentState);
    }

    @Override
    public TaskInfo updateTask(Session session, TaskId taskId, PlanFragment fragment, List<TaskSource> sources, OutputBuffers outputBuffers) {
        Preconditions.checkNotNull((Object)session, (Object)"session is null");
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        Preconditions.checkNotNull((Object)fragment, (Object)"fragment is null");
        Preconditions.checkNotNull(sources, (Object)"sources is null");
        Preconditions.checkNotNull((Object)outputBuffers, (Object)"outputBuffers is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).updateTask(session, fragment, sources, outputBuffers);
    }

    @Override
    public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, TaskId outputName, long startingSequenceId, DataSize maxSize) {
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        Preconditions.checkNotNull((Object)outputName, (Object)"outputName is null");
        Preconditions.checkArgument((startingSequenceId >= 0L ? 1 : 0) != 0, (Object)"startingSequenceId is negative");
        Preconditions.checkNotNull((Object)maxSize, (Object)"maxSize is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).getTaskResults(outputName, startingSequenceId, maxSize);
    }

    @Override
    public TaskInfo abortTaskResults(TaskId taskId, TaskId outputId) {
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        Preconditions.checkNotNull((Object)outputId, (Object)"outputId is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).abortTaskResults(outputId);
    }

    @Override
    public TaskInfo cancelTask(TaskId taskId) {
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).cancel();
    }

    @Override
    public TaskInfo abortTask(TaskId taskId) {
        Preconditions.checkNotNull((Object)taskId, (Object)"taskId is null");
        return ((SqlTask)this.tasks.getUnchecked((Object)taskId)).abort();
    }

    public void removeOldTasks() {
        DateTime oldestAllowedTask = DateTime.now().minus(this.infoCacheTime.toMillis());
        for (TaskInfo taskInfo : Iterables.filter((Iterable)Iterables.transform(this.tasks.asMap().values(), SqlTask::getTaskInfo), (Predicate)Predicates.notNull())) {
            try {
                DateTime endTime = taskInfo.getStats().getEndTime();
                if (endTime == null || !endTime.isBefore((ReadableInstant)oldestAllowedTask)) continue;
                this.tasks.asMap().remove(taskInfo.getTaskId());
            }
            catch (RuntimeException e) {
                log.warn((Throwable)e, "Error while inspecting age of complete task %s", new Object[]{taskInfo.getTaskId()});
            }
        }
    }

    public void failAbandonedTasks() {
        DateTime now = DateTime.now();
        DateTime oldestAllowedHeartbeat = now.minus(this.clientTimeout.toMillis());
        for (SqlTask sqlTask : this.tasks.asMap().values()) {
            try {
                DateTime lastHeartbeat;
                TaskInfo taskInfo = sqlTask.getTaskInfo();
                if (taskInfo.getState().isDone() || (lastHeartbeat = taskInfo.getLastHeartbeat()) == null || !lastHeartbeat.isBefore((ReadableInstant)oldestAllowedHeartbeat)) continue;
                log.info("Failing abandoned task %s", new Object[]{taskInfo.getTaskId()});
                sqlTask.failed((Throwable)((Object)new AbandonedException("Task " + taskInfo.getTaskId(), lastHeartbeat, now)));
            }
            catch (RuntimeException e) {
                log.warn((Throwable)e, "Error while inspecting age of task %s", new Object[]{sqlTask.getTaskId()});
            }
        }
    }

    private void updateStats() {
        SqlTaskIoStats tempIoStats = new SqlTaskIoStats();
        tempIoStats.merge(this.finishedTaskStats);
        for (SqlTask task : this.tasks.asMap().values()) {
            if (task.getTaskInfo().getState().isDone()) continue;
            tempIoStats.merge(task.getIoStats());
        }
        this.cachedStats.resetTo(tempIoStats);
    }
}

