/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.api.resources;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.pinot.common.http.MultiHttpRequest;
import org.apache.pinot.common.http.MultiHttpRequestResponse;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.resources.InstanceInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(tags={"Query"}, authorizations={@Authorization(value="oauth")})
@SwaggerDefinition(securityDefinition=@SecurityDefinition(apiKeyAuthDefinitions={@ApiKeyAuthDefinition(name="Authorization", in=ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key="oauth")}))
@Path(value="/")
public class PinotRunningQueryResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotRunningQueryResource.class);
    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;
    @Inject
    ControllerConf _controllerConf;
    @Inject
    private Executor _executor;
    @Inject
    private HttpClientConnectionManager _httpConnMgr;

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @DELETE
    @Path(value="query/{brokerId}/{queryId}")
    @Authorize(targetType=TargetType.CLUSTER, action="CancelQuery")
    @Produces(value={"application/json"})
    @ApiOperation(value="Cancel a query as identified by the queryId", notes="No effect if no query exists for the given queryId on the requested broker. Query may continue to run for a short while after calling cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.")
    @ApiResponses(value={@ApiResponse(code=200, message="Success"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=404, message="Query not found on the requested broker")})
    public String cancelQuery(@ApiParam(value="Broker that's running the query", required=true) @PathParam(value="brokerId") String brokerId, @ApiParam(value="QueryId as assigned by the broker", required=true) @PathParam(value="queryId") long queryId, @ApiParam(value="Timeout for servers to respond the cancel request") @QueryParam(value="timeoutMs") @DefaultValue(value="3000") int timeoutMs, @ApiParam(value="Return verbose responses for troubleshooting") @QueryParam(value="verbose") @DefaultValue(value="false") boolean verbose, @Context HttpHeaders httpHeaders) {
        InstanceConfig broker = this._pinotHelixResourceManager.getHelixInstanceConfig(brokerId);
        if (broker == null) {
            throw new WebApplicationException(Response.status((Response.Status)Response.Status.BAD_REQUEST).entity((Object)("Unknown broker: " + brokerId)).build());
        }
        try {
            RequestConfig defaultRequestConfig = RequestConfig.custom().setConnectionRequestTimeout(timeoutMs).setSocketTimeout(timeoutMs).build();
            CloseableHttpClient client = HttpClients.custom().setConnectionManager(this._httpConnMgr).setDefaultRequestConfig(defaultRequestConfig).build();
            String protocol = this._controllerConf.getControllerBrokerProtocol();
            int portOverride = this._controllerConf.getControllerBrokerPortOverride();
            int port = portOverride > 0 ? portOverride : Integer.parseInt(broker.getPort());
            HttpDelete deleteMethod = new HttpDelete(String.format("%s://%s:%d/query/%d?verbose=%b", protocol, broker.getHostName(), port, queryId, verbose));
            try {
                Map<String, String> requestHeaders = PinotRunningQueryResource.createRequestHeaders(httpHeaders);
                requestHeaders.forEach((arg_0, arg_1) -> ((HttpDelete)deleteMethod).setHeader(arg_0, arg_1));
                CloseableHttpResponse response = client.execute((HttpUriRequest)deleteMethod);
                int status = response.getStatusLine().getStatusCode();
                String responseContent = EntityUtils.toString((HttpEntity)response.getEntity());
                if (status == 200) {
                    String string = responseContent;
                    return string;
                }
                if (status == 404) {
                    throw new WebApplicationException(Response.status((Response.Status)Response.Status.NOT_FOUND).entity((Object)String.format("Query: %s not found on the broker: %s", queryId, brokerId)).build());
                }
                throw new WebApplicationException(Response.status((Response.Status)Response.Status.INTERNAL_SERVER_ERROR).entity((Object)String.format("Failed to cancel query: %s on the broker: %s with unexpected status=%d and resp='%s'", queryId, brokerId, status, responseContent)).build());
            }
            finally {
                deleteMethod.releaseConnection();
            }
        }
        catch (WebApplicationException e) {
            throw e;
        }
        catch (Exception e) {
            throw new WebApplicationException(Response.status((Response.Status)Response.Status.INTERNAL_SERVER_ERROR).entity((Object)String.format("Failed to cancel query: %s on the broker: %s due to error: %s", queryId, brokerId, e.getMessage())).build());
        }
    }

    @GET
    @Path(value="/queries")
    @Authorize(targetType=TargetType.CLUSTER, action="GetRunningQuery")
    @Produces(value={"application/json"})
    @ApiOperation(value="Get running queries from all brokers", notes="The queries are returned with brokers running them")
    @ApiResponses(value={@ApiResponse(code=200, message="Success"), @ApiResponse(code=500, message="Internal server error")})
    public Map<String, Map<String, String>> getRunningQueries(@ApiParam(value="Timeout for brokers to return running queries") @QueryParam(value="timeoutMs") @DefaultValue(value="3000") int timeoutMs, @Context HttpHeaders httpHeaders) {
        try {
            Map<String, List<InstanceInfo>> tableBrokers = this._pinotHelixResourceManager.getTableToLiveBrokersMapping();
            HashMap<String, InstanceInfo> brokers = new HashMap<String, InstanceInfo>();
            tableBrokers.values().forEach(list -> list.forEach(info -> brokers.putIfAbsent(PinotRunningQueryResource.getInstanceKey(info), (InstanceInfo)info)));
            return this.getRunningQueries(brokers, timeoutMs, PinotRunningQueryResource.createRequestHeaders(httpHeaders));
        }
        catch (Exception e) {
            throw new WebApplicationException(Response.status((Response.Status)Response.Status.INTERNAL_SERVER_ERROR).entity((Object)("Failed to get running queries due to error: " + e.getMessage())).build());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<String, Map<String, String>> getRunningQueries(Map<String, InstanceInfo> brokers, int timeoutMs, Map<String, String> requestHeaders) throws Exception {
        String protocol = this._controllerConf.getControllerBrokerProtocol();
        int portOverride = this._controllerConf.getControllerBrokerPortOverride();
        ArrayList<String> brokerUrls = new ArrayList<String>();
        for (InstanceInfo broker : brokers.values()) {
            int port = portOverride > 0 ? portOverride : broker.getPort();
            brokerUrls.add(String.format("%s://%s:%d/queries", protocol, broker.getHost(), port));
        }
        LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
        CompletionService completionService = new MultiHttpRequest(this._executor, this._httpConnMgr).execute(brokerUrls, requestHeaders, timeoutMs);
        HashMap<String, Map<String, String>> queriesByBroker = new HashMap<String, Map<String, String>>();
        ArrayList<String> errMsgs = new ArrayList<String>(brokerUrls.size());
        for (int i = 0; i < brokerUrls.size(); ++i) {
            try (MultiHttpRequestResponse httpRequestResponse = null;){
                httpRequestResponse = (MultiHttpRequestResponse)completionService.take().get();
                URI uri = httpRequestResponse.getURI();
                int status = httpRequestResponse.getResponse().getStatusLine().getStatusCode();
                String responseString = EntityUtils.toString((HttpEntity)httpRequestResponse.getResponse().getEntity());
                if (status != 200) {
                    throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
                }
                queriesByBroker.put(brokers.get(PinotRunningQueryResource.getInstanceKey(uri)).getInstanceName(), (Map)JsonUtils.stringToObject((String)responseString, Map.class));
                continue;
            }
        }
        if (errMsgs.size() > 0) {
            throw new Exception("Unexpected responses from brokers: " + StringUtils.join(errMsgs, (String)","));
        }
        return queriesByBroker;
    }

    private static String getInstanceKey(InstanceInfo info) {
        return info.getHost() + ":" + info.getPort();
    }

    private static String getInstanceKey(URI uri) throws Exception {
        return uri.getHost() + ":" + uri.getPort();
    }

    private static Map<String, String> createRequestHeaders(HttpHeaders httpHeaders) {
        HashMap<String, String> requestHeaders = new HashMap<String, String>();
        httpHeaders.getRequestHeaders().keySet().forEach(header -> requestHeaders.put((String)header, httpHeaders.getHeaderString(header)));
        return requestHeaders;
    }
}

