/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.servlet.parameters;

import com.google.gson.Gson;
import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.cruisecontrol.servlet.EndPoint;
import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
import com.linkedin.kafka.cruisecontrol.executor.ConcurrencyType;
import com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint;
import com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils;
import com.linkedin.kafka.cruisecontrol.servlet.UserRequestException;
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.purgatory.ReviewStatus;
import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState;
import com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils;
import java.io.IOException;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class ParameterUtils {
    // Names of HTTP request parameters. The lookup helpers in this class match them case-insensitively
    // against the names actually present in the request.
    public static final String JSON_PARAM = "json";
    public static final String GET_RESPONSE_SCHEMA = "get_response_schema";
    public static final String START_MS_PARAM = "start";
    public static final String END_MS_PARAM = "end";
    public static final String ENTRIES_PARAM = "entries";
    public static final String ALLOW_CAPACITY_ESTIMATION_PARAM = "allow_capacity_estimation";
    public static final String STOP_ONGOING_EXECUTION_PARAM = "stop_ongoing_execution";
    public static final String CLEAR_METRICS_PARAM = "clearmetrics";
    public static final String TIME_PARAM = "time";
    public static final String VERBOSE_PARAM = "verbose";
    public static final String SUPER_VERBOSE_PARAM = "super_verbose";
    public static final String RESOURCE_PARAM = "resource";
    public static final String REASON_PARAM = "reason";
    public static final String DATA_FROM_PARAM = "data_from";
    public static final String KAFKA_ASSIGNER_MODE_PARAM = "kafka_assigner";
    public static final String MAX_LOAD_PARAM = "max_load";
    public static final String AVG_LOAD_PARAM = "avg_load";
    public static final String GOALS_PARAM = "goals";
    public static final String BROKER_ID_PARAM = "brokerid";
    public static final String DROP_RECENTLY_REMOVED_BROKERS_PARAM = "drop_recently_removed_brokers";
    public static final String DROP_RECENTLY_DEMOTED_BROKERS_PARAM = "drop_recently_demoted_brokers";
    public static final String DESTINATION_BROKER_IDS_PARAM = "destination_broker_ids";
    public static final String REVIEW_ID_PARAM = "review_id";
    public static final String REVIEW_IDS_PARAM = "review_ids";
    public static final String TOPIC_PARAM = "topic";
    public static final String PARTITION_PARAM = "partition";
    public static final String DRY_RUN_PARAM = "dryrun";
    public static final String THROTTLE_ADDED_BROKER_PARAM = "throttle_added_broker";
    public static final String THROTTLE_REMOVED_BROKER_PARAM = "throttle_removed_broker";
    public static final String REPLICATION_THROTTLE_PARAM = "replication_throttle";
    public static final String IGNORE_PROPOSAL_CACHE_PARAM = "ignore_proposal_cache";
    public static final String USE_READY_DEFAULT_GOALS_PARAM = "use_ready_default_goals";
    public static final String EXECUTION_PROGRESS_CHECK_INTERVAL_MS_PARAM = "execution_progress_check_interval_ms";
    public static final String CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_PARAM = "concurrent_partition_movements_per_broker";
    public static final String CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_PARAM = "concurrent_intra_broker_partition_movements";
    public static final String CONCURRENT_LEADER_MOVEMENTS_PARAM = "concurrent_leader_movements";
    // Not a parameter name: the value resourceString() falls back to when the "resource" parameter is absent.
    public static final String DEFAULT_PARTITION_LOAD_RESOURCE = "disk";
    public static final String SUBSTATES_PARAM = "substates";
    public static final String MIN_VALID_PARTITION_RATIO_PARAM = "min_valid_partition_ratio";
    public static final String SKIP_HARD_GOAL_CHECK_PARAM = "skip_hard_goal_check";
    public static final String EXCLUDED_TOPICS_PARAM = "excluded_topics";
    public static final String USER_TASK_IDS_PARAM = "user_task_ids";
    public static final String CLIENT_IDS_PARAM = "client_ids";
    public static final String ENDPOINTS_PARAM = "endpoints";
    public static final String TYPES_PARAM = "types";
    public static final String SKIP_URP_DEMOTION_PARAM = "skip_urp_demotion";
    public static final String EXCLUDE_FOLLOWER_DEMOTION_PARAM = "exclude_follower_demotion";
    public static final String DISABLE_SELF_HEALING_FOR_PARAM = "disable_self_healing_for";
    public static final String ENABLE_SELF_HEALING_FOR_PARAM = "enable_self_healing_for";
    public static final String DISABLE_CONCURRENCY_ADJUSTER_FOR_PARAM = "disable_concurrency_adjuster_for";
    public static final String ENABLE_CONCURRENCY_ADJUSTER_FOR_PARAM = "enable_concurrency_adjuster_for";
    public static final String EXCLUDE_RECENTLY_DEMOTED_BROKERS_PARAM = "exclude_recently_demoted_brokers";
    public static final String EXCLUDE_RECENTLY_REMOVED_BROKERS_PARAM = "exclude_recently_removed_brokers";
    public static final String REPLICA_MOVEMENT_STRATEGIES_PARAM = "replica_movement_strategies";
    public static final String APPROVE_PARAM = "approve";
    public static final String DISCARD_PARAM = "discard";
    public static final String REBALANCE_DISK_MODE_PARAM = "rebalance_disk";
    public static final String POPULATE_DISK_INFO_PARAM = "populate_disk_info";
    public static final String CAPACITY_ONLY_PARAM = "capacity_only";
    public static final String BROKER_ID_AND_LOGDIRS_PARAM = "brokerid_and_logdirs";
    public static final String REPLICATION_FACTOR_PARAM = "replication_factor";
    public static final String SKIP_RACK_AWARENESS_CHECK_PARAM = "skip_rack_awareness_check";
    public static final String FETCH_COMPLETED_TASK_PARAM = "fetch_completed_task";
    public static final String FORCE_STOP_PARAM = "force_stop";
    // Upper bound (in characters) quoted by reason() when rejecting an over-long reason.
    private static final int MAX_REASON_LENGTH = 50;
    // NOTE(review): presumably separates the broker id from the logdir in "brokerid_and_logdirs" values;
    // its usage is not visible in this part of the file.
    private static final String DELIMITER_BETWEEN_BROKER_ID_AND_LOGDIR = "-";
    // NOTE(review): presumably the start time applied to cluster-model requests that specify none; confirm against callers.
    public static final long DEFAULT_START_TIME_FOR_CLUSTER_MODEL = -1L;
    // Field of the JSON request body that maps a target replication factor to a topic-name regex
    // (read by topicPatternByReplicationFactorFromBody()).
    public static final String TOPIC_BY_REPLICATION_FACTOR = "topic_by_replication_factor";
    // Text substituted by reason() when the request carries no reason.
    public static final String NO_REASON_PROVIDED = "No reason provided";
    // NOTE(review): looks like the name of an impersonation ("do as") request parameter; usage is outside this view.
    public static final String DO_AS = "doAs";
    // NOTE(review): the constants below look like configuration keys that select the parameter object
    // for each endpoint; they are not referenced in the visible part of this file, so confirm before relying on that.
    public static final String STOP_PROPOSAL_PARAMETER_OBJECT_CONFIG = "stop.proposal.parameter.object";
    public static final String BOOTSTRAP_PARAMETER_OBJECT_CONFIG = "bootstrap.parameter.object";
    public static final String TRAIN_PARAMETER_OBJECT_CONFIG = "train.parameter.object";
    public static final String LOAD_PARAMETER_OBJECT_CONFIG = "load.parameter.object";
    public static final String PARTITION_LOAD_PARAMETER_OBJECT_CONFIG = "partition.load.parameter.object";
    public static final String PROPOSALS_PARAMETER_OBJECT_CONFIG = "proposals.parameter.object";
    public static final String STATE_PARAMETER_OBJECT_CONFIG = "state.parameter.object";
    public static final String KAFKA_CLUSTER_STATE_PARAMETER_OBJECT_CONFIG = "kafka.cluster.state.parameter.object";
    public static final String USER_TASKS_PARAMETER_OBJECT_CONFIG = "user.tasks.parameter.object";
    public static final String REVIEW_BOARD_PARAMETER_OBJECT_CONFIG = "review.board.parameter.object";
    public static final String ADD_BROKER_PARAMETER_OBJECT_CONFIG = "add.broker.parameter.object";
    public static final String REMOVE_BROKER_PARAMETER_OBJECT_CONFIG = "remove.broker.parameter.object";
    public static final String FIX_OFFLINE_REPLICAS_PARAMETER_OBJECT_CONFIG = "fix.offline.replicas.parameter.object";
    public static final String REBALANCE_PARAMETER_OBJECT_CONFIG = "rebalance.parameter.object";
    public static final String PAUSE_RESUME_PARAMETER_OBJECT_CONFIG = "pause.resume.parameter.object";
    public static final String DEMOTE_BROKER_PARAMETER_OBJECT_CONFIG = "demote.broker.parameter.object";
    public static final String ADMIN_PARAMETER_OBJECT_CONFIG = "admin.parameter.object";
    public static final String REVIEW_PARAMETER_OBJECT_CONFIG = "review.parameter.object";
    public static final String TOPIC_CONFIGURATION_PARAMETER_OBJECT_CONFIG = "topic.configuration.parameter.object";

    // Private constructor: prevents instantiation; the members visible in this file are all static.
    private ParameterUtils() {
    }

    /**
     * Resolves the endpoint targeted by the given request.
     *
     * <p>The candidate endpoints depend on the HTTP method (GET or POST). The request URI is upper-cased, every
     * occurrence of {@code /KAFKACRUISECONTROL/} is removed, and the remainder is compared case-insensitively
     * with each candidate's {@code toString()}.
     *
     * @param request the HTTP request.
     * @return the matching endpoint, or {@code null} when no candidate matches the request path.
     * @throws UserRequestException if the request method is neither GET nor POST.
     */
    public static CruiseControlEndPoint endPoint(HttpServletRequest request) {
        String method = request.getMethod();
        List<CruiseControlEndPoint> supportedEndpoints;
        switch (method) {
            case "GET":
                supportedEndpoints = CruiseControlEndPoint.getEndpoints();
                break;
            case "POST":
                supportedEndpoints = CruiseControlEndPoint.postEndpoints();
                break;
            default:
                throw new UserRequestException("Unsupported request method: " + method + ".");
        }
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale; with e.g. a Turkish default
        // locale, 'i' would become a dotted capital I and the prefix below would never be found.
        String path = request.getRequestURI().toUpperCase(Locale.ROOT).replace("/KAFKACRUISECONTROL/", "");
        for (CruiseControlEndPoint endPoint : supportedEndpoints) {
            if (endPoint.toString().equalsIgnoreCase(path)) {
                return endPoint;
            }
        }
        return null;
    }

    /**
     * Reports a parameter parsing failure to the client by writing an error response with HTTP status 400.
     *
     * @param e the exception raised while parsing the parameters.
     * @param response the HTTP response to write to.
     * @param errorMessage the message to include in the response.
     * @param json forwarded to the response writer as its JSON flag.
     * @param wantJsonSchema forwarded to the response writer as its JSON-schema flag.
     * @param config the Cruise Control configuration forwarded to the response writer.
     * @throws IOException if writing the response fails.
     */
    static void handleParameterParseException(Exception e,
                                              HttpServletResponse response,
                                              String errorMessage,
                                              boolean json,
                                              boolean wantJsonSchema,
                                              KafkaCruiseControlConfig config) throws IOException {
        // SC_BAD_REQUEST is the servlet API's constant for status code 400.
        ResponseUtils.writeErrorResponse(response, e, errorMessage, HttpServletResponse.SC_BAD_REQUEST, json, wantJsonSchema, config);
    }

    /**
     * Checks that every parameter name in the request is accepted by the given parameters object.
     *
     * <p>Names are compared case-insensitively. When unrecognized names remain, an error response with HTTP
     * status 400 listing them is written to {@code response}.
     *
     * @param request the HTTP request.
     * @param response the HTTP response that receives the error, if any.
     * @param config the Cruise Control configuration forwarded to the response writer.
     * @param parameters supplies the accepted parameter names; a {@code null} name set accepts nothing.
     * @return {@code true} if all request parameter names are recognized, {@code false} if an error was written.
     * @throws IOException if writing the error response fails.
     */
    public static boolean hasValidParameterNames(HttpServletRequest request, HttpServletResponse response, KafkaCruiseControlConfig config, CruiseControlParameters parameters) throws IOException {
        // Resolved up front, as before, so that an unsupported HTTP method is rejected before any name check.
        CruiseControlEndPoint endPoint = ParameterUtils.endPoint(request);
        SortedSet<?> validParamNames = parameters.caseInsensitiveParameterNames();
        Set<String> userParams = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        userParams.addAll(request.getParameterMap().keySet());
        if (validParamNames != null) {
            userParams.removeAll(validParamNames);
        }
        if (userParams.isEmpty()) {
            return true;
        }
        String errorMessage = String.format("Unrecognized endpoint parameters in %s %s request: %s.", endPoint, request.getMethod(), userParams);
        ResponseUtils.writeErrorResponse(response, null, errorMessage, 400, ParameterUtils.wantJSON(request), ParameterUtils.wantResponseSchema(request), config);
        return false;
    }

    /**
     * Finds the name, exactly as spelled in the request, of the parameter that matches {@code parameter}
     * when case is ignored.
     *
     * @param parameterMap the request's parameter map, keyed by the names as sent by the client.
     * @param parameter the parameter name to look for; must not be {@code null}.
     * @return the first matching key in the map's iteration order, or {@code null} if none matches.
     */
    public static String caseSensitiveParameterName(Map<String, String[]> parameterMap, String parameter) {
        Set<String> requestedNames = parameterMap.keySet();
        Objects.requireNonNull(parameter);
        for (String requestedName : requestedNames) {
            if (parameter.equalsIgnoreCase(requestedName)) {
                return requestedName;
            }
        }
        return null;
    }

    /**
     * Reads a boolean request parameter, matching its name case-insensitively.
     *
     * @param request the HTTP request.
     * @param parameter the parameter name.
     * @param defaultIfMissing the value to return when the request does not contain the parameter.
     * @return the value parsed with {@link Boolean#parseBoolean(String)}, or {@code defaultIfMissing}.
     */
    public static boolean getBooleanParam(HttpServletRequest request, String parameter, boolean defaultIfMissing) {
        String nameInRequest = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), parameter);
        if (nameInRequest == null) {
            return defaultIfMissing;
        }
        return Boolean.parseBoolean(request.getParameter(nameInRequest));
    }

    /**
     * Reads a long request parameter, matching its name case-insensitively.
     *
     * @param request the HTTP request.
     * @param parameter the parameter name.
     * @param defaultIfMissing the value (possibly {@code null}) to return when the parameter is absent.
     * @return the value parsed with {@link Long#valueOf(String)}, or {@code defaultIfMissing}.
     * @throws NumberFormatException if the parameter is present but is not a parsable long.
     */
    public static Long getLongParam(HttpServletRequest request, String parameter, @Nullable Long defaultIfMissing) {
        String nameInRequest = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), parameter);
        if (nameInRequest == null) {
            return defaultIfMissing;
        }
        return Long.valueOf(request.getParameter(nameInRequest));
    }

    /**
     * Reads a comma-separated request parameter as a list of its non-empty items.
     *
     * <p>The parameter name is matched case-insensitively and the value is URL-decoded before it is split.
     *
     * @param request the HTTP request.
     * @param parameter the parameter name.
     * @return an unmodifiable list of the non-empty items, in request order; empty when the parameter is absent.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     */
    public static List<String> getListParam(HttpServletRequest request, String parameter) throws UnsupportedEncodingException {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), parameter);
        if (parameterString == null) {
            return Collections.emptyList();
        }
        String[] items = ParameterUtils.urlDecode(request.getParameter(parameterString)).split(",");
        // Arrays.asList returns a fixed-size view whose elements cannot be removed; copy it so that empty items
        // (e.g. from "goals=" or "a,,b") can be dropped instead of raising UnsupportedOperationException.
        List<String> retList = new ArrayList<>(Arrays.asList(items));
        retList.removeIf(String::isEmpty);
        return Collections.unmodifiableList(retList);
    }

    /**
     * Reads the {@code json} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    public static boolean wantJSON(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, JSON_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code get_response_schema} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    public static boolean wantResponseSchema(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, GET_RESPONSE_SCHEMA, defaultIfMissing);
    }

    /**
     * Reads the {@code allow_capacity_estimation} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code true} when it is absent.
     */
    static boolean allowCapacityEstimation(HttpServletRequest request) {
        final boolean defaultIfMissing = true;
        return ParameterUtils.getBooleanParam(request, ALLOW_CAPACITY_ESTIMATION_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code skip_rack_awareness_check} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean skipRackAwarenessCheck(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, SKIP_RACK_AWARENESS_CHECK_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code stop_ongoing_execution} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean stopOngoingExecution(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, STOP_ONGOING_EXECUTION_PARAM, defaultIfMissing);
    }

    /**
     * Reads a boolean "exclude brokers" parameter and rejects it when it is combined with Kafka assigner mode.
     *
     * @param request the HTTP request.
     * @param parameter the name of the exclusion parameter to read.
     * @param defaultIfMissing the value to assume when the parameter is absent.
     * @return the effective value of the exclusion parameter.
     * @throws UserRequestException if the effective value is {@code true} while Kafka assigner mode is requested.
     */
    private static boolean excludeBrokers(HttpServletRequest request, String parameter, boolean defaultIfMissing) {
        boolean kafkaAssignerRequested = ParameterUtils.isKafkaAssignerMode(request);
        boolean exclusionRequested = ParameterUtils.getBooleanParam(request, parameter, defaultIfMissing);
        if (!exclusionRequested || !kafkaAssignerRequested) {
            return exclusionRequested;
        }
        throw new UserRequestException("Kafka assigner mode does not support excluding brokers.");
    }

    /**
     * Reads a boolean parameter (default {@code false}) that may only be {@code true} when none of the given
     * other parameters is present in the request.
     *
     * @param request the HTTP request.
     * @param getParameter the name of the boolean parameter to read.
     * @param excludeParameters names of the parameters that must not accompany a {@code true} value.
     * @return the value of {@code getParameter}.
     * @throws UserRequestException if the value is {@code true} and any of {@code excludeParameters} is present.
     */
    private static boolean getBooleanExcludeGiven(HttpServletRequest request, String getParameter, Set<String> excludeParameters) {
        if (!ParameterUtils.getBooleanParam(request, getParameter, false)) {
            // A false (or absent) value is compatible with every other parameter.
            return false;
        }
        for (String conflicting : excludeParameters) {
            boolean present = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), conflicting) != null;
            if (present) {
                throw new UserRequestException("Cannot set " + getParameter + " parameter to true when explicitly specifying " + conflicting + " in the request.");
            }
        }
        return true;
    }

    /**
     * Reads the {@code exclude_recently_demoted_brokers} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     * @throws UserRequestException if the value is {@code true} while Kafka assigner mode is requested.
     */
    static boolean excludeRecentlyDemotedBrokers(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.excludeBrokers(request, EXCLUDE_RECENTLY_DEMOTED_BROKERS_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code exclude_recently_removed_brokers} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code true} when it is absent.
     * @throws UserRequestException if the value is {@code true} while Kafka assigner mode is requested.
     */
    static boolean excludeRecentlyRemovedBrokers(HttpServletRequest request) {
        final boolean defaultIfMissing = true;
        return ParameterUtils.excludeBrokers(request, EXCLUDE_RECENTLY_REMOVED_BROKERS_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code max_load} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean wantMaxLoad(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, MAX_LOAD_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code avg_load} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean wantAvgLoad(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, AVG_LOAD_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code verbose} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean isVerbose(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, VERBOSE_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code super_verbose} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean isSuperVerbose(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, SUPER_VERBOSE_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code clearmetrics} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code true} when it is absent.
     */
    static boolean clearMetrics(HttpServletRequest request) {
        final boolean defaultIfMissing = true;
        return ParameterUtils.getBooleanParam(request, CLEAR_METRICS_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code kafka_assigner} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    private static boolean isKafkaAssignerMode(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, KAFKA_ASSIGNER_MODE_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code rebalance_disk} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean isRebalanceDiskMode(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, REBALANCE_DISK_MODE_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code populate_disk_info} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean populateDiskInfo(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, POPULATE_DISK_INFO_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code capacity_only} request parameter (default {@code false}).
     *
     * <p>A {@code true} value is rejected when the request also specifies any of {@code time}, {@code end},
     * {@code start}, {@code allow_capacity_estimation} or {@code populate_disk_info}.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value.
     * @throws UserRequestException if the value is {@code true} and one of the listed parameters is present.
     */
    static boolean capacityOnly(HttpServletRequest request) {
        List<String> incompatible = Arrays.asList(TIME_PARAM,
                                                  END_MS_PARAM,
                                                  START_MS_PARAM,
                                                  ALLOW_CAPACITY_ESTIMATION_PARAM,
                                                  POPULATE_DISK_INFO_PARAM);
        Set<String> incompatibleParams = Collections.unmodifiableSet(new HashSet<String>(incompatible));
        return ParameterUtils.getBooleanExcludeGiven(request, CAPACITY_ONLY_PARAM, incompatibleParams);
    }

    /**
     * Reads the {@code ignore_proposal_cache} request parameter (default {@code false}).
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value.
     * @throws UserRequestException if the value is {@code true} and the request also specifies {@code goals}.
     */
    static boolean ignoreProposalCache(HttpServletRequest request) {
        Set<String> incompatibleParams = Collections.singleton(GOALS_PARAM);
        return ParameterUtils.getBooleanExcludeGiven(request, IGNORE_PROPOSAL_CACHE_PARAM, incompatibleParams);
    }

    /**
     * Reads the {@code use_ready_default_goals} request parameter (default {@code false}).
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value.
     * @throws UserRequestException if the value is {@code true} and the request also specifies {@code goals}.
     */
    static boolean useReadyDefaultGoals(HttpServletRequest request) {
        Set<String> incompatibleParams = Collections.singleton(GOALS_PARAM);
        return ParameterUtils.getBooleanExcludeGiven(request, USE_READY_DEFAULT_GOALS_PARAM, incompatibleParams);
    }

    /**
     * Reads the {@code dryrun} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code true} when it is absent.
     */
    static boolean getDryRun(HttpServletRequest request) {
        final boolean defaultIfMissing = true;
        return ParameterUtils.getBooleanParam(request, DRY_RUN_PARAM, defaultIfMissing);
    }

    /**
     * Reads the {@code force_stop} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's boolean value, or {@code false} when it is absent.
     */
    static boolean forceExecutionStop(HttpServletRequest request) {
        final boolean defaultIfMissing = false;
        return ParameterUtils.getBooleanParam(request, FORCE_STOP_PARAM, defaultIfMissing);
    }

    /**
     * Reads the throttle flag that applies to the given endpoint: {@code throttle_added_broker} for
     * {@code ADD_BROKER}, {@code throttle_removed_broker} for every other endpoint.
     *
     * @param request the HTTP request.
     * @param endPoint the endpoint being served.
     * @return the selected parameter's boolean value, or {@code true} when it is absent.
     */
    static boolean throttleAddedOrRemovedBrokers(HttpServletRequest request, EndPoint endPoint) {
        String throttleParam = endPoint == CruiseControlEndPoint.ADD_BROKER ? THROTTLE_ADDED_BROKER_PARAM
                                                                            : THROTTLE_REMOVED_BROKER_PARAM;
        return ParameterUtils.getBooleanParam(request, throttleParam, true);
    }

    /**
     * Reads the {@code replication_throttle} request parameter, falling back to the
     * {@code default.replication.throttle} configuration value when the parameter is absent.
     *
     * @param request the HTTP request.
     * @param config the Cruise Control configuration supplying the default.
     * @return the throttle, which may be {@code null} if neither the request nor the configuration provides one.
     * @throws UserRequestException if the resulting throttle is negative.
     */
    static Long replicationThrottle(HttpServletRequest request, KafkaCruiseControlConfig config) {
        Long configuredDefault = config.getLong("default.replication.throttle");
        Long throttle = ParameterUtils.getLongParam(request, REPLICATION_THROTTLE_PARAM, configuredDefault);
        if (throttle == null || throttle >= 0L) {
            return throttle;
        }
        throw new UserRequestException(String.format("Requested rebalance throttle must be non-negative (Requested: %s).", throttle));
    }

    /**
     * Reads the {@code time} request parameter.
     *
     * @param request the HTTP request.
     * @return {@code null} when the parameter is absent; the current time in milliseconds when its value is
     *         {@code now} (any case); otherwise the value parsed as a long.
     * @throws UserRequestException if both {@code time} and {@code end} are present in the request.
     * @throws NumberFormatException if the value is neither {@code now} nor a parsable long.
     */
    static Long time(HttpServletRequest request) {
        String timeName = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), TIME_PARAM);
        if (timeName == null) {
            return null;
        }
        boolean endGiven = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), END_MS_PARAM) != null;
        if (endGiven) {
            throw new UserRequestException(String.format("Parameter %s and parameter %s are mutually exclusive and should not be specified in the same request.", TIME_PARAM, END_MS_PARAM));
        }
        String rawTime = request.getParameter(timeName);
        if ("NOW".equals(rawTime.toUpperCase())) {
            return System.currentTimeMillis();
        }
        return Long.parseLong(rawTime);
    }

    /**
     * Reads the {@code start} request parameter as a long.
     *
     * @param request the HTTP request.
     * @param defaultIfMissing the value (possibly {@code null}) to return when the parameter is absent.
     * @return the parsed start time, or {@code defaultIfMissing}.
     */
    static Long startMsOrDefault(HttpServletRequest request, @Nullable Long defaultIfMissing) {
        Long startMs = ParameterUtils.getLongParam(request, START_MS_PARAM, defaultIfMissing);
        return startMs;
    }

    /**
     * Reads the {@code end} request parameter as a long.
     *
     * @param request the HTTP request.
     * @param defaultIfMissing the value (possibly {@code null}) to return when the parameter is absent.
     * @return the parsed end time, or {@code defaultIfMissing}.
     */
    static Long endMsOrDefault(HttpServletRequest request, @Nullable Long defaultIfMissing) {
        Long endMs = ParameterUtils.getLongParam(request, END_MS_PARAM, defaultIfMissing);
        return endMs;
    }

    /**
     * Verifies that the start time lies strictly before the end time.
     *
     * @param startMs the start of the range.
     * @param endMs the end of the range.
     * @throws UserRequestException if {@code startMs} is greater than or equal to {@code endMs}.
     */
    static void validateTimeRange(long startMs, long endMs) {
        if (startMs < endMs) {
            return;
        }
        throw new UserRequestException(String.format("Invalid time range. Start time must be smaller than end time. Got: [%d, %d]", startMs, endMs));
    }

    /**
     * Reads the {@code topic} request parameter and compiles it as a regular expression.
     *
     * @param request the HTTP request.
     * @return the compiled pattern, or {@code null} when the parameter is absent.
     * @throws java.util.regex.PatternSyntaxException if the value is not a valid regular expression.
     */
    static Pattern topic(HttpServletRequest request) {
        String topicName = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), TOPIC_PARAM);
        if (topicName == null) {
            return null;
        }
        return Pattern.compile(request.getParameter(topicName));
    }

    /**
     * Extracts the "target replication factor to topic regex" pairs from the JSON body of the request.
     *
     * <p>The body is expected to contain, under the lower-cased name of the {@code REPLICATION_FACTOR} topic
     * configuration type, an object with a {@code topic_by_replication_factor} field whose entries map a
     * replication factor (as a string) to a topic-name regex. For a replication factor that appears more than
     * once, the first entry encountered wins.
     *
     * @param request the HTTP request whose body is read.
     * @return the parsed mapping, or {@code null} when the body is empty or lacks the expected fields.
     * @throws UserRequestException if reading the request body fails with an {@link IOException}.
     */
    private static Map<Short, Pattern> topicPatternByReplicationFactorFromBody(HttpServletRequest request) {
        Map<Short, Pattern> topicPatternByReplicationFactor;
        try {
            Map<?, ?> json = new Gson().fromJson(request.getReader(), Map.class);
            if (json == null) {
                return null;
            }
            // Locale.ROOT keeps the derived key stable regardless of the JVM default locale (a Turkish default
            // locale would otherwise lower-case 'I' to a dotless i and the key would never match).
            String replicationFactorKey = TopicConfigurationParameters.TopicConfigurationType.REPLICATION_FACTOR.name().toLowerCase(Locale.ROOT);
            if (!json.containsKey(replicationFactorKey)) {
                return null;
            }
            Map<?, ?> replicationFactorParams = (Map<?, ?>) json.get(replicationFactorKey);
            if (!replicationFactorParams.containsKey(TOPIC_BY_REPLICATION_FACTOR)) {
                return null;
            }
            Map<?, ?> topicByReplicationFactor = (Map<?, ?>) replicationFactorParams.get(TOPIC_BY_REPLICATION_FACTOR);
            topicPatternByReplicationFactor = new HashMap<>();
            for (Map.Entry<?, ?> entry : topicByReplicationFactor.entrySet()) {
                short replicationFactor = Short.parseShort(((String) entry.getKey()).trim());
                Pattern topicPattern = Pattern.compile(((String) entry.getValue()).trim());
                topicPatternByReplicationFactor.putIfAbsent(replicationFactor, topicPattern);
            }
        } catch (IOException ioe) {
            // NOTE(review): only I/O failures are translated here; malformed values (non-numeric replication
            // factor, invalid regex, unexpected JSON shape) propagate as their original runtime exceptions.
            throw new UserRequestException(String.format("Illegal value for field %s in body, please specify in pairs of \"target replication factor\" : \"topic name regex\".", TOPIC_BY_REPLICATION_FACTOR));
        }
        return topicPatternByReplicationFactor;
    }

    /**
     * Determines the requested replication-factor changes, taken either from the URL parameters
     * ({@code topic} together with the replication factor) or from the request body, but never from both.
     *
     * @param request the HTTP request.
     * @return the mapping from target replication factor to topic pattern: the body's mapping when the body
     *         provides one, a single-entry map when the URL provides both values, otherwise an empty map.
     * @throws UserRequestException if both the URL and the body request a change, or if the URL specifies only
     *         one of topic and replication factor.
     */
    static Map<Short, Pattern> topicPatternByReplicationFactor(HttpServletRequest request) {
        Pattern topic = ParameterUtils.topic(request);
        Short replicationFactor = ParameterUtils.replicationFactor(request);
        Map<Short, Pattern> fromBody = ParameterUtils.topicPatternByReplicationFactorFromBody(request);
        boolean topicInUrl = topic != null;
        boolean replicationFactorInUrl = replicationFactor != null;

        if (fromBody != null) {
            if (topicInUrl || replicationFactorInUrl) {
                throw new UserRequestException("Requesting topic replication factor change from both HTTP request parameter and body is forbidden.");
            }
            return fromBody;
        }
        if (replicationFactorInUrl && !topicInUrl) {
            throw new UserRequestException("Topic is not specified in URL while target replication factor is specified.");
        }
        if (topicInUrl && !replicationFactorInUrl) {
            throw new UserRequestException("Topic's replication factor is not specified in URL while subject topic is specified.");
        }
        if (!topicInUrl) {
            return Collections.emptyMap();
        }
        return Collections.singletonMap(replicationFactor, topic);
    }

    /**
     * Reads the {@code min_valid_partition_ratio} request parameter.
     *
     * @param request the HTTP request.
     * @return the requested ratio, or {@code null} when the parameter is absent.
     * @throws UserRequestException if the value is outside {@code [0.0, 1.0]} or is not a number (NaN).
     * @throws NumberFormatException if the value cannot be parsed as a double.
     */
    static Double minValidPartitionRatio(HttpServletRequest request) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), MIN_VALID_PARTITION_RATIO_PARAM);
        if (parameterString == null) {
            return null;
        }
        double minValidPartitionRatio = Double.parseDouble(request.getParameter(parameterString));
        // Written as a positive range test so that NaN (which Double.parseDouble accepts as "NaN", and for
        // which every comparison is false) is rejected as well.
        boolean inRange = minValidPartitionRatio >= 0.0 && minValidPartitionRatio <= 1.0;
        if (!inRange) {
            throw new UserRequestException("The requested minimum partition ratio must be in range [0.0, 1.0] (Requested: " + minValidPartitionRatio + ").");
        }
        return minValidPartitionRatio;
    }

    /**
     * Reads the {@code resource} request parameter.
     *
     * @param request the HTTP request.
     * @return the parameter's value, or {@code "disk"} when the parameter is absent.
     */
    public static String resourceString(HttpServletRequest request) {
        String resourceName = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), RESOURCE_PARAM);
        if (resourceName == null) {
            return DEFAULT_PARTITION_LOAD_RESOURCE;
        }
        return request.getParameter(resourceName);
    }

    /**
     * Builds the reason string recorded for a request: the user-supplied {@code reason} parameter (or a
     * placeholder when absent) followed by the client IP address and the current UTC date.
     *
     * @param request the HTTP request.
     * @param reasonRequired whether a missing {@code reason} parameter is an error.
     * @return the formatted reason, in the form {@code "<reason> (Client: <ip>, Date: <date>)"}.
     * @throws UserRequestException if the supplied reason is longer than the allowed maximum, or if it is
     *         missing while {@code reasonRequired} is {@code true}.
     */
    public static String reason(HttpServletRequest request, boolean reasonRequired) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), REASON_PARAM);
        // The limit applies to the reason text itself; checking the parameter name (always "reason" up to case)
        // could never trigger.
        String providedReason = parameterString == null ? null : request.getParameter(parameterString);
        if (providedReason != null && providedReason.length() > ParameterUtils.MAX_REASON_LENGTH) {
            throw new UserRequestException(String.format("Reason cannot be longer than %d characters (attempted: %d).",
                                                         ParameterUtils.MAX_REASON_LENGTH, providedReason.length()));
        }
        String ip = KafkaCruiseControlServletUtils.getClientIpAddress(request);
        if (parameterString == null && reasonRequired) {
            throw new UserRequestException("Reason is missing in request.");
        }
        return String.format("%s (Client: %s, Date: %s)",
                             providedReason == null ? NO_REASON_PROVIDED : providedReason,
                             ip,
                             KafkaCruiseControlUtils.currentUtcDate());
    }

    /**
     * Reads a comma-separated request parameter as a set of its non-empty items.
     *
     * <p>The parameter name is matched case-insensitively and the value is URL-decoded before it is split.
     *
     * @param request the HTTP request.
     * @param param the parameter name.
     * @return a mutable set of the non-empty items; empty when the parameter is absent.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     */
    public static Set<String> parseParamToStringSet(HttpServletRequest request, String param) throws UnsupportedEncodingException {
        String nameInRequest = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), param);
        if (nameInRequest == null) {
            return new HashSet<String>(0);
        }
        String decoded = ParameterUtils.urlDecode(request.getParameter(nameInRequest));
        Set<String> items = new HashSet<String>(Arrays.asList(decoded.split(",")));
        items.removeIf(String::isEmpty);
        return items;
    }

    /**
     * Reads a comma-separated request parameter as a set of integers.
     *
     * <p>The parameter name is matched case-insensitively and the value is URL-decoded before it is split.
     * Unlike {@code parseParamToStringSet}, empty items are not skipped and fail to parse.
     *
     * @param request the HTTP request.
     * @param param the parameter name.
     * @return a set of the parsed integers; empty when the parameter is absent.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     * @throws NumberFormatException if any item is not a parsable integer.
     */
    public static Set<Integer> parseParamToIntegerSet(HttpServletRequest request, String param) throws UnsupportedEncodingException {
        String nameInRequest = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), param);
        if (nameInRequest == null) {
            return new HashSet<Integer>(0);
        }
        Set<Integer> parsed = new HashSet<Integer>();
        for (String item : ParameterUtils.urlDecode(request.getParameter(nameInRequest)).split(",")) {
            parsed.add(Integer.parseInt(item));
        }
        return parsed;
    }

    /**
     * Reads the {@code substates} request parameter as a set of {@code SubState} values, matching the names
     * case-insensitively.
     *
     * @param request the HTTP request.
     * @return an unmodifiable set of the requested substates; empty when the parameter is absent.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     * @throws UserRequestException if any requested name is not a {@code SubState} constant.
     */
    static Set<CruiseControlState.SubState> substates(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<String> substatesString = ParameterUtils.parseParamToStringSet(request, SUBSTATES_PARAM);
        Set<CruiseControlState.SubState> substates = new HashSet<>(substatesString.size());
        try {
            for (String substateString : substatesString) {
                // Locale.ROOT: enum names are ASCII, and default-locale upper-casing (e.g. Turkish 'i')
                // would turn valid names into unknown ones.
                substates.add(CruiseControlState.SubState.valueOf(substateString.toUpperCase(Locale.ROOT)));
            }
        } catch (IllegalArgumentException iae) {
            throw new UserRequestException(String.format("Unsupported substates in %s. Supported: %s", substatesString, CruiseControlState.SubState.cachedValues()));
        }
        return Collections.unmodifiableSet(substates);
    }

    /**
     * Reads the anomaly types listed in the {@code enable_self_healing_for} or
     * {@code disable_self_healing_for} request parameter, matching the names case-insensitively.
     *
     * @param request the HTTP request.
     * @param isEnable {@code true} to read the enable parameter, {@code false} to read the disable parameter.
     * @return an unmodifiable set of the requested anomaly types; empty when the parameter is absent.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     * @throws UserRequestException if any requested name is not a {@code KafkaAnomalyType} constant.
     */
    private static Set<AnomalyType> anomalyTypes(HttpServletRequest request, boolean isEnable) throws UnsupportedEncodingException {
        Set<String> selfHealingForString = ParameterUtils.parseParamToStringSet(request, isEnable ? ENABLE_SELF_HEALING_FOR_PARAM : DISABLE_SELF_HEALING_FOR_PARAM);
        Set<KafkaAnomalyType> anomalyTypes = new HashSet<>(selfHealingForString.size());
        try {
            for (String shfString : selfHealingForString) {
                // Locale.ROOT: enum names are ASCII, and default-locale upper-casing (e.g. Turkish 'i')
                // would turn valid names into unknown ones.
                anomalyTypes.add(KafkaAnomalyType.valueOf(shfString.toUpperCase(Locale.ROOT)));
            }
        } catch (IllegalArgumentException iae) {
            throw new UserRequestException(String.format("Unsupported anomaly types in %s. Supported: %s", selfHealingForString, KafkaAnomalyType.cachedValues()));
        }
        return Collections.unmodifiableSet(anomalyTypes);
    }

    /**
     * Reads the concurrency types listed in the {@code enable_concurrency_adjuster_for} or
     * {@code disable_concurrency_adjuster_for} request parameter, matching the names case-insensitively.
     *
     * @param request the HTTP request.
     * @param isEnable {@code true} to read the enable parameter, {@code false} to read the disable parameter.
     * @return an unmodifiable set of the requested concurrency types; empty when the parameter is absent.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     * @throws UserRequestException if any requested name is not a {@code ConcurrencyType} constant.
     */
    private static Set<ConcurrencyType> concurrencyTypes(HttpServletRequest request, boolean isEnable) throws UnsupportedEncodingException {
        Set<String> concurrencyForStringSet = ParameterUtils.parseParamToStringSet(request, isEnable ? ENABLE_CONCURRENCY_ADJUSTER_FOR_PARAM : DISABLE_CONCURRENCY_ADJUSTER_FOR_PARAM);
        Set<ConcurrencyType> concurrencyTypes = new HashSet<>(concurrencyForStringSet.size());
        try {
            for (String concurrencyForString : concurrencyForStringSet) {
                // Locale.ROOT: enum names are ASCII, and default-locale upper-casing (e.g. Turkish 'i')
                // would turn valid names into unknown ones.
                concurrencyTypes.add(ConcurrencyType.valueOf(concurrencyForString.toUpperCase(Locale.ROOT)));
            }
        } catch (IllegalArgumentException iae) {
            throw new UserRequestException(String.format("Unsupported concurrency types in %s. Supported: %s", concurrencyForStringSet, ConcurrencyType.cachedValues()));
        }
        return Collections.unmodifiableSet(concurrencyTypes);
    }

    /**
     * Collects the anomaly types for which self healing should be enabled and disabled.
     *
     * @param request the HTTP request.
     * @return a map whose {@code true} key holds the anomaly types to enable and whose {@code false} key holds
     *         the anomaly types to disable.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     * @throws UserRequestException if an anomaly type is listed for both enabling and disabling, or is unsupported.
     */
    static Map<Boolean, Set<AnomalyType>> selfHealingFor(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<AnomalyType> toEnable = ParameterUtils.anomalyTypes(request, true);
        Set<AnomalyType> toDisable = ParameterUtils.anomalyTypes(request, false);
        ParameterUtils.ensureDisjoint(toEnable, toDisable, "The same anomaly cannot be specified in both disable and enable parameters");

        Map<Boolean, Set<AnomalyType>> byEnabled = new HashMap<Boolean, Set<AnomalyType>>(2);
        byEnabled.put(Boolean.TRUE, toEnable);
        byEnabled.put(Boolean.FALSE, toDisable);
        return byEnabled;
    }

    /**
     * Collects the concurrency types for which the concurrency adjuster should be enabled and disabled.
     *
     * @param request the HTTP request.
     * @return a map whose {@code true} key holds the concurrency types to enable and whose {@code false} key
     *         holds the concurrency types to disable.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     * @throws UserRequestException if a concurrency type is listed for both enabling and disabling, or is unsupported.
     */
    static Map<Boolean, Set<ConcurrencyType>> concurrencyAdjusterFor(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<ConcurrencyType> toEnable = ParameterUtils.concurrencyTypes(request, true);
        Set<ConcurrencyType> toDisable = ParameterUtils.concurrencyTypes(request, false);
        ParameterUtils.ensureDisjoint(toEnable, toDisable, "The same concurrency type cannot be specified in both disable and enable parameters");

        Map<Boolean, Set<ConcurrencyType>> byEnabled = new HashMap<Boolean, Set<ConcurrencyType>>(2);
        byEnabled.put(Boolean.TRUE, toEnable);
        byEnabled.put(Boolean.FALSE, toDisable);
        return byEnabled;
    }

    /**
     * Verifies that the two sets have no element in common.
     *
     * @param set1 the first set.
     * @param set2 the second set.
     * @param errorMessage the leading text of the error raised when the sets overlap.
     * @param <E> the element type.
     * @throws UserRequestException if the sets share at least one element; the message lists the shared elements.
     */
    static <E> void ensureDisjoint(Set<E> set1, Set<E> set2, String errorMessage) {
        // Work on a copy so that neither argument is modified.
        Set<E> common = new HashSet<E>(set1);
        common.retainAll(set2);
        if (common.isEmpty()) {
            return;
        }
        throw new UserRequestException(String.format("%s. Intersection: %s.", errorMessage, common));
    }

    /**
     * URL-decodes the given string using UTF-8.
     *
     * @param s the string to decode; may be {@code null}.
     * @return the decoded string, or {@code null} if {@code s} is {@code null}.
     * @throws UnsupportedEncodingException declared by {@link URLDecoder#decode(String, String)}.
     */
    static String urlDecode(String s) throws UnsupportedEncodingException {
        if (s == null) {
            return null;
        }
        String charsetName = StandardCharsets.UTF_8.name();
        return URLDecoder.decode(s, charsetName);
    }

    /**
     * Builds the replica movement strategy requested through the {@code replica_movement_strategies} parameter.
     *
     * <p>The requested strategy names are matched case-insensitively against the instances configured under
     * {@code replica.movement.strategies}, chained in request order, and finally chained with a
     * {@link BaseReplicaMovementStrategy}.
     *
     * @param request the HTTP request.
     * @param config the Cruise Control configuration supplying the supported strategies.
     * @return the chained strategy, or {@code null} for a dry run or when no strategy is requested.
     * @throws UnsupportedEncodingException declared by the URL decoding helper.
     * @throws UserRequestException if a requested strategy is not among the supported ones.
     */
    static ReplicaMovementStrategy getReplicaMovementStrategy(HttpServletRequest request, KafkaCruiseControlConfig config) throws UnsupportedEncodingException {
        if (ParameterUtils.getDryRun(request)) {
            return null;
        }
        List<String> requestedNames = ParameterUtils.getListParam(request, REPLICA_MOVEMENT_STRATEGIES_PARAM);
        if (requestedNames.isEmpty()) {
            return null;
        }

        // Index the configured strategies by name, ignoring case.
        Map<String, ReplicaMovementStrategy> supportedByName = new TreeMap<String, ReplicaMovementStrategy>(String.CASE_INSENSITIVE_ORDER);
        List<ReplicaMovementStrategy> supported = config.getConfiguredInstances("replica.movement.strategies", ReplicaMovementStrategy.class);
        for (ReplicaMovementStrategy candidate : supported) {
            supportedByName.put(candidate.name(), candidate);
        }

        ReplicaMovementStrategy chained = null;
        for (String requestedName : requestedNames) {
            if (!supportedByName.containsKey(requestedName)) {
                throw new UserRequestException("Strategy " + requestedName + " is not supported. Supported: " + supportedByName.keySet());
            }
            ReplicaMovementStrategy next = supportedByName.get(requestedName);
            if (chained == null) {
                chained = next;
            } else {
                chained = chained.chain(next);
            }
        }
        return chained.chain(new BaseReplicaMovementStrategy());
    }

    /**
     * Resolves the goals to use for the given request.
     *
     * <ul>
     *   <li>Kafka assigner mode: the simple class names of {@link KafkaAssignerEvenRackAwareGoal} and
     *       {@link KafkaAssignerDiskUsageDistributionGoal}.</li>
     *   <li>Rebalance disk mode: the simple class names of {@link IntraBrokerDiskCapacityGoal} and
     *       {@link IntraBrokerDiskUsageDistributionGoal}.</li>
     *   <li>Otherwise: the goals listed in {@code GOALS_PARAM}.</li>
     * </ul>
     *
     * @param request The http request.
     * @return The goal names; an unmodifiable list in the two special modes.
     * @throws UserRequestException if both modes are requested at once, or if goals are specified explicitly
     *         together with either mode.
     */
    static List<String> getGoals(HttpServletRequest request) throws UnsupportedEncodingException {
        boolean isKafkaAssignerMode = ParameterUtils.isKafkaAssignerMode(request);
        boolean isRebalanceDiskMode = ParameterUtils.isRebalanceDiskMode(request);
        if (isKafkaAssignerMode && isRebalanceDiskMode) {
            throw new UserRequestException("Kafka assigner mode and rebalance disk mode cannot be set at the same time.");
        }
        List<String> goals = ParameterUtils.getListParam(request, GOALS_PARAM);
        if (isKafkaAssignerMode) {
            if (!goals.isEmpty()) {
                throw new UserRequestException("Kafka assigner mode does not support explicitly specifying goals in request.");
            }
            return Collections.unmodifiableList(Arrays.asList(KafkaAssignerEvenRackAwareGoal.class.getSimpleName(), KafkaAssignerDiskUsageDistributionGoal.class.getSimpleName()));
        }
        if (isRebalanceDiskMode) {
            if (!goals.isEmpty()) {
                throw new UserRequestException("Rebalance disk mode does not support explicitly specifying goals in request.");
            }
            return Collections.unmodifiableList(Arrays.asList(IntraBrokerDiskCapacityGoal.class.getSimpleName(), IntraBrokerDiskUsageDistributionGoal.class.getSimpleName()));
        }
        return goals;
    }

    /**
     * Reads the number of entries requested via {@code ENTRIES_PARAM}.
     *
     * @param request The http request.
     * @return The requested number of entries, or {@link Integer#MAX_VALUE} if the parameter is absent.
     * @throws UserRequestException if the requested value is not positive.
     * @throws NumberFormatException if the parameter value is not a parsable integer.
     */
    public static int entries(HttpServletRequest request) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), ENTRIES_PARAM);
        if (parameterString == null) {
            return Integer.MAX_VALUE;
        }
        int requested = Integer.parseInt(request.getParameter(parameterString));
        if (requested <= 0) {
            throw new UserRequestException("The requested entries must be positive (Requested: " + requested + ").");
        }
        return requested;
    }

    /**
     * Collects the negative values of the given set.
     *
     * @param values Values to inspect.
     * @return A new mutable set holding only the values that are less than zero.
     */
    private static Set<Integer> getNegatives(Set<Integer> values) {
        Set<Integer> negatives = new HashSet<>(values.size());
        for (Integer value : values) {
            if (value < 0) {
                negatives.add(value);
            }
        }
        return negatives;
    }

    /**
     * Parses the review ids given via {@code REVIEW_IDS_PARAM}.
     *
     * @param request The http request.
     * @return An unmodifiable view of the parsed review ids.
     * @throws UserRequestException if any of the parsed ids is negative.
     */
    public static Set<Integer> reviewIds(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<Integer> parsedIds = ParameterUtils.parseParamToIntegerSet(request, REVIEW_IDS_PARAM);
        Set<Integer> negatives = ParameterUtils.getNegatives(parsedIds);
        if (negatives.isEmpty()) {
            return Collections.unmodifiableSet(parsedIds);
        }
        throw new UserRequestException(String.format("%s cannot contain negative values (requested: %s).", REVIEW_IDS_PARAM, negatives));
    }

    /**
     * Reads the review id given via {@code REVIEW_ID_PARAM}.
     *
     * @param request The http request.
     * @param twoStepVerificationEnabled Whether two-step verification is enabled.
     * @return The review id, or {@code null} if the parameter is absent.
     * @throws UserRequestException if the parameter is present while two-step verification is disabled, if the
     *         request carries any other parameter, or if the id is negative.
     * @throws NumberFormatException if the parameter value is not a parsable integer.
     */
    public static Integer reviewId(HttpServletRequest request, boolean twoStepVerificationEnabled) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), REVIEW_ID_PARAM);
        if (parameterString == null) {
            return null;
        }
        if (!twoStepVerificationEnabled) {
            throw new UserRequestException(String.format("%s parameter is not relevant when two-step verification is disabled.", REVIEW_ID_PARAM));
        }
        // The value is parsed before the remaining checks, so a malformed number is reported first.
        int parsedId = Integer.parseInt(request.getParameter(parameterString));
        boolean hasOtherParameters = request.getParameterMap().size() != 1;
        if (hasOtherParameters) {
            throw new UserRequestException(String.format("%s parameter must be mutually exclusive with other parameters (Request parameters: %s).", REVIEW_ID_PARAM, request.getParameterMap()));
        }
        if (parsedId < 0) {
            throw new UserRequestException(String.format("%s cannot be negative (requested: %d).", REVIEW_ID_PARAM, parsedId));
        }
        return parsedId;
    }

    /**
     * Reads the value of {@code EXECUTION_PROGRESS_CHECK_INTERVAL_MS_PARAM} as a long.
     *
     * @param request The http request.
     * @return The result of {@code getLongParam} called with a {@code null} default value.
     */
    static Long executionProgressCheckIntervalMs(HttpServletRequest request) {
        Long intervalMs = ParameterUtils.getLongParam(request, EXECUTION_PROGRESS_CHECK_INTERVAL_MS_PARAM, null);
        return intervalMs;
    }

    /**
     * Reads the requested movement concurrency.
     *
     * <p>The parameter consulted is {@code CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_PARAM} for inter-broker
     * partition movements, {@code CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_PARAM} for intra-broker partition
     * movements, and {@code CONCURRENT_LEADER_MOVEMENTS_PARAM} otherwise. The inter-broker flag takes precedence
     * when both flags are set.
     *
     * @param request The http request.
     * @param isInterBrokerPartitionMovement Whether the inter-broker partition movement concurrency is requested.
     * @param isIntraBrokerPartitionMovement Whether the intra-broker partition movement concurrency is requested.
     * @return The requested concurrency, or {@code null} if the selected parameter is absent.
     * @throws UserRequestException if the requested concurrency is not positive.
     * @throws NumberFormatException if the parameter value is not a parsable integer.
     */
    static Integer concurrentMovements(HttpServletRequest request, boolean isInterBrokerPartitionMovement, boolean isIntraBrokerPartitionMovement) {
        String concurrencyParam;
        if (isInterBrokerPartitionMovement) {
            concurrencyParam = CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_PARAM;
        } else if (isIntraBrokerPartitionMovement) {
            concurrencyParam = CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_PARAM;
        } else {
            concurrencyParam = CONCURRENT_LEADER_MOVEMENTS_PARAM;
        }
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), concurrencyParam);
        if (parameterString == null) {
            return null;
        }
        int requestedConcurrency = Integer.parseInt(request.getParameter(parameterString));
        if (requestedConcurrency > 0) {
            return requestedConcurrency;
        }
        throw new UserRequestException("The requested movement concurrency must be positive (Requested: " + requestedConcurrency + ").");
    }

    /**
     * Compiles the value of {@code EXCLUDED_TOPICS_PARAM} into a regular expression.
     *
     * @param request The http request.
     * @return The compiled pattern, or {@code null} if the parameter is absent.
     * @throws java.util.regex.PatternSyntaxException if the parameter value is not a valid regular expression.
     */
    static Pattern excludedTopics(HttpServletRequest request) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), EXCLUDED_TOPICS_PARAM);
        if (parameterString == null) {
            return null;
        }
        String regex = request.getParameter(parameterString);
        return Pattern.compile(regex);
    }

    /**
     * Reads the lower or upper partition boundary given via {@code PARTITION_PARAM}.
     *
     * <p>The parameter value is either a single number, which is returned for both boundaries, or two numbers
     * separated by {@code DELIMITER_BETWEEN_BROKER_ID_AND_LOGDIR} (presumably a dash, judging by the error
     * message below), in which case the first number is the lower and the second the upper boundary.
     *
     * @param request The http request.
     * @param isUpperBound {@code true} to read the upper boundary, {@code false} to read the lower boundary.
     * @return The requested boundary; if the parameter is absent, {@link Integer#MAX_VALUE} for the upper and
     *         {@link Integer#MIN_VALUE} for the lower boundary.
     * @throws UserRequestException if the value contains more than two delimiter-separated parts, or if the
     *         requested boundary is missing from the range (e.g. a trailing delimiter with no upper bound).
     * @throws NumberFormatException if the selected part is not a parsable integer.
     */
    static int partitionBoundary(HttpServletRequest request, boolean isUpperBound) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), PARTITION_PARAM);
        if (parameterString == null) {
            return isUpperBound ? Integer.MAX_VALUE : Integer.MIN_VALUE;
        }
        String partitionString = request.getParameter(parameterString);
        if (!partitionString.contains(DELIMITER_BETWEEN_BROKER_ID_AND_LOGDIR)) {
            return Integer.parseInt(partitionString);
        }
        String[] boundaries = partitionString.split(DELIMITER_BETWEEN_BROKER_ID_AND_LOGDIR);
        if (boundaries.length > 2) {
            throw new UserRequestException("The partition parameter cannot contain multiple dashes.");
        }
        int boundaryIndex = isUpperBound ? 1 : 0;
        // String.split drops trailing empty strings, so a range with a missing bound yields fewer than two
        // parts. Report that as a user error rather than failing with ArrayIndexOutOfBoundsException.
        if (boundaries.length <= boundaryIndex) {
            throw new UserRequestException(String.format("The partition parameter is missing the %s bound (requested: %s).",
                                                         isUpperBound ? "upper" : "lower", partitionString));
        }
        return Integer.parseInt(boundaries[boundaryIndex]);
    }

    /**
     * Parses the broker ids given via {@code BROKER_ID_PARAM}.
     *
     * <p>When broker ids are mandatory but none are given, the request is still accepted if it targets
     * {@code FIX_OFFLINE_REPLICAS}, or if it targets {@code DEMOTE_BROKER} and specifies at least one broker id
     * and logdir pair.
     *
     * @param request The http request.
     * @param isOptional Whether the request may omit broker ids.
     * @return An unmodifiable view of the parsed broker ids.
     * @throws UserRequestException if mandatory broker ids are missing and none of the exceptions above applies.
     */
    static Set<Integer> brokerIds(HttpServletRequest request, boolean isOptional) throws UnsupportedEncodingException {
        Set<Integer> parsedIds = ParameterUtils.parseParamToIntegerSet(request, BROKER_ID_PARAM);
        boolean missingRequiredIds = !isOptional && parsedIds.isEmpty();
        if (missingRequiredIds) {
            CruiseControlEndPoint endpoint = ParameterUtils.endPoint(request);
            boolean isDemote = endpoint == CruiseControlEndPoint.DEMOTE_BROKER;
            if (isDemote && ParameterUtils.brokerIdAndLogdirs(request).isEmpty()) {
                throw new UserRequestException("No target broker ID or target disk logdir is specified to demote.");
            }
            if (!isDemote && endpoint != CruiseControlEndPoint.FIX_OFFLINE_REPLICAS) {
                throw new UserRequestException(String.format("Target broker ID is not provided for %s request", endpoint));
            }
        }
        return Collections.unmodifiableSet(parsedIds);
    }

    /**
     * Parses the broker ids given via {@code DROP_RECENTLY_REMOVED_BROKERS_PARAM}.
     *
     * @param request The http request.
     * @return An unmodifiable view of the parsed broker ids.
     */
    static Set<Integer> dropRecentlyRemovedBrokers(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<Integer> parsedIds = ParameterUtils.parseParamToIntegerSet(request, DROP_RECENTLY_REMOVED_BROKERS_PARAM);
        return Collections.unmodifiableSet(parsedIds);
    }

    /**
     * Parses the broker ids given via {@code DROP_RECENTLY_DEMOTED_BROKERS_PARAM}.
     *
     * @param request The http request.
     * @return An unmodifiable view of the parsed broker ids.
     */
    static Set<Integer> dropRecentlyDemotedBrokers(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<Integer> parsedIds = ParameterUtils.parseParamToIntegerSet(request, DROP_RECENTLY_DEMOTED_BROKERS_PARAM);
        return Collections.unmodifiableSet(parsedIds);
    }

    /**
     * Parses the destination broker ids given via {@code DESTINATION_BROKER_IDS_PARAM}.
     *
     * @param request The http request.
     * @return An unmodifiable view of the parsed destination broker ids.
     * @throws UserRequestException if destination broker ids are specified in Kafka assigner mode, or if they
     *         overlap with the ids given via {@code BROKER_ID_PARAM}.
     */
    static Set<Integer> destinationBrokerIds(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<Integer> destinationIds = Collections.unmodifiableSet(ParameterUtils.parseParamToIntegerSet(request, DESTINATION_BROKER_IDS_PARAM));
        if (destinationIds.isEmpty()) {
            // Nothing to validate when no destination broker is requested.
            return destinationIds;
        }
        if (ParameterUtils.isKafkaAssignerMode(request)) {
            throw new UserRequestException("Kafka assigner mode does not support explicitly specifying destination broker ids.");
        }
        Set<Integer> sourceIds = ParameterUtils.parseParamToIntegerSet(request, BROKER_ID_PARAM);
        ParameterUtils.ensureDisjoint(sourceIds, destinationIds, "No overlap is allowed between the specified destination broker ids and broker ids");
        return destinationIds;
    }

    /**
     * Parses the ids given via {@code APPROVE_PARAM} or {@code DISCARD_PARAM}.
     *
     * @param request The http request.
     * @param isApprove {@code true} to read {@code APPROVE_PARAM}, {@code false} to read {@code DISCARD_PARAM}.
     * @return An unmodifiable view of the parsed ids.
     */
    private static Set<Integer> review(HttpServletRequest request, boolean isApprove) throws UnsupportedEncodingException {
        if (isApprove) {
            return Collections.unmodifiableSet(ParameterUtils.parseParamToIntegerSet(request, APPROVE_PARAM));
        }
        return Collections.unmodifiableSet(ParameterUtils.parseParamToIntegerSet(request, DISCARD_PARAM));
    }

    /**
     * Collects the ids to approve and to discard from the request.
     *
     * @param request The http request.
     * @return A map from {@code ReviewStatus.APPROVED} and {@code ReviewStatus.DISCARDED} to the corresponding ids.
     * @throws UserRequestException if the same id is both approved and discarded, or if neither
     *         {@code APPROVE_PARAM} nor {@code DISCARD_PARAM} yields any id.
     */
    static Map<ReviewStatus, Set<Integer>> reviewRequests(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<Integer> toApprove = ParameterUtils.review(request, true);
        Set<Integer> toDiscard = ParameterUtils.review(request, false);
        ParameterUtils.ensureDisjoint(toApprove, toDiscard, "The same request cannot be specified in both approve and discard parameters");
        boolean nothingRequested = toApprove.isEmpty() && toDiscard.isEmpty();
        if (nothingRequested) {
            throw new UserRequestException(String.format("%s endpoint requires at least one of '%s' or '%s' parameter.", CruiseControlEndPoint.REVIEW, APPROVE_PARAM, DISCARD_PARAM));
        }
        Map<ReviewStatus, Set<Integer>> requestsByStatus = new HashMap<>(2);
        requestsByStatus.put(ReviewStatus.APPROVED, toApprove);
        requestsByStatus.put(ReviewStatus.DISCARDED, toDiscard);
        return requestsByStatus;
    }

    /**
     * Parses the broker id and logdir pairs given via {@code BROKER_ID_AND_LOGDIRS_PARAM}.
     *
     * <p>The URL-decoded parameter value is a comma-separated list of entries. Each entry is split at the first
     * occurrence of {@code DELIMITER_BETWEEN_BROKER_ID_AND_LOGDIR}: the part before it is parsed as the broker
     * id and the remainder is taken as the logdir.
     *
     * @param request The http request.
     * @return An unmodifiable map from broker id to its logdirs; empty if the parameter is absent.
     * @throws UserRequestException if an entry does not contain the delimiter.
     * @throws NumberFormatException if the broker id part of an entry is not a parsable integer.
     */
    static Map<Integer, Set<String>> brokerIdAndLogdirs(HttpServletRequest request) throws UnsupportedEncodingException {
        Map<Integer, Set<String>> brokerIdAndLogdirs = new HashMap<>();
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), BROKER_ID_AND_LOGDIRS_PARAM);
        if (parameterString != null) {
            for (String entry : ParameterUtils.urlDecode(request.getParameter(parameterString)).split(",")) {
                int index = entry.indexOf(DELIMITER_BETWEEN_BROKER_ID_AND_LOGDIR);
                if (index < 0) {
                    // Without this check substring(0, -1) would fail with StringIndexOutOfBoundsException.
                    throw new UserRequestException(String.format("Invalid entry '%s' in %s: expected a broker id and a logdir separated by '%s'.",
                                                                 entry, BROKER_ID_AND_LOGDIRS_PARAM, DELIMITER_BETWEEN_BROKER_ID_AND_LOGDIR));
                }
                int brokerId = Integer.parseInt(entry.substring(0, index));
                brokerIdAndLogdirs.computeIfAbsent(brokerId, id -> new HashSet<>()).add(entry.substring(index + 1));
            }
        }
        return Collections.unmodifiableMap(brokerIdAndLogdirs);
    }

    /**
     * Parses the user task ids given via {@code USER_TASK_IDS_PARAM}.
     *
     * <p>The URL-decoded parameter value is a comma-separated list of UUID strings.
     *
     * @param request The http request.
     * @return The parsed user task ids, or an empty set if the parameter is absent.
     * @throws IllegalArgumentException if an entry is not a valid UUID string.
     */
    public static Set<UUID> userTaskIds(HttpServletRequest request) throws UnsupportedEncodingException {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), USER_TASK_IDS_PARAM);
        if (parameterString == null) {
            return Collections.emptySet();
        }
        String decoded = ParameterUtils.urlDecode(request.getParameter(parameterString));
        Set<UUID> taskIds = new HashSet<>();
        for (String token : decoded.split(",")) {
            taskIds.add(UUID.fromString(token));
        }
        return taskIds;
    }

    /**
     * Parses the client ids given via {@code CLIENT_IDS_PARAM}.
     *
     * @param request The http request.
     * @return An unmodifiable view of the parsed client ids.
     */
    public static Set<String> clientIds(HttpServletRequest request) throws UnsupportedEncodingException {
        return Collections.unmodifiableSet(ParameterUtils.parseParamToStringSet(request, CLIENT_IDS_PARAM));
    }

    /**
     * Parses the endpoints given via {@code ENDPOINTS_PARAM}.
     *
     * <p>Requested names are upper-cased before being compared with the string form of each known endpoint, so
     * matching is case-insensitive. Names that match no known endpoint are ignored.
     *
     * @param request The http request.
     * @return An unmodifiable set of the endpoints whose string form matches a requested name.
     */
    public static Set<CruiseControlEndPoint> endPoints(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<String> requestedNames = new HashSet<>();
        for (String name : ParameterUtils.parseParamToStringSet(request, ENDPOINTS_PARAM)) {
            // Locale.ROOT keeps the conversion independent of the JVM default locale; under e.g. a Turkish
            // locale a plain toUpperCase() maps 'i' to a dotted capital I and the name would never match.
            requestedNames.add(name.toUpperCase(java.util.Locale.ROOT));
        }
        Set<CruiseControlEndPoint> endPoints = new HashSet<>();
        for (CruiseControlEndPoint endPoint : CruiseControlEndPoint.cachedValues()) {
            if (requestedNames.contains(endPoint.toString())) {
                endPoints.add(endPoint);
            }
        }
        return Collections.unmodifiableSet(endPoints);
    }

    /**
     * Parses the task states given via {@code TYPES_PARAM}.
     *
     * <p>A state is selected when its string form is contained in the requested values; the comparison is
     * case-sensitive. Values that match no known state are ignored.
     *
     * @param request The http request.
     * @return An unmodifiable set of the matching task states.
     */
    public static Set<UserTaskManager.TaskState> types(HttpServletRequest request) throws UnsupportedEncodingException {
        Set<String> requestedStates = ParameterUtils.parseParamToStringSet(request, TYPES_PARAM);
        Set<UserTaskManager.TaskState> matchedStates = new HashSet<>();
        for (UserTaskManager.TaskState candidate : UserTaskManager.TaskState.cachedValues()) {
            if (requestedStates.contains(candidate.toString())) {
                matchedStates.add(candidate);
            }
        }
        return Collections.unmodifiableSet(matchedStates);
    }

    /**
     * Reads the {@link DataFrom} value given via {@code DATA_FROM_PARAM}.
     *
     * <p>The parameter value is upper-cased before it is resolved to an enum constant, so it may be given in
     * any letter case.
     *
     * @param request The http request.
     * @return The requested value, or {@link DataFrom#VALID_WINDOWS} if the parameter is absent.
     * @throws IllegalArgumentException if the parameter value does not name a {@link DataFrom} constant.
     */
    static DataFrom getDataFrom(HttpServletRequest request) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), DATA_FROM_PARAM);
        if (parameterString == null) {
            return DataFrom.VALID_WINDOWS;
        }
        // Locale.ROOT keeps the conversion independent of the JVM default locale; under e.g. a Turkish locale
        // a plain toUpperCase() would turn "valid_windows" into a string that matches no constant.
        return DataFrom.valueOf(request.getParameter(parameterString).toUpperCase(java.util.Locale.ROOT));
    }

    /**
     * Determines whether the hard goal check is skipped for the given request.
     *
     * @param request The http request.
     * @return {@code true} if the request is in Kafka assigner mode or rebalance disk mode; otherwise the value
     *         of {@code SKIP_HARD_GOAL_CHECK_PARAM}, defaulting to {@code false}.
     */
    static boolean skipHardGoalCheck(HttpServletRequest request) {
        if (ParameterUtils.isKafkaAssignerMode(request)) {
            return true;
        }
        if (ParameterUtils.isRebalanceDiskMode(request)) {
            return true;
        }
        return ParameterUtils.getBooleanParam(request, SKIP_HARD_GOAL_CHECK_PARAM, false);
    }

    /**
     * Reads the boolean value of {@code SKIP_URP_DEMOTION_PARAM}.
     *
     * @param request The http request.
     * @return The result of {@code getBooleanParam} called with a default of {@code false}.
     */
    static boolean skipUrpDemotion(HttpServletRequest request) {
        boolean skip = ParameterUtils.getBooleanParam(request, SKIP_URP_DEMOTION_PARAM, false);
        return skip;
    }

    /**
     * Reads the boolean value of {@code EXCLUDE_FOLLOWER_DEMOTION_PARAM}.
     *
     * @param request The http request.
     * @return The result of {@code getBooleanParam} called with a default of {@code true}.
     */
    static boolean excludeFollowerDemotion(HttpServletRequest request) {
        boolean exclude = ParameterUtils.getBooleanParam(request, EXCLUDE_FOLLOWER_DEMOTION_PARAM, true);
        return exclude;
    }

    /**
     * Reads the replication factor given via {@code REPLICATION_FACTOR_PARAM}.
     *
     * @param request The http request.
     * @return The requested replication factor, or {@code null} if the parameter is absent.
     * @throws NumberFormatException if the parameter value is not a parsable short.
     */
    static Short replicationFactor(HttpServletRequest request) {
        String parameterString = ParameterUtils.caseSensitiveParameterName(request.getParameterMap(), REPLICATION_FACTOR_PARAM);
        if (parameterString != null) {
            return Short.valueOf(request.getParameter(parameterString));
        }
        return null;
    }

    /**
     * Reads the boolean value of {@code FETCH_COMPLETED_TASK_PARAM}.
     *
     * @param request The http request.
     * @return The result of {@code getBooleanParam} called with a default of {@code false}.
     */
    static boolean fetchCompletedTask(HttpServletRequest request) {
        boolean fetch = ParameterUtils.getBooleanParam(request, FETCH_COMPLETED_TASK_PARAM, false);
        return fetch;
    }

    /**
     * Checks whether every one of the given parameters is {@code null}.
     *
     * @param parameters Parameters to check.
     * @return {@code true} if all given parameters are {@code null} (also when none are given), {@code false}
     *         as soon as one of them is non-null.
     */
    public static boolean areAllParametersNull(CruiseControlParameters ... parameters) {
        for (CruiseControlParameters parameter : parameters) {
            if (parameter != null) {
                return false;
            }
        }
        return true;
    }

    public static enum DataFrom {
        VALID_WINDOWS,
        VALID_PARTITIONS;

    }
}

