/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.minion.BaseTaskGeneratorInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.api.resources.StringResultResponse;
import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.jersey.server.ManagedAsync;
import org.quartz.CronTrigger;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerMetaData;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(tags={"Task"}, authorizations={@Authorization(value="oauth")})
@SwaggerDefinition(securityDefinition=@SecurityDefinition(apiKeyAuthDefinitions={@ApiKeyAuthDefinition(name="Authorization", in=ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key="oauth")}))
@Path(value="/")
public class PinotTaskRestletResource {
    public static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskRestletResource.class);
    private static final String TASK_QUEUE_STATE_STOP = "STOP";
    private static final String TASK_QUEUE_STATE_RESUME = "RESUME";
    @Inject
    PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
    @Inject
    PinotTaskManager _pinotTaskManager;
    @Inject
    TaskManagerStatusCache _taskManagerStatusCache;
    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;
    @Inject
    Executor _executor;
    @Inject
    HttpClientConnectionManager _connectionManager;
    @Inject
    ControllerConf _controllerConf;
    @Context
    private UriInfo _uriInfo;

    @GET
    @Path(value="/tasks/tasktypes")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="List all task types")
    public Set<String> listTaskTypes() {
        return this._pinotHelixTaskResourceManager.getTaskTypes();
    }

    @Deprecated
    @GET
    @Path(value="/tasks/taskqueues")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="List all task queues (deprecated)")
    public Set<String> getTaskQueues() {
        return this._pinotHelixTaskResourceManager.getTaskQueues();
    }

    @GET
    @Path(value="/tasks/{taskType}/state")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the state (task queue state) for the given task type")
    public TaskState getTaskQueueState(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        return this._pinotHelixTaskResourceManager.getTaskQueueState(taskType);
    }

    @Deprecated
    @GET
    @Path(value="/tasks/taskqueuestate/{taskType}")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the state (task queue state) for the given task type (deprecated)")
    public StringResultResponse getTaskQueueStateDeprecated(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        return new StringResultResponse(this._pinotHelixTaskResourceManager.getTaskQueueState(taskType).toString());
    }

    @GET
    @Path(value="/tasks/{taskType}/tasks")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="List all tasks for the given task type")
    public Set<String> getTasks(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        return this._pinotHelixTaskResourceManager.getTasks(taskType);
    }

    @GET
    @Path(value="/tasks/{taskType}/{tableNameWithType}/state")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="List all tasks for the given task type")
    public Map<String, TaskState> getTaskStatesByTable(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="Table name with type", required=true) @PathParam(value="tableNameWithType") String tableNameWithType) {
        return this._pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableNameWithType);
    }

    @GET
    @Path(value="/tasks/{taskType}/{tableNameWithType}/metadata")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get task metadata for the given task type and table")
    public String getTaskMetadataByTable(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="Table name with type", required=true) @PathParam(value="tableNameWithType") String tableNameWithType) {
        try {
            return this._pinotHelixTaskResourceManager.getTaskMetadataByTable(taskType, tableNameWithType);
        }
        catch (JsonProcessingException e) {
            throw new ControllerApplicationException(LOGGER, String.format("Failed to format task metadata into Json for task type: %s from table: %s", taskType, tableNameWithType), Response.Status.INTERNAL_SERVER_ERROR, (Throwable)e);
        }
    }

    @DELETE
    @Path(value="/tasks/{taskType}/{tableNameWithType}/metadata")
    @Authorize(targetType=TargetType.CLUSTER, action="DeleteTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Delete task metadata for the given task type and table")
    public SuccessResponse deleteTaskMetadataByTable(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="Table name with type", required=true) @PathParam(value="tableNameWithType") String tableNameWithType) {
        this._pinotHelixTaskResourceManager.deleteTaskMetadataByTable(taskType, tableNameWithType);
        return new SuccessResponse(String.format("Successfully deleted metadata for task type: %s from table: %s", taskType, tableNameWithType));
    }

    @GET
    @Path(value="/tasks/{taskType}/taskcounts")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Fetch count of sub-tasks for each of the tasks for the given task type")
    public Map<String, PinotHelixTaskResourceManager.TaskCount> getTaskCounts(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        return this._pinotHelixTaskResourceManager.getTaskCounts(taskType);
    }

    @GET
    @Path(value="/tasks/{taskType}/debug")
    @Authorize(targetType=TargetType.CLUSTER, action="DebugTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Fetch information for all the tasks for the given task type")
    public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> getTasksDebugInfo(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="verbosity (Prints information for all the tasks for the given task type.By default, only prints subtask details for running and error tasks. Value of > 0 prints subtask details for all tasks)") @DefaultValue(value="0") @QueryParam(value="verbosity") int verbosity) {
        return this._pinotHelixTaskResourceManager.getTasksDebugInfo(taskType, verbosity);
    }

    @GET
    @Path(value="/tasks/{taskType}/{tableNameWithType}/debug")
    @Authorize(targetType=TargetType.CLUSTER, action="DebugTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Fetch information for all the tasks for the given task type and table")
    public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> getTasksDebugInfo(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="Table name with type", required=true) @PathParam(value="tableNameWithType") String tableNameWithType, @ApiParam(value="verbosity (Prints information for all the tasks for the given task type and table.By default, only prints subtask details for running and error tasks. Value of > 0 prints subtask details for all tasks)") @DefaultValue(value="0") @QueryParam(value="verbosity") int verbosity) {
        return this._pinotHelixTaskResourceManager.getTasksDebugInfoByTable(taskType, tableNameWithType, verbosity);
    }

    @GET
    @Produces(value={"application/json"})
    @Path(value="/tasks/generator/{tableNameWithType}/{taskType}/debug")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @ApiOperation(value="Fetch task generation information for the recent runs of the given task for the given table")
    public String getTaskGenerationDebugInto(@Context HttpHeaders httpHeaders, @ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="Table name with type", required=true) @PathParam(value="tableNameWithType") String tableNameWithType, @ApiParam(value="Whether to only lookup local cache for logs", defaultValue="false") @QueryParam(value="localOnly") boolean localOnly) throws JsonProcessingException {
        if (localOnly) {
            BaseTaskGeneratorInfo taskGeneratorMostRecentRunInfo = this._taskManagerStatusCache.fetchTaskGeneratorInfo(tableNameWithType, taskType);
            if (taskGeneratorMostRecentRunInfo == null) {
                throw new ControllerApplicationException(LOGGER, "Task generation information not found", Response.Status.NOT_FOUND);
            }
            return JsonUtils.objectToString((Object)taskGeneratorMostRecentRunInfo);
        }
        List<InstanceConfig> controllers = this._pinotHelixResourceManager.getAllControllerInstanceConfigs();
        URI uri = this._uriInfo.getRequestUri();
        String scheme = uri.getScheme();
        List<String> controllerUrls = controllers.stream().map(controller -> String.format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(), Integer.parseInt(controller.getPort()), tableNameWithType, taskType)).collect(Collectors.toList());
        CompletionServiceHelper completionServiceHelper = new CompletionServiceHelper(this._executor, this._connectionManager, (BiMap<String, String>)HashBiMap.create((int)0));
        HashMap<String, String> requestHeaders = new HashMap<String, String>();
        httpHeaders.getRequestHeaders().keySet().forEach(header -> requestHeaders.put((String)header, httpHeaders.getHeaderString(header)));
        LOGGER.debug("Getting task generation info with controllerUrls: {}", controllerUrls);
        CompletionServiceHelper.CompletionServiceResponse serviceResponse = completionServiceHelper.doMultiGetRequest(controllerUrls, null, true, requestHeaders, 10000);
        ArrayList result = new ArrayList();
        serviceResponse._httpResponses.values().forEach(resp -> {
            try {
                result.add(JsonUtils.stringToJsonNode((String)resp));
            }
            catch (IOException e) {
                LOGGER.error("Failed to parse controller response {}", resp, (Object)e);
            }
        });
        return JsonUtils.objectToString(result);
    }

    @GET
    @Path(value="/tasks/task/{taskName}/debug")
    @Authorize(targetType=TargetType.CLUSTER, action="DebugTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Fetch information for the given task name")
    public PinotHelixTaskResourceManager.TaskDebugInfo getTaskDebugInfo(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName, @ApiParam(value="verbosity (Prints information for the given task name.By default, only prints subtask details for running and error tasks. Value of > 0 prints subtask details for all tasks)") @DefaultValue(value="0") @QueryParam(value="verbosity") int verbosity) {
        return this._pinotHelixTaskResourceManager.getTaskDebugInfo(taskName, verbosity);
    }

    @Deprecated
    @GET
    @Path(value="/tasks/tasks/{taskType}")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="List all tasks for the given task type (deprecated)")
    public Set<String> getTasksDeprecated(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        return this._pinotHelixTaskResourceManager.getTasks(taskType);
    }

    @GET
    @Path(value="/tasks/{taskType}/taskstates")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get a map from task to task state for the given task type")
    public Map<String, TaskState> getTaskStates(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        return this._pinotHelixTaskResourceManager.getTaskStates(taskType);
    }

    @Deprecated
    @GET
    @Path(value="/tasks/taskstates/{taskType}")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get a map from task to task state for the given task type (deprecated)")
    public Map<String, TaskState> getTaskStatesDeprecated(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        return this._pinotHelixTaskResourceManager.getTaskStates(taskType);
    }

    @GET
    @Path(value="/tasks/task/{taskName}/state")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the task state for the given task")
    public TaskState getTaskState(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName) {
        return this._pinotHelixTaskResourceManager.getTaskState(taskName);
    }

    @Deprecated
    @GET
    @Path(value="/tasks/taskstate/{taskName}")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the task state for the given task (deprecated)")
    public StringResultResponse getTaskStateDeprecated(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName) {
        return new StringResultResponse(String.valueOf(this._pinotHelixTaskResourceManager.getTaskState(taskName)));
    }

    @GET
    @Path(value="/tasks/subtask/{taskName}/state")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the states of all the sub tasks for the given task")
    public Map<String, TaskPartitionState> getSubtaskStates(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName) {
        return this._pinotHelixTaskResourceManager.getSubtaskStates(taskName);
    }

    @GET
    @Path(value="/tasks/task/{taskName}/config")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the task config (a list of child task configs) for the given task")
    public List<PinotTaskConfig> getTaskConfigs(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName) {
        return this._pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
    }

    @GET
    @Path(value="/tasks/task/{taskName}/runtime/config")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the task runtime config for the given task")
    public Map<String, String> getTaskConfig(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName) {
        return this._pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
    }

    @Deprecated
    @GET
    @Path(value="/tasks/taskconfig/{taskName}")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the task config (a list of child task configs) for the given task (deprecated)")
    public List<PinotTaskConfig> getTaskConfigsDeprecated(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName) {
        return this._pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
    }

    @GET
    @Path(value="/tasks/subtask/{taskName}/config")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get the configs of specified sub tasks for the given task")
    public Map<String, PinotTaskConfig> getSubtaskConfigs(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName, @ApiParam(value="Sub task names separated by comma") @QueryParam(value="subtaskNames") @Nullable String subtaskNames) {
        return this._pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, subtaskNames);
    }

    @GET
    @Path(value="/tasks/subtask/{taskName}/progress")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get progress of specified sub tasks for the given task tracked by minion worker in memory")
    public String getSubtaskProgress(@Context HttpHeaders httpHeaders, @ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName, @ApiParam(value="Sub task names separated by comma") @QueryParam(value="subtaskNames") @Nullable String subtaskNames) {
        String scheme = this._uriInfo.getRequestUri().getScheme();
        List<InstanceConfig> workers = this._pinotHelixResourceManager.getAllMinionInstanceConfigs();
        HashMap<String, String> workerEndpoints = new HashMap<String, String>();
        for (InstanceConfig worker : workers) {
            workerEndpoints.put(worker.getId(), String.format("%s://%s:%d", scheme, worker.getHostName(), Integer.parseInt(worker.getPort())));
        }
        HashMap<String, String> requestHeaders = new HashMap<String, String>();
        httpHeaders.getRequestHeaders().keySet().forEach(header -> requestHeaders.put((String)header, httpHeaders.getHeaderString(header)));
        int timeoutMs = this._controllerConf.getMinionAdminRequestTimeoutSeconds() * 1000;
        try {
            Map<String, Object> progress = this._pinotHelixTaskResourceManager.getSubtaskProgress(taskName, subtaskNames, this._executor, this._connectionManager, workerEndpoints, requestHeaders, timeoutMs);
            return JsonUtils.objectToString(progress);
        }
        catch (NoTaskScheduledException | UnknownTaskTypeException e) {
            throw new ControllerApplicationException(LOGGER, "Not task with name: " + taskName, Response.Status.NOT_FOUND, (Throwable)e);
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, String.format("Failed to get worker side progress for task: %s due to error: %s", taskName, ExceptionUtils.getStackTrace((Throwable)e)), Response.Status.INTERNAL_SERVER_ERROR, (Throwable)e);
        }
    }

    @GET
    @Path(value="/tasks/subtask/workers/progress")
    @Authorize(targetType=TargetType.CLUSTER, action="GetTask")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get progress of all subtasks with specified state tracked by minion worker in memory")
    @ApiResponses(value={@ApiResponse(code=200, message="Success"), @ApiResponse(code=500, message="Internal server error")})
    public String getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders, @ApiParam(value="Subtask state (UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)", required=true) @QueryParam(value="subTaskState") String subTaskState, @ApiParam(value="Minion worker IDs separated by comma") @QueryParam(value="minionWorkerIds") @Nullable String minionWorkerIds) {
        HashSet selectedMinionWorkers = new HashSet();
        if (StringUtils.isNotEmpty((CharSequence)minionWorkerIds)) {
            selectedMinionWorkers.addAll(Arrays.stream(StringUtils.split((String)minionWorkerIds, (char)',')).map(String::trim).collect(Collectors.toList()));
        }
        String scheme = this._uriInfo.getRequestUri().getScheme();
        List<InstanceConfig> allMinionWorkerInstanceConfigs = this._pinotHelixResourceManager.getAllMinionInstanceConfigs();
        HashMap<String, String> selectedMinionWorkerEndpoints = new HashMap<String, String>();
        for (InstanceConfig worker : allMinionWorkerInstanceConfigs) {
            if (!selectedMinionWorkers.isEmpty() && !selectedMinionWorkers.contains(worker.getId())) continue;
            selectedMinionWorkerEndpoints.put(worker.getId(), String.format("%s://%s:%d", scheme, worker.getHostName(), Integer.parseInt(worker.getPort())));
        }
        HashMap<String, String> requestHeaders = new HashMap<String, String>();
        httpHeaders.getRequestHeaders().keySet().forEach(header -> requestHeaders.put((String)header, httpHeaders.getHeaderString(header)));
        int timeoutMs = this._controllerConf.getMinionAdminRequestTimeoutSeconds() * 1000;
        try {
            Map<String, Object> minionWorkerIdSubtaskProgressMap = this._pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(subTaskState, this._executor, this._connectionManager, selectedMinionWorkerEndpoints, requestHeaders, timeoutMs);
            return JsonUtils.objectToString(minionWorkerIdSubtaskProgressMap);
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, String.format("Failed to get minion worker side progress for subtasks with state %s due to error: %s", subTaskState, ExceptionUtils.getStackTrace((Throwable)e)), Response.Status.INTERNAL_SERVER_ERROR, (Throwable)e);
        }
    }

    @GET
    @Path(value="/tasks/scheduler/information")
    @Authorize(targetType=TargetType.CLUSTER, action="GetSchedulerInfo")
    @Produces(value={"application/json"})
    @ApiOperation(value="Fetch cron scheduler information")
    public Map<String, Object> getCronSchedulerInformation() throws SchedulerException {
        Scheduler scheduler = this._pinotTaskManager.getScheduler();
        if (scheduler == null) {
            throw new NotFoundException("Task scheduler is disabled");
        }
        SchedulerMetaData metaData = scheduler.getMetaData();
        HashMap<String, Object> schedulerMetaData = new HashMap<String, Object>();
        schedulerMetaData.put("Version", metaData.getVersion());
        schedulerMetaData.put("SchedulerName", metaData.getSchedulerName());
        schedulerMetaData.put("SchedulerInstanceId", metaData.getSchedulerInstanceId());
        schedulerMetaData.put("getThreadPoolClass", metaData.getThreadPoolClass());
        schedulerMetaData.put("getThreadPoolSize", metaData.getThreadPoolSize());
        schedulerMetaData.put("SchedulerClass", metaData.getSchedulerClass());
        schedulerMetaData.put("Clustered", metaData.isJobStoreClustered());
        schedulerMetaData.put("JobStoreClass", metaData.getJobStoreClass());
        schedulerMetaData.put("NumberOfJobsExecuted", metaData.getNumberOfJobsExecuted());
        schedulerMetaData.put("InStandbyMode", metaData.isInStandbyMode());
        schedulerMetaData.put("RunningSince", metaData.getRunningSince());
        ArrayList jobDetails = new ArrayList();
        for (String groupName : scheduler.getJobGroupNames()) {
            for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals((String)groupName))) {
                HashMap<String, Object> jobMap = new HashMap<String, Object>();
                List triggers = scheduler.getTriggersOfJob(jobKey);
                jobMap.put("JobKey", jobKey);
                jobMap.put("NextFireTime", ((Trigger)triggers.get(0)).getNextFireTime());
                jobMap.put("PreviousFireTime", ((Trigger)triggers.get(0)).getPreviousFireTime());
                jobDetails.add(jobMap);
            }
        }
        schedulerMetaData.put("JobDetails", jobDetails);
        return schedulerMetaData;
    }

    @GET
    @Path(value="/tasks/scheduler/jobKeys")
    @Authorize(targetType=TargetType.CLUSTER, action="GetSchedulerInfo")
    @Produces(value={"application/json"})
    @ApiOperation(value="Fetch cron scheduler job keys")
    public List<JobKey> getCronSchedulerJobKeys() throws SchedulerException {
        Scheduler scheduler = this._pinotTaskManager.getScheduler();
        if (scheduler == null) {
            throw new NotFoundException("Task scheduler is disabled");
        }
        ArrayList<JobKey> jobKeys = new ArrayList<JobKey>();
        for (String group : scheduler.getTriggerGroupNames()) {
            jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEquals((String)group)));
        }
        return jobKeys;
    }

    @GET
    @Path(value="/tasks/scheduler/jobDetails")
    @Authorize(targetType=TargetType.CLUSTER, action="GetSchedulerInfo")
    @Produces(value={"application/json"})
    @ApiOperation(value="Fetch cron scheduler job keys")
    public Map<String, Object> getCronSchedulerJobDetails(@ApiParam(value="Table name (with type suffix)") @QueryParam(value="tableName") String tableName, @ApiParam(value="Task type") @QueryParam(value="taskType") String taskType) throws SchedulerException {
        Scheduler scheduler = this._pinotTaskManager.getScheduler();
        if (scheduler == null) {
            throw new NotFoundException("Task scheduler is disabled");
        }
        JobKey jobKey = JobKey.jobKey((String)tableName, (String)taskType);
        if (!scheduler.checkExists(jobKey)) {
            throw new NotFoundException("Unable to find job detail for table name - " + tableName + ", task type - " + taskType);
        }
        JobDetail schedulerJobDetail = scheduler.getJobDetail(jobKey);
        HashMap<String, Object> jobDetail = new HashMap<String, Object>();
        jobDetail.put("JobKey", schedulerJobDetail.getKey());
        jobDetail.put("Description", schedulerJobDetail.getDescription());
        jobDetail.put("JobClass", schedulerJobDetail.getJobClass());
        JobDataMap jobData = schedulerJobDetail.getJobDataMap();
        HashMap<String, String> jobDataMap = new HashMap<String, String>();
        for (String key : jobData.getKeys()) {
            jobDataMap.put(key, jobData.get((Object)key).toString());
        }
        jobDetail.put("JobDataMap", jobDataMap);
        List triggers = scheduler.getTriggersOfJob(jobKey);
        ArrayList triggerMaps = new ArrayList();
        if (!triggers.isEmpty()) {
            for (Trigger trigger : triggers) {
                HashMap<String, Object> triggerMap = new HashMap<String, Object>();
                if (trigger instanceof SimpleTrigger) {
                    SimpleTrigger simpleTrigger = (SimpleTrigger)trigger;
                    triggerMap.put("TriggerType", SimpleTrigger.class.getSimpleName());
                    triggerMap.put("RepeatInterval", simpleTrigger.getRepeatInterval());
                    triggerMap.put("RepeatCount", simpleTrigger.getRepeatCount());
                    triggerMap.put("TimesTriggered", simpleTrigger.getTimesTriggered());
                    triggerMap.put("NextFireTime", simpleTrigger.getNextFireTime());
                    triggerMap.put("PreviousFireTime", simpleTrigger.getPreviousFireTime());
                } else if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger)trigger;
                    triggerMap.put("TriggerType", CronTrigger.class.getSimpleName());
                    triggerMap.put("TimeZone", cronTrigger.getTimeZone());
                    triggerMap.put("CronExpression", cronTrigger.getCronExpression());
                    triggerMap.put("ExpressionSummary", cronTrigger.getExpressionSummary());
                    triggerMap.put("NextFireTime", cronTrigger.getNextFireTime());
                    triggerMap.put("PreviousFireTime", cronTrigger.getPreviousFireTime());
                }
                triggerMaps.add(triggerMap);
            }
        }
        jobDetail.put("Triggers", triggerMaps);
        return jobDetail;
    }

    @POST
    @Path(value="/tasks/schedule")
    @Authorize(targetType=TargetType.CLUSTER, action="CreateTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.UPDATE)
    @ApiOperation(value="Schedule tasks and return a map from task type to task name scheduled")
    public Map<String, String> scheduleTasks(@ApiParam(value="Task type") @QueryParam(value="taskType") String taskType, @ApiParam(value="Table name (with type suffix)") @QueryParam(value="tableName") String tableName) {
        if (taskType != null) {
            String taskName = tableName != null ? this._pinotTaskManager.scheduleTask(taskType, tableName) : this._pinotTaskManager.scheduleTask(taskType);
            return Collections.singletonMap(taskType, taskName);
        }
        return tableName != null ? this._pinotTaskManager.scheduleTasks(tableName) : this._pinotTaskManager.scheduleTasks();
    }

    @POST
    @ManagedAsync
    @Produces(value={"application/json"})
    @Path(value="/tasks/execute")
    @Authorize(targetType=TargetType.CLUSTER, action="ExecuteTask")
    @Authenticate(value=AccessType.CREATE)
    @ApiOperation(value="Execute a task on minion")
    public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended AsyncResponse asyncResponse, @Context Request requestContext) {
        try {
            asyncResponse.resume(this._pinotTaskManager.createTask(adhocTaskConfig.getTaskType(), adhocTaskConfig.getTableName(), adhocTaskConfig.getTaskName(), adhocTaskConfig.getTaskConfigs()));
        }
        catch (TableNotFoundException e) {
            throw new ControllerApplicationException(LOGGER, "Failed to find table: " + adhocTaskConfig.getTableName(), Response.Status.NOT_FOUND, (Throwable)e);
        }
        catch (TaskAlreadyExistsException e) {
            throw new ControllerApplicationException(LOGGER, "Task already exists: " + adhocTaskConfig.getTaskName(), Response.Status.CONFLICT, (Throwable)e);
        }
        catch (UnknownTaskTypeException e) {
            throw new ControllerApplicationException(LOGGER, "Unknown task type: " + adhocTaskConfig.getTaskType(), Response.Status.NOT_FOUND, (Throwable)e);
        }
        catch (NoTaskScheduledException e) {
            throw new ControllerApplicationException(LOGGER, "No task is generated for table: " + adhocTaskConfig.getTableName() + ", with task type: " + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST);
        }
        catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, "Failed to create adhoc task: " + ExceptionUtils.getStackTrace((Throwable)e), Response.Status.INTERNAL_SERVER_ERROR, (Throwable)e);
        }
    }

    @Deprecated
    @PUT
    @Path(value="/tasks/scheduletasks")
    @Authorize(targetType=TargetType.CLUSTER, action="CreateTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.UPDATE)
    @ApiOperation(value="Schedule tasks (deprecated)")
    public Map<String, String> scheduleTasksDeprecated() {
        return this._pinotTaskManager.scheduleTasks();
    }

    @PUT
    @Path(value="/tasks/{taskType}/cleanup")
    @Authorize(targetType=TargetType.CLUSTER, action="CleanupTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.UPDATE)
    @ApiOperation(value="Clean up finished tasks (COMPLETED, FAILED) for the given task type")
    public SuccessResponse cleanUpTasks(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        this._pinotHelixTaskResourceManager.cleanUpTaskQueue(taskType);
        return new SuccessResponse("Successfully cleaned up tasks for task type: " + taskType);
    }

    @Deprecated
    @PUT
    @Path(value="/tasks/cleanuptasks/{taskType}")
    @Authorize(targetType=TargetType.CLUSTER, action="CleanupTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.UPDATE)
    @ApiOperation(value="Clean up finished tasks (COMPLETED, FAILED) for the given task type (deprecated)")
    public SuccessResponse cleanUpTasksDeprecated(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        this._pinotHelixTaskResourceManager.cleanUpTaskQueue(taskType);
        return new SuccessResponse("Successfully cleaned up tasks for task type: " + taskType);
    }

    @PUT
    @Path(value="/tasks/{taskType}/stop")
    @Authorize(targetType=TargetType.CLUSTER, action="StopTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.UPDATE)
    @ApiOperation(value="Stop all running/pending tasks (as well as the task queue) for the given task type")
    public SuccessResponse stopTasks(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        this._pinotHelixTaskResourceManager.stopTaskQueue(taskType);
        return new SuccessResponse("Successfully stopped tasks for task type: " + taskType);
    }

    @PUT
    @Path(value="/tasks/{taskType}/resume")
    @Authorize(targetType=TargetType.CLUSTER, action="ResumeTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.UPDATE)
    @ApiOperation(value="Resume all stopped tasks (as well as the task queue) for the given task type")
    public SuccessResponse resumeTasks(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType) {
        this._pinotHelixTaskResourceManager.resumeTaskQueue(taskType);
        return new SuccessResponse("Successfully resumed tasks for task type: " + taskType);
    }

    @Deprecated
    @PUT
    @Path(value="/tasks/taskqueue/{taskType}")
    @Authorize(targetType=TargetType.CLUSTER, action="UpdateTaskQueue")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.UPDATE)
    @ApiOperation(value="Stop/resume a task queue (deprecated)")
    public SuccessResponse toggleTaskQueueState(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="state", required=true) @QueryParam(value="state") String state) {
        switch (state.toUpperCase()) {
            case "STOP": {
                this._pinotHelixTaskResourceManager.stopTaskQueue(taskType);
                return new SuccessResponse("Successfully stopped task queue for task type: " + taskType);
            }
            case "RESUME": {
                this._pinotHelixTaskResourceManager.resumeTaskQueue(taskType);
                return new SuccessResponse("Successfully resumed task queue for task type: " + taskType);
            }
        }
        throw new IllegalArgumentException("Unsupported state: " + state);
    }

    @DELETE
    @Path(value="/tasks/{taskType}")
    @Authorize(targetType=TargetType.CLUSTER, action="DeleteTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.DELETE)
    @ApiOperation(value="Delete all tasks (as well as the task queue) for the given task type")
    public SuccessResponse deleteTasks(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="Whether to force deleting the tasks (expert only option, enable with cautious") @DefaultValue(value="false") @QueryParam(value="forceDelete") boolean forceDelete) {
        this._pinotHelixTaskResourceManager.deleteTaskQueue(taskType, forceDelete);
        return new SuccessResponse("Successfully deleted tasks for task type: " + taskType);
    }

    @DELETE
    @Path(value="/tasks/task/{taskName}")
    @Authorize(targetType=TargetType.CLUSTER, action="DeleteTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.DELETE)
    @ApiOperation(value="Delete a single task given its task name")
    public SuccessResponse deleteTask(@ApiParam(value="Task name", required=true) @PathParam(value="taskName") String taskName, @ApiParam(value="Whether to force deleting the task (expert only option, enable with cautious") @DefaultValue(value="false") @QueryParam(value="forceDelete") boolean forceDelete) {
        this._pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
        return new SuccessResponse("Successfully deleted task: " + taskName);
    }

    @Deprecated
    @DELETE
    @Path(value="/tasks/taskqueue/{taskType}")
    @Authorize(targetType=TargetType.CLUSTER, action="DeleteTask")
    @Produces(value={"application/json"})
    @Authenticate(value=AccessType.DELETE)
    @ApiOperation(value="Delete a task queue (deprecated)")
    public SuccessResponse deleteTaskQueue(@ApiParam(value="Task type", required=true) @PathParam(value="taskType") String taskType, @ApiParam(value="Whether to force delete the task queue (expert only option, enable with cautious") @DefaultValue(value="false") @QueryParam(value="forceDelete") boolean forceDelete) {
        this._pinotHelixTaskResourceManager.deleteTaskQueue(taskType, forceDelete);
        return new SuccessResponse("Successfully deleted task queue for task type: " + taskType);
    }
}

