/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.servlet;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.cruisecontrol.servlet.EndPoint;
import com.linkedin.cruisecontrol.servlet.EndpointType;
import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters;
import com.linkedin.cruisecontrol.servlet.response.CruiseControlResponse;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint;
import com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndpointType;
import com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.OperationFuture;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils;
import com.linkedin.kafka.cruisecontrol.servlet.purgatory.Purgatory;
import com.linkedin.kafka.cruisecontrol.servlet.response.JsonResponseClass;
import com.linkedin.kafka.cruisecontrol.servlet.response.JsonResponseField;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserTaskManager
implements Closeable {
    // Name of the HTTP header carrying the user task id: read from requests in getUserTaskId and set on
    // responses in getOrCreateUserTask.
    public static final String USER_TASK_HEADER_NAME = "User-Task-ID";
    // Period (seconds) and initial delay (seconds) of the background scanner.
    // NOTE(review): the constructors pass the literals 0L / 5L to scheduleAtFixedRate instead of these
    // constants -- presumably constant inlining from compilation/decompilation; confirm they stay in sync.
    public static final long USER_TASK_SCANNER_PERIOD_SECONDS = 5L;
    public static final long USER_TASK_SCANNER_INITIAL_DELAY_SECONDS = 0L;
    private static final Logger LOG = LoggerFactory.getLogger(UserTaskManager.class);
    // Dedicated logger (named "operationLogger") used by UserTaskInfo.logOperation to record task results.
    private static final Logger OPERATION_LOG = LoggerFactory.getLogger((String)"operationLogger");
    // Session key -> user task id. Accesses in createSessionKeyMapping, expireOldSessions and getUserTaskId
    // synchronize on the map instance itself (not on this manager).
    private final Map<SessionKey, UUID> _sessionKeyToUserTaskIdMap;
    // Tasks whose futures are still being tracked as active; read and written from synchronized methods.
    private final Map<UUID, UserTaskInfo> _uuidToActiveUserTaskInfoMap;
    // Completed tasks, grouped by endpoint type. Each inner map is a LinkedHashMap whose removeEldestEntry
    // override evicts the eldest entry once the configured maximum size is exceeded.
    private final Map<EndpointType, Map<UUID, UserTaskInfo>> _uuidToCompletedUserTaskInfoMap;
    // The task currently marked as in execution: set by markTaskExecutionBegan and reset to null by
    // markTaskExecutionFinished. Null when no task is marked as executing.
    private UserTaskInfo _inExecutionUserTaskInfo;
    // A session is expired once now >= lastAccessedTime + _sessionExpiryMs (see expireOldSessions).
    private final long _sessionExpiryMs;
    // Upper bound on the number of active tasks, enforced in insertFuturesByUserTaskId.
    private final int _maxActiveUserTasks;
    private final Time _time;
    // Single-threaded executor that periodically runs UserTaskScanner.
    private final ScheduledExecutorService _userTaskScannerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("UserTaskScanner", true, null));
    // Single-threaded executor on which checkActiveUserTasks submits UserTaskInfo.logOperation calls.
    private final ExecutorService _userTaskLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("UserTaskLogger", true, null));
    private final UUIDGenerator _uuidGenerator;
    // Per-endpoint timers updated in checkActiveUserTasks when a task completes without exception.
    private final Map<EndPoint, Timer> _successfulRequestExecutionTimer;
    // Per-endpoint-type retention time (ms) for completed tasks, applied in removeOldUserTasks.
    private final Map<EndpointType, Long> _completedUserTaskRetentionTimeMs;
    // May be null (the package-private constructor sets it to null); removeFromPurgatory checks for null.
    private final Purgatory _purgatory;

    /**
     * Creates a user task manager from the given configuration.
     *
     * <p>Reads {@code webserver.session.maxExpiryTimeMs} and {@code max.active.user.tasks} from the config,
     * initializes the per-endpoint-type completed-task caches and retention times, registers the
     * {@code num-active-sessions} and {@code num-active-user-tasks} gauges, and finally starts the periodic
     * {@link UserTaskScanner}.
     *
     * <p>The scanner is scheduled as the very last step: it runs on another thread with zero initial delay, so
     * scheduling it earlier would let that thread observe this instance before all final fields are assigned.
     *
     * @param config the Cruise Control configuration to read settings from.
     * @param dropwizardMetricRegistry the registry on which the gauges are registered.
     * @param successfulRequestExecutionTimer per-endpoint timers updated when a task completes successfully.
     * @param purgatory the purgatory from which submitted reviews are removed when their task is purged.
     */
    public UserTaskManager(KafkaCruiseControlConfig config, MetricRegistry dropwizardMetricRegistry, Map<EndPoint, Timer> successfulRequestExecutionTimer, Purgatory purgatory) {
        _purgatory = purgatory;
        _sessionKeyToUserTaskIdMap = new HashMap<>();
        List<CruiseControlEndpointType> endpointTypes = Collections.unmodifiableList(Arrays.asList(CruiseControlEndpointType.values()));
        _uuidToCompletedUserTaskInfoMap = new HashMap<>(endpointTypes.size());
        _completedUserTaskRetentionTimeMs = new HashMap<>(endpointTypes.size());
        initCompletedUserTaskRetentionPolicy(config, endpointTypes);
        _sessionExpiryMs = config.getLong("webserver.session.maxExpiryTimeMs");
        _maxActiveUserTasks = config.getInt("max.active.user.tasks");
        _uuidToActiveUserTaskInfoMap = new LinkedHashMap<>(_maxActiveUserTasks);
        _time = Time.SYSTEM;
        _uuidGenerator = new UUIDGenerator();
        _successfulRequestExecutionTimer = successfulRequestExecutionTimer;
        dropwizardMetricRegistry.register(MetricRegistry.name("UserTaskManager", "num-active-sessions"), (Gauge<Integer>) this._sessionKeyToUserTaskIdMap::size);
        dropwizardMetricRegistry.register(MetricRegistry.name("UserTaskManager", "num-active-user-tasks"), (Gauge<Integer>) this._uuidToActiveUserTaskInfoMap::size);
        // Start the background scanner only once every field has been assigned.
        _userTaskScannerExecutor.scheduleAtFixedRate(new UserTaskScanner(), USER_TASK_SCANNER_INITIAL_DELAY_SECONDS, USER_TASK_SCANNER_PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Package-private constructor taking explicit settings instead of a config object. No purgatory is used
     * and a fresh {@link Timer} is created for every value of {@code CruiseControlEndPoint.cachedValues()}.
     *
     * <p>The periodic {@link UserTaskScanner} is scheduled as the very last step: it runs on another thread
     * with zero initial delay, so scheduling it earlier would let that thread observe this instance before
     * the timer map is assigned and populated.
     *
     * @param sessionExpiryMs idle time after which a session is expired.
     * @param maxActiveUserTasks maximum number of concurrently active user tasks.
     * @param completedUserTaskRetentionTimeMs retention time applied to completed tasks of every endpoint type.
     * @param maxCachedCompletedUserTasks maximum number of completed tasks cached per endpoint type.
     * @param time the clock to use.
     * @param uuidGenerator generator of new user task ids.
     */
    UserTaskManager(long sessionExpiryMs, int maxActiveUserTasks, long completedUserTaskRetentionTimeMs, final int maxCachedCompletedUserTasks, Time time, UUIDGenerator uuidGenerator) {
        _purgatory = null;
        _sessionKeyToUserTaskIdMap = new HashMap<>();
        _uuidToActiveUserTaskInfoMap = new LinkedHashMap<>(maxActiveUserTasks);
        List<CruiseControlEndpointType> endpointTypes = Collections.unmodifiableList(Arrays.asList(CruiseControlEndpointType.values()));
        _uuidToCompletedUserTaskInfoMap = new HashMap<>(endpointTypes.size());
        _completedUserTaskRetentionTimeMs = new HashMap<>(endpointTypes.size());
        for (CruiseControlEndpointType endpointType : endpointTypes) {
            // Bounded cache: adding beyond the limit evicts the eldest (first inserted) completed task.
            _uuidToCompletedUserTaskInfoMap.put(endpointType, new LinkedHashMap<UUID, UserTaskInfo>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<UUID, UserTaskInfo> eldest) {
                    return size() > maxCachedCompletedUserTasks;
                }
            });
            _completedUserTaskRetentionTimeMs.put(endpointType, completedUserTaskRetentionTimeMs);
        }
        _sessionExpiryMs = sessionExpiryMs;
        _maxActiveUserTasks = maxActiveUserTasks;
        _time = time;
        _uuidGenerator = uuidGenerator;
        _successfulRequestExecutionTimer = new HashMap<>();
        CruiseControlEndPoint.cachedValues().forEach(e -> this._successfulRequestExecutionTimer.put((EndPoint) e, new Timer()));
        // Start the background scanner only once every field has been assigned and populated.
        _userTaskScannerExecutor.scheduleAtFixedRate(new UserTaskScanner(), USER_TASK_SCANNER_INITIAL_DELAY_SECONDS, USER_TASK_SCANNER_PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Package-private convenience constructor: delegates to the six-argument constructor with a new
     * default {@link UUIDGenerator}.
     */
    UserTaskManager(long sessionExpiryMs, int maxActiveUserTasks, long completedUserTaskRetentionTimeMs, int maxCachedCompletedUserTasks, Time time) {
        this(sessionExpiryMs, maxActiveUserTasks, completedUserTaskRetentionTimeMs, maxCachedCompletedUserTasks, time, new UUIDGenerator());
    }

    /**
     * Populates the completed-task cache and the retention time for every endpoint type.
     *
     * <p>For each type the type-specific config values are read; when a type-specific value is {@code null}
     * the generic {@code max.cached.completed.user.tasks} / {@code completed.user.task.retention.time.ms}
     * value is used instead. Each cache is a {@link LinkedHashMap} that evicts its eldest entry once its size
     * exceeds the resolved maximum.
     *
     * @param config the configuration to read from.
     * @param endpointTypes the endpoint types to initialize.
     * @throws IllegalStateException if an endpoint type has no known config keys.
     */
    private void initCompletedUserTaskRetentionPolicy(KafkaCruiseControlConfig config, List<CruiseControlEndpointType> endpointTypes) {
        Integer fallbackMaxCached = config.getInt("max.cached.completed.user.tasks");
        Long fallbackRetentionMs = config.getLong("completed.user.task.retention.time.ms");
        for (CruiseControlEndpointType type : endpointTypes) {
            // Resolve the pair of config keys first, then read both values in one place.
            String maxCachedKey;
            String retentionKey;
            switch (type) {
                case CRUISE_CONTROL_ADMIN:
                    maxCachedKey = "max.cached.completed.cruise.control.admin.user.tasks";
                    retentionKey = "completed.cruise.control.admin.user.task.retention.time.ms";
                    break;
                case KAFKA_ADMIN:
                    maxCachedKey = "max.cached.completed.kafka.admin.user.tasks";
                    retentionKey = "completed.kafka.admin.user.task.retention.time.ms";
                    break;
                case CRUISE_CONTROL_MONITOR:
                    maxCachedKey = "max.cached.completed.cruise.control.monitor.user.tasks";
                    retentionKey = "completed.cruise.control.monitor.user.task.retention.time.ms";
                    break;
                case KAFKA_MONITOR:
                    maxCachedKey = "max.cached.completed.kafka.monitor.user.tasks";
                    retentionKey = "completed.kafka.monitor.user.task.retention.time.ms";
                    break;
                default:
                    throw new IllegalStateException("Unknown endpoint type " + type);
            }
            Integer typeMaxCached = config.getInt(maxCachedKey);
            Long typeRetentionMs = config.getLong(retentionKey);

            final Integer capacity;
            if (typeMaxCached == null) {
                capacity = fallbackMaxCached;
            } else {
                capacity = typeMaxCached;
            }
            _uuidToCompletedUserTaskInfoMap.put(type, new LinkedHashMap<UUID, UserTaskInfo>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<UUID, UserTaskInfo> eldest) {
                    return size() > capacity;
                }
            });

            Long retentionMs;
            if (typeRetentionMs == null) {
                retentionMs = fallbackRetentionMs;
            } else {
                retentionMs = typeRetentionMs;
            }
            _completedUserTaskRetentionTimeMs.put(type, retentionMs);
        }
    }

    /**
     * Returns the futures of the user task matching the request, creating the task or appending a new
     * future to it when needed. The task id is always written to the {@code User-Task-ID} response header.
     *
     * <ul>
     *   <li>No matching task: the request must not carry the task header and {@code step} must be 0; a new
     *       task is created and, for async requests, bound to the request's session key.</li>
     *   <li>Matching task and {@code step} below the number of futures: the existing futures are returned.</li>
     *   <li>Matching task and {@code step} equal to the number of futures: a new future is appended.</li>
     * </ul>
     *
     * @param httpServletRequest the incoming request.
     * @param httpServletResponse the response on which the task id header is set.
     * @param function creates the operation future from the task id string.
     * @param step index of the step requested by the caller.
     * @param isAsyncRequest whether a session key mapping should be created for a new task.
     * @param parameters the parsed request parameters stored with a newly created task.
     * @return an unmodifiable view of the task's futures.
     * @throws IllegalArgumentException if {@code step} skips ahead of the existing steps.
     */
    public List<OperationFuture> getOrCreateUserTask(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Function<String, OperationFuture> function, int step, boolean isAsyncRequest, CruiseControlParameters parameters) {
        UUID requestedId = getUserTaskId(httpServletRequest);
        UserTaskInfo existing = getUserTaskByUserTaskId(requestedId, httpServletRequest);

        if (existing == null) {
            // Brand-new task: the client must not have supplied a task id and must start at step 0.
            KafkaCruiseControlServletUtils.ensureHeaderNotPresent(httpServletRequest, USER_TASK_HEADER_NAME);
            if (step != 0) {
                throw new IllegalArgumentException(String.format("There are no step in the session. Cannot add step %d.", step));
            }
            UUID newId = _uuidGenerator.randomUUID();
            UserTaskInfo created = insertFuturesByUserTaskId(newId, function, httpServletRequest, parameters);
            if (isAsyncRequest) {
                createSessionKeyMapping(newId, httpServletRequest);
            }
            httpServletResponse.setHeader(USER_TASK_HEADER_NAME, newId.toString());
            return Collections.unmodifiableList(created.futures());
        }

        LOG.info("Fetch an existing UserTask {}", requestedId);
        httpServletResponse.setHeader(USER_TASK_HEADER_NAME, requestedId.toString());
        int existingSteps = existing.futures().size();
        if (step < existingSteps) {
            return Collections.unmodifiableList(existing.futures());
        }
        if (step > existingSteps) {
            throw new IllegalArgumentException(String.format("There are %d steps in the session. Cannot add step %d.", existingSteps, step));
        }
        LOG.info("Add a new future to existing UserTask {}", requestedId);
        UserTaskInfo extended = insertFuturesByUserTaskId(requestedId, function, httpServletRequest, parameters);
        return Collections.unmodifiableList(extended.futures());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Binds the session key derived from the request to the given user task id. The session map is updated
     * while holding the map's own monitor.
     *
     * @param userTaskId the id of the newly created task.
     * @param httpServletRequest the request whose session, URL and parameters form the key.
     */
    private void createSessionKeyMapping(UUID userTaskId, HttpServletRequest httpServletRequest) {
        SessionKey key = new SessionKey(httpServletRequest);
        LOG.info("Create a new UserTask {} with SessionKey {}", userTaskId, key);
        synchronized (_sessionKeyToUserTaskIdMap) {
            _sessionKeyToUserTaskIdMap.put(key, userTaskId);
        }
    }

    /**
     * Returns the most recently added future of the user task matching the request.
     *
     * @param request the incoming request.
     * @param <T> the type the caller expects; the cast is unchecked.
     * @return the last future of the matching task, or {@code null} if there is no matching task or it has
     *         no futures.
     */
    public <T> T getFuture(HttpServletRequest request) {
        UserTaskInfo matchingTask = getUserTaskByUserTaskId(getUserTaskId(request), request);
        if (matchingTask == null) {
            return null;
        }
        List<OperationFuture> futures = matchingTask.futures();
        if (futures == null || futures.isEmpty()) {
            return null;
        }
        @SuppressWarnings("unchecked")
        T latest = (T) futures.get(futures.size() - 1);
        return latest;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Invalidates and forgets every tracked session whose last access is at least {@code _sessionExpiryMs}
     * in the past. Sessions that turn out to be invalid already (signalled by an
     * {@link IllegalStateException}) are simply dropped from the map. Runs while holding the session map's
     * monitor.
     */
    private void expireOldSessions() {
        long now = _time.milliseconds();
        synchronized (_sessionKeyToUserTaskIdMap) {
            for (Iterator<Map.Entry<SessionKey, UUID>> it = _sessionKeyToUserTaskIdMap.entrySet().iterator(); it.hasNext();) {
                SessionKey sessionKey = it.next().getKey();
                HttpSession session = sessionKey.httpSession();
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Session {} was last accessed at {}, age is {} ms.", session, session.getLastAccessedTime(), now - session.getLastAccessedTime());
                    }
                    boolean expired = now >= session.getLastAccessedTime() + _sessionExpiryMs;
                    if (expired) {
                        LOG.info("Expiring the session associated with {}.", sessionKey);
                        session.invalidate();
                        it.remove();
                    }
                } catch (IllegalStateException e) {
                    // The session calls above reject an already invalidated session.
                    LOG.info("Already expired the session associated with {}.", sessionKey);
                    it.remove();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resolves the user task id for a request.
     *
     * <p>A non-empty {@code User-Task-ID} header takes precedence and is parsed as a UUID; otherwise the id
     * is looked up by the request's session key.
     *
     * @param httpServletRequest the incoming request.
     * @return the user task id, or {@code null} if the header is absent/empty and no session mapping exists.
     * @throws IllegalArgumentException if the header value is not a valid UUID string.
     */
    public UUID getUserTaskId(HttpServletRequest httpServletRequest) {
        String headerValue = httpServletRequest.getHeader(USER_TASK_HEADER_NAME);
        if (headerValue != null && !headerValue.isEmpty()) {
            return UUID.fromString(headerValue);
        }
        SessionKey key = new SessionKey(httpServletRequest);
        synchronized (_sessionKeyToUserTaskIdMap) {
            return _sessionKeyToUserTaskIdMap.get(key);
        }
    }

    /**
     * Moves every finished task from the active map to the completed cache of its endpoint type.
     *
     * <p>A task whose last future completed exceptionally is stored as {@code COMPLETED_WITH_ERROR}; a task
     * that is otherwise done is stored as {@code COMPLETED} and its execution time is recorded on the
     * endpoint's success timer. In both cases the operation is logged asynchronously on the logger executor.
     *
     * <p>The key and value are read from the map entry before {@code iter.remove()} is called: the behavior
     * of a {@link Map.Entry} after its backing map has been modified is undefined, and the logging task runs
     * later on another thread.
     */
    synchronized void checkActiveUserTasks() {
        Iterator<Map.Entry<UUID, UserTaskInfo>> iter = _uuidToActiveUserTaskInfoMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<UUID, UserTaskInfo> entry = iter.next();
            final UUID userTaskId = entry.getKey();
            final UserTaskInfo userTaskInfo = entry.getValue();

            final TaskState finalState;
            if (userTaskInfo.isUserTaskDoneExceptionally()) {
                LOG.warn("UserTask {} is completed with Exception and removed from active tasks list", userTaskId);
                finalState = TaskState.COMPLETED_WITH_ERROR;
            } else if (userTaskInfo.isUserTaskDone()) {
                LOG.info("UserTask {} is completed and removed from active tasks list", userTaskId);
                Timer timer = _successfulRequestExecutionTimer.get(userTaskInfo.endPoint());
                if (timer != null) {
                    timer.update(userTaskInfo.executionTimeNs(), TimeUnit.NANOSECONDS);
                } else {
                    // A missing timer must not keep a finished task in the active list forever.
                    LOG.warn("No successful request execution timer is registered for endpoint {}.", userTaskInfo.endPoint());
                }
                finalState = TaskState.COMPLETED;
            } else {
                // Still running: leave it in the active map.
                continue;
            }

            _uuidToCompletedUserTaskInfoMap.get(userTaskInfo.endPoint().endpointType()).put(userTaskId, userTaskInfo.setState(finalState));
            iter.remove();
            _userTaskLoggerExecutor.submit(() -> userTaskInfo.logOperation());
        }
    }

    /**
     * Removes the submitted review associated with the given task from the purgatory, if a purgatory is
     * configured and the task's query parameters contain a {@code review_id}. An
     * {@link IllegalStateException} thrown by the purgatory is logged and not propagated.
     *
     * @param userTaskInfo the task being purged.
     */
    private synchronized void removeFromPurgatory(UserTaskInfo userTaskInfo) {
        if (_purgatory == null) {
            return;
        }
        String reviewIdParamName = ParameterUtils.caseSensitiveParameterName(userTaskInfo.queryParams(), "review_id");
        if (reviewIdParamName == null) {
            return;
        }
        String[] reviewIdValues = userTaskInfo.queryParams().get(reviewIdParamName);
        int reviewId = Integer.parseInt(reviewIdValues[0]);
        try {
            _purgatory.removeSubmitted(reviewId);
            LOG.info("Successfully removed submitted request corresponding to review id {} from purgatory.", reviewId);
        } catch (IllegalStateException ise) {
            LOG.error("Should never attempt to remove this request from purgatory.", ise);
        }
    }

    /**
     * Drops completed tasks whose start time plus the retention time of their endpoint type lies before the
     * current time. Each dropped task is first removed from the purgatory.
     */
    private synchronized void removeOldUserTasks() {
        LOG.debug("Remove old user tasks");
        for (Map.Entry<EndpointType, Long> retentionEntry : _completedUserTaskRetentionTimeMs.entrySet()) {
            long retentionMs = retentionEntry.getValue();
            Map<UUID, UserTaskInfo> completedTasks = _uuidToCompletedUserTaskInfoMap.get(retentionEntry.getKey());
            for (Iterator<Map.Entry<UUID, UserTaskInfo>> it = completedTasks.entrySet().iterator(); it.hasNext();) {
                UserTaskInfo completedTask = it.next().getValue();
                boolean retentionElapsed = completedTask.startMs() + retentionMs < _time.milliseconds();
                if (retentionElapsed) {
                    removeFromPurgatory(completedTask);
                    it.remove();
                }
            }
        }
    }

    /**
     * Marks the task with the given id as being in execution.
     *
     * <p>The completed caches are searched first; a hit is removed from its cache, switched to
     * {@code IN_EXECUTION} and returned. Otherwise, if the task is active, it is removed from the active map,
     * switched to {@code IN_EXECUTION} and its operation is logged. If the id is found nowhere, the current
     * in-execution task (possibly {@code null}) is returned unchanged.
     *
     * @param uuid string form of the user task id.
     * @return the task currently recorded as in execution.
     * @throws IllegalArgumentException if {@code uuid} is not a valid UUID string.
     */
    public synchronized UserTaskInfo markTaskExecutionBegan(String uuid) {
        UUID taskId = UUID.fromString(uuid);
        for (Map<UUID, UserTaskInfo> completedTasks : _uuidToCompletedUserTaskInfoMap.values()) {
            if (completedTasks.containsKey(taskId)) {
                UserTaskInfo resumed = completedTasks.remove(taskId);
                _inExecutionUserTaskInfo = resumed.setState(TaskState.IN_EXECUTION);
                return _inExecutionUserTaskInfo;
            }
        }
        if (_uuidToActiveUserTaskInfoMap.containsKey(taskId)) {
            UserTaskInfo activated = _uuidToActiveUserTaskInfoMap.remove(taskId);
            _inExecutionUserTaskInfo = activated.setState(TaskState.IN_EXECUTION);
            _inExecutionUserTaskInfo.logOperation();
        }
        return _inExecutionUserTaskInfo;
    }

    /**
     * Marks the task currently in execution as finished and moves it to the completed cache of its endpoint
     * type.
     *
     * @param uuid string form of the id of the task that finished; must match the task in execution.
     * @param completeWithError {@code true} to record {@code COMPLETED_WITH_ERROR}, {@code false} for
     *        {@code COMPLETED}.
     * @throws IllegalStateException if no task is in execution or the in-execution task has a different id.
     * @throws IllegalArgumentException if {@code uuid} is not a valid UUID string.
     */
    public synchronized void markTaskExecutionFinished(String uuid, boolean completeWithError) {
        LOG.debug("Task execution with uuid {} completed{}.", uuid, completeWithError ? " with error" : "");
        // Guard against "nothing in execution": report it as a missing task instead of failing with an NPE.
        if (_inExecutionUserTaskInfo == null || !_inExecutionUserTaskInfo.userTaskId().equals(UUID.fromString(uuid))) {
            throw new IllegalStateException(String.format("Task %s is not found in UserTaskManager.", uuid));
        }
        UserTaskInfo finished = _inExecutionUserTaskInfo;
        finished.setState(completeWithError ? TaskState.COMPLETED_WITH_ERROR : TaskState.COMPLETED);
        _uuidToCompletedUserTaskInfoMap.get(finished.endPoint().endpointType()).put(finished.userTaskId(), finished);
        _inExecutionUserTaskInfo = null;
    }

    /**
     * Finds the task with the given id, provided the request has the same URL and the same HTTP parameters
     * as the request that created the task. Completed tasks are searched first, then active tasks, then the
     * task in execution.
     *
     * @param userTaskId the id to look up; may be {@code null}.
     * @param httpServletRequest the incoming request to compare against the stored request.
     * @return the matching task, or {@code null} if the id is {@code null} or no task matches.
     */
    public synchronized UserTaskInfo getUserTaskByUserTaskId(UUID userTaskId, HttpServletRequest httpServletRequest) {
        if (userTaskId == null) {
            return null;
        }
        String requestUrl = KafkaCruiseControlServletUtils.httpServletRequestToString(httpServletRequest);

        for (Map<UUID, UserTaskInfo> completedTasks : _uuidToCompletedUserTaskInfoMap.values()) {
            if (!completedTasks.containsKey(userTaskId)) {
                continue;
            }
            UserTaskInfo completed = completedTasks.get(userTaskId);
            if (completed.requestUrl().equals(requestUrl)
                && hasTheSameHttpParameter(completed.queryParams(), httpServletRequest.getParameterMap())) {
                return completed;
            }
        }

        if (_uuidToActiveUserTaskInfoMap.containsKey(userTaskId)) {
            UserTaskInfo active = _uuidToActiveUserTaskInfoMap.get(userTaskId);
            if (active.requestUrl().equals(requestUrl)
                && hasTheSameHttpParameter(active.queryParams(), httpServletRequest.getParameterMap())) {
                return active;
            }
        }

        UserTaskInfo inExecution = _inExecutionUserTaskInfo;
        if (inExecution != null
            && inExecution.userTaskId().equals(userTaskId)
            && inExecution.requestUrl().equals(requestUrl)
            && hasTheSameHttpParameter(inExecution.queryParams(), httpServletRequest.getParameterMap())) {
            return inExecution;
        }
        return null;
    }

    /**
     * Appends a new future to the active task with the given id, or creates a new active task holding that
     * single future when no active task with the id exists.
     *
     * @param userTaskId the id of the task.
     * @param operation creates the future from the task id string.
     * @param httpServletRequest the request stored with a newly created task.
     * @param parameters the parsed request parameters stored with a newly created task.
     * @return the active task registered under {@code userTaskId}.
     * @throws RuntimeException if a new task is needed but the active-task capacity has been reached.
     */
    private synchronized UserTaskInfo insertFuturesByUserTaskId(UUID userTaskId, Function<String, OperationFuture> operation, HttpServletRequest httpServletRequest, CruiseControlParameters parameters) {
        if (_uuidToActiveUserTaskInfoMap.containsKey(userTaskId)) {
            List<OperationFuture> existingFutures = _uuidToActiveUserTaskInfoMap.get(userTaskId).futures();
            existingFutures.add(operation.apply(userTaskId.toString()));
            return _uuidToActiveUserTaskInfoMap.get(userTaskId);
        }

        int activeCount = _uuidToActiveUserTaskInfoMap.size();
        if (activeCount >= _maxActiveUserTasks) {
            throw new RuntimeException("There are already " + activeCount + " active user tasks, which has reached the servlet capacity.");
        }
        OperationFuture firstFuture = operation.apply(userTaskId.toString());
        List<OperationFuture> futures = new ArrayList<>(Collections.singleton(firstFuture));
        long startMs = _time.milliseconds();
        UserTaskInfo created = new UserTaskInfo(httpServletRequest, futures, startMs, userTaskId, TaskState.ACTIVE, parameters);
        _uuidToActiveUserTaskInfoMap.put(userTaskId, created);
        return _uuidToActiveUserTaskInfoMap.get(userTaskId);
    }

    /**
     * Returns a new list of all known tasks: active tasks first, then the task in execution (if any), then
     * the completed tasks of every endpoint type.
     *
     * @return a freshly allocated, mutable list of all tasks.
     */
    public synchronized List<UserTaskInfo> getAllUserTasks() {
        List<UserTaskInfo> allUserTasks = new ArrayList<>(_uuidToActiveUserTaskInfoMap.values());
        if (_inExecutionUserTaskInfo != null) {
            allUserTasks.add(_inExecutionUserTaskInfo);
        }
        for (Map<UUID, UserTaskInfo> completedTasks : _uuidToCompletedUserTaskInfoMap.values()) {
            allUserTasks.addAll(completedTasks.values());
        }
        return allUserTasks;
    }

    /**
     * Renders the session map, the active tasks, the task in execution and the completed tasks. Completed
     * tasks are split into those in state {@code COMPLETED} and all others (reported as completed with
     * error).
     *
     * @return a human-readable description of this manager's state.
     */
    @Override
    public String toString() {
        // Typed maps instead of raw LinkedHashMap; insertion order is preserved for stable output.
        Map<UUID, UserTaskInfo> completedWithSuccess = new LinkedHashMap<>();
        Map<UUID, UserTaskInfo> completedWithError = new LinkedHashMap<>();
        for (Map<UUID, UserTaskInfo> infoMap : _uuidToCompletedUserTaskInfoMap.values()) {
            for (Map.Entry<UUID, UserTaskInfo> entry : infoMap.entrySet()) {
                Map<UUID, UserTaskInfo> target = entry.getValue().state() == TaskState.COMPLETED ? completedWithSuccess : completedWithError;
                target.put(entry.getKey(), entry.getValue());
            }
        }
        return "UserTaskManager{_sessionKeyToUserTaskIdMap=" + _sessionKeyToUserTaskIdMap
            + ", _uuidToActiveUserTaskInfoMap=" + _uuidToActiveUserTaskInfoMap
            + ", _inExecutionUserTask=" + (_inExecutionUserTaskInfo != null ? _inExecutionUserTaskInfo : "No-User-Initiated-Execution")
            + ", _uuidToCompletedWithSuccessUserTaskInfoMap=" + completedWithSuccess
            + ", _uuidToCompletedWithErrorUserTaskInfoMap=" + completedWithError + "}";
    }

    /**
     * Shuts down the scanner executor and the logger executor via {@code shutdownNow()}.
     */
    @Override
    public void close() {
        this._userTaskScannerExecutor.shutdownNow();
        this._userTaskLoggerExecutor.shutdownNow();
    }

    /**
     * Compares two HTTP parameter maps, ignoring the order and multiplicity of the values of each parameter.
     *
     * @param params1 the first parameter map.
     * @param params2 the second parameter map.
     * @return {@code true} if both maps have the same key set and, for every key, the same set of values.
     */
    private boolean hasTheSameHttpParameter(Map<String, String[]> params1, Map<String, String[]> params2) {
        if (!params1.keySet().equals(params2.keySet())) {
            return false;
        }
        for (Map.Entry<String, String[]> param : params1.entrySet()) {
            Set<String> firstValues = new HashSet<>(Arrays.asList(param.getValue()));
            Set<String> secondValues = new HashSet<>(Arrays.asList(params2.get(param.getKey())));
            if (!firstValues.equals(secondValues)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the number of session keys currently mapped to a user task.
     *
     * <p>The size is read while holding the session map's monitor, consistent with the other accesses to
     * the map in createSessionKeyMapping, expireOldSessions and getUserTaskId.
     *
     * @return the number of tracked session keys.
     */
    int numActiveSessionKeys() {
        synchronized (_sessionKeyToUserTaskIdMap) {
            return _sessionKeyToUserTaskIdMap.size();
        }
    }

    /**
     * Lifecycle states of a user task. {@link #toString()} yields the display name of the state.
     */
    public enum TaskState {
        ACTIVE("Active"),
        IN_EXECUTION("InExecution"),
        COMPLETED("Completed"),
        COMPLETED_WITH_ERROR("CompletedWithError");

        /** Unmodifiable snapshot of all states, in declaration order. */
        private static final List<TaskState> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));

        private final String _type;

        TaskState(String type) {
            _type = type;
        }

        @Override
        public String toString() {
            return _type;
        }

        /**
         * @return the cached, unmodifiable list of all task states.
         */
        public static List<TaskState> cachedValues() {
            return CACHED_VALUES;
        }
    }

    /**
     * Periodic housekeeping job: expires old sessions, moves finished tasks out of the active map and purges
     * completed tasks past their retention time. Any {@link Throwable} is caught and logged rather than
     * propagated to the executor.
     */
    private class UserTaskScanner implements Runnable {

        @Override
        public void run() {
            try {
                expireOldSessions();
                checkActiveUserTasks();
                removeOldUserTasks();
            } catch (Throwable t) {
                LOG.warn("Received exception when trying to expire sessions.", t);
            }
        }
    }

    /**
     * Describes one user task: the request that created it, the futures computing its result, its start
     * time and its current {@link TaskState}.
     */
    @JsonResponseClass
    public static class UserTaskInfo {
        @JsonResponseField
        protected static final String USER_TASK_ID = "UserTaskId";
        @JsonResponseField
        protected static final String REQUEST_URL = "RequestURL";
        @JsonResponseField
        protected static final String CLIENT_ID = "ClientIdentity";
        @JsonResponseField
        protected static final String START_MS = "StartMs";
        @JsonResponseField
        protected static final String STATUS = "Status";
        @JsonResponseField(required=false)
        protected static final String ORIGINAL_RESPONSE = "originalResponse";
        private final List<OperationFuture> _futures;
        private final String _requestUrl;
        private final String _clientIdentity;
        private final long _startMs;
        private final UUID _userTaskId;
        private final Map<String, String[]> _queryParams;
        private final EndPoint _endPoint;
        private TaskState _state;
        private final CruiseControlParameters _parameters;

        /**
         * @param httpServletRequest the request that created the task; its URL, client address, parameter
         *        map and endpoint are captured.
         * @param futures the futures of the task; must be non-null and non-empty. The list is stored as is
         *        and later futures are appended to it.
         * @param startMs the start time of the task in milliseconds.
         * @param userTaskId the id of the task.
         * @param state the initial state.
         * @param parameters the parsed request parameters.
         * @throws IllegalArgumentException if {@code futures} is null or empty.
         */
        public UserTaskInfo(HttpServletRequest httpServletRequest, List<OperationFuture> futures, long startMs, UUID userTaskId, TaskState state, CruiseControlParameters parameters) {
            if (futures == null || futures.isEmpty()) {
                throw new IllegalArgumentException("Invalid OperationFuture list " + futures + " is provided for UserTaskInfo.");
            }
            _futures = futures;
            _requestUrl = KafkaCruiseControlServletUtils.httpServletRequestToString(httpServletRequest);
            _clientIdentity = KafkaCruiseControlServletUtils.getClientIpAddress(httpServletRequest);
            _startMs = startMs;
            _userTaskId = userTaskId;
            _queryParams = httpServletRequest.getParameterMap();
            _endPoint = ParameterUtils.endPoint(httpServletRequest);
            _state = state;
            _parameters = parameters;
        }

        /** @return the live (mutable) list of futures of this task. */
        public List<OperationFuture> futures() {
            return _futures;
        }

        /** @return the string form of the request that created this task. */
        public String requestUrl() {
            return _requestUrl;
        }

        /** @return the client address captured from the creating request. */
        public String clientIdentity() {
            return _clientIdentity;
        }

        /** @return the start time of this task in milliseconds. */
        public long startMs() {
            return _startMs;
        }

        /** @return the id of this task. */
        public UUID userTaskId() {
            return _userTaskId;
        }

        /** @return the parameter map of the creating request. */
        public Map<String, String[]> queryParams() {
            return _queryParams;
        }

        /** @return the endpoint of the creating request. */
        public EndPoint endPoint() {
            return _endPoint;
        }

        /**
         * @return the finish time of the last future (ns) minus the start time converted to nanoseconds.
         */
        public long executionTimeNs() {
            return lastFuture().finishTimeNs() - TimeUnit.MILLISECONDS.toNanos(_startMs);
        }

        private OperationFuture lastFuture() {
            return _futures.get(_futures.size() - 1);
        }

        /** @return the current state of this task. */
        public TaskState state() {
            return _state;
        }

        /**
         * @return the request URL followed by all query parameters; the first parameter value is introduced
         *         by {@code ?}, every further one by {@code &}.
         */
        public String requestWithParams() {
            StringBuilder sb = new StringBuilder(_requestUrl);
            String queryParamDelimiter = "?";
            for (Map.Entry<String, String[]> paramSet : _queryParams.entrySet()) {
                for (String paramValue : paramSet.getValue()) {
                    sb.append(queryParamDelimiter).append(paramSet.getKey()).append("=").append(paramValue);
                    queryParamDelimiter = "&";
                }
            }
            return sb.toString();
        }

        /**
         * @param nextState the new state.
         * @return this instance, for chaining.
         */
        public UserTaskInfo setState(TaskState nextState) {
            _state = nextState;
            return this;
        }

        /** @return the parsed request parameters. */
        public CruiseControlParameters parameters() {
            return _parameters;
        }

        boolean isUserTaskDone() {
            return lastFuture().isDone();
        }

        boolean isUserTaskDoneExceptionally() {
            return lastFuture().isCompletedExceptionally();
        }

        /**
         * Writes the result of the last future, or the exception it failed with, to the operation log.
         */
        private void logOperation() {
            try {
                CruiseControlResponse response = lastFuture().get();
                response.discardIrrelevantResponse(_parameters);
                OPERATION_LOG.info("Task [{}] calculation finishes, result:\n{}", _userTaskId, response.cachedResponse());
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt so the owning thread can still react to it.
                    Thread.currentThread().interrupt();
                }
                OPERATION_LOG.info("Task [{}] calculation fails, exception:\n{}", _userTaskId, e);
            }
        }

        /**
         * Builds the JSON representation of this task.
         *
         * @param fetchCompletedTask whether to include the original response of the last future. If the
         *        response cannot be fetched and the task is in state {@code COMPLETED_WITH_ERROR}, the
         *        state name is reported in its place.
         * @return a map with the task id, request URL, client identity, start time, status and, if
         *         requested, the original response.
         * @throws IllegalStateException if the response cannot be fetched for a task that is not in state
         *         {@code COMPLETED_WITH_ERROR}.
         */
        public Map<String, Object> getJsonStructure(boolean fetchCompletedTask) {
            Map<String, Object> jsonObjectMap = new HashMap<>(fetchCompletedTask ? 6 : 5);
            jsonObjectMap.put(USER_TASK_ID, _userTaskId.toString());
            jsonObjectMap.put(REQUEST_URL, requestWithParams());
            jsonObjectMap.put(CLIENT_ID, _clientIdentity);
            jsonObjectMap.put(START_MS, Long.toString(_startMs));
            jsonObjectMap.put(STATUS, _state.toString());
            if (fetchCompletedTask) {
                try {
                    jsonObjectMap.put(ORIGINAL_RESPONSE, lastFuture().get().cachedResponse());
                } catch (InterruptedException | ExecutionException e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt so the owning thread can still react to it.
                        Thread.currentThread().interrupt();
                    }
                    if (_state != TaskState.COMPLETED_WITH_ERROR) {
                        throw new IllegalStateException("Error happened in fetching response for task " + _userTaskId.toString(), e);
                    }
                    // A failed future is expected for an errored task: report the state instead of throwing.
                    jsonObjectMap.put(ORIGINAL_RESPONSE, TaskState.COMPLETED_WITH_ERROR.toString());
                }
            }
            return jsonObjectMap;
        }
    }

    /**
     * Source of new user task ids. The package-private constructor of UserTaskManager accepts an instance,
     * so the generator can be substituted.
     */
    public static class UUIDGenerator {
        /** @return a new random UUID obtained from {@link UUID#randomUUID()}. */
        UUID randomUUID() {
            return UUID.randomUUID();
        }
    }

    /**
     * Key identifying a request within an HTTP session: the session itself, the request's string form and
     * its query parameters (with the values of each parameter compared as a set).
     */
    public static class SessionKey {
        private final HttpSession _httpSession;
        private final String _requestUrl;
        private final Map<String, Set<String>> _queryParams;

        /**
         * @param httpServletRequest the request to derive the key from; its session is obtained via
         *        {@code getSession()}.
         */
        SessionKey(HttpServletRequest httpServletRequest) {
            _httpSession = httpServletRequest.getSession();
            _requestUrl = KafkaCruiseControlServletUtils.httpServletRequestToString(httpServletRequest);
            _queryParams = new HashMap<>();
            Map<String, String[]> requestParams = httpServletRequest.getParameterMap();
            for (Map.Entry<String, String[]> param : requestParams.entrySet()) {
                _queryParams.put(param.getKey(), new HashSet<>(Arrays.asList(param.getValue())));
            }
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SessionKey other = (SessionKey) o;
            if (!Objects.equals(_httpSession, other._httpSession)) {
                return false;
            }
            if (!Objects.equals(_requestUrl, other._requestUrl)) {
                return false;
            }
            return Objects.equals(_queryParams, other._queryParams);
        }

        @Override
        public int hashCode() {
            return Objects.hash(_httpSession, _requestUrl, _queryParams);
        }

        @Override
        public String toString() {
            return String.format("SessionKey{_httpSession=%s,_requestUrl=%s,_queryParams=%s}", _httpSession, _requestUrl, _queryParams);
        }

        /** @return the HTTP session this key belongs to. */
        public HttpSession httpSession() {
            return _httpSession;
        }
    }
}

