/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControl;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlCompilationException;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controller REST resource that accepts SQL queries on {@code /sql} (GET and POST) and either executes
 * them locally (DML statements) or forwards them to a broker instance picked at random among the online
 * brokers that can serve the query (DQL statements and multi-stage queries).
 */
@Path("/")
public class PinotQueryResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryResource.class);
    private static final Random RANDOM = new Random();

    /** Endpoint name handed to the access-control check of the multi-stage path. */
    private static final String SQL_ENDPOINT_URL = "/sql";
    /** Query option that requests the multi-stage engine. */
    private static final String USE_MULTISTAGE_ENGINE_OPTION = "useMultistageEngine";
    /** Controller config key consulted before a multi-stage query is dispatched. */
    private static final String MULTI_STAGE_ENGINE_ENABLED_KEY = "pinot.multistage.engine.enabled";

    @Inject
    SqlQueryExecutor _sqlQueryExecutor;
    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;
    @Inject
    AccessControlFactory _accessControlFactory;
    @Inject
    ControllerConf _controllerConf;

    /**
     * Handles a query posted as a JSON document with a mandatory {@code sql} field and optional
     * {@code trace} and {@code queryOptions} fields.
     *
     * @param requestJsonStr JSON request body
     * @param httpHeaders headers of the incoming request; used for access control and forwarded downstream
     * @return the query response, or an error payload when the query could not be processed
     * @throws WebApplicationException propagated unchanged (e.g. permission denied)
     */
    @POST
    @Path("sql")
    @ManualAuthorization
    public String handlePostSql(String requestJsonStr, @Context HttpHeaders httpHeaders) {
        try {
            JsonNode requestJson = JsonUtils.stringToJsonNode(requestJsonStr);
            if (!requestJson.has("sql")) {
                // Fail with an explicit message instead of a NullPointerException on the missing node.
                throw new IllegalStateException("Payload is missing the query string field 'sql'");
            }
            String sqlQuery = requestJson.get("sql").asText();
            String traceEnabled = "false";
            if (requestJson.has("trace")) {
                // asText() yields "true" for both the boolean true and the string "true"; toString()
                // would keep the surrounding JSON quotes of a string value.
                traceEnabled = requestJson.get("trace").asText();
            }
            String queryOptions = null;
            if (requestJson.has("queryOptions")) {
                queryOptions = requestJson.get("queryOptions").asText();
            }
            LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
            return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions, SQL_ENDPOINT_URL);
        } catch (ProcessingException pe) {
            LOGGER.error("Caught exception while processing post request {}", pe.getMessage());
            return pe.getMessage();
        } catch (WebApplicationException wae) {
            LOGGER.error("Caught exception while processing post request", wae);
            throw wae;
        } catch (Exception e) {
            LOGGER.error("Caught exception while processing post request", e);
            return QueryException.getException(QueryException.INTERNAL_ERROR, e).toString();
        }
    }

    /**
     * Handles a query passed through the {@code sql}, {@code trace} and {@code queryOptions} query
     * parameters.
     *
     * @param sqlQuery SQL text
     * @param traceEnabled value of the {@code trace} parameter, forwarded as-is
     * @param queryOptions value of the {@code queryOptions} parameter, may be {@code null}
     * @param httpHeaders headers of the incoming request; used for access control and forwarded downstream
     * @return the query response, or an error payload when the query could not be processed
     * @throws WebApplicationException propagated unchanged (e.g. permission denied)
     */
    @GET
    @Path("sql")
    @ManualAuthorization
    public String handleGetSql(@QueryParam("sql") String sqlQuery, @QueryParam("trace") String traceEnabled,
            @QueryParam("queryOptions") String queryOptions, @Context HttpHeaders httpHeaders) {
        try {
            LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
            return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions, SQL_ENDPOINT_URL);
        } catch (ProcessingException pe) {
            LOGGER.error("Caught exception while processing get request {}", pe.getMessage());
            return pe.getMessage();
        } catch (WebApplicationException wae) {
            LOGGER.error("Caught exception while processing get request", wae);
            throw wae;
        } catch (Exception e) {
            LOGGER.error("Caught exception while processing get request", e);
            return QueryException.getException(QueryException.INTERNAL_ERROR, e).toString();
        }
    }

    /**
     * Parses the query and routes it: multi-stage queries and DQL statements are forwarded to a broker,
     * DML statements are executed through the injected {@link SqlQueryExecutor}.
     *
     * @throws ProcessingException when the SQL cannot be parsed
     * @throws UnsupportedOperationException when the multi-stage engine is requested but disabled, or
     *         the statement type is neither DQL nor DML
     */
    private String executeSqlQuery(HttpHeaders httpHeaders, String sqlQuery, String traceEnabled,
            @Nullable String queryOptions, String endpointUrl) throws Exception {
        SqlNodeAndOptions sqlNodeAndOptions;
        try {
            sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
        } catch (SqlCompilationException ex) {
            throw QueryException.getException(QueryException.SQL_PARSING_ERROR, ex);
        }
        if (queryOptions != null) {
            sqlNodeAndOptions.setExtraOptions(RequestUtils.getOptionsFromString(queryOptions));
        }
        // Read the options only after the extra options were handed over, so the engine switch is
        // looked up on the final option set.
        Map<String, String> options = sqlNodeAndOptions.getOptions();
        if (Boolean.parseBoolean(options.get(USE_MULTISTAGE_ENGINE_OPTION))) {
            if (_controllerConf.getProperty(MULTI_STAGE_ENGINE_ENABLED_KEY, true)) {
                return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders, endpointUrl, traceEnabled);
            }
            throw new UnsupportedOperationException("V2 Multi-Stage query engine not enabled. Please see https://docs.pinot.apache.org/ for instruction to enable V2 engine.");
        }
        PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
        switch (sqlType) {
            case DQL:
                return getQueryResponse(sqlQuery, sqlNodeAndOptions.getSqlNode(), traceEnabled, queryOptions,
                        httpHeaders);
            case DML:
                return _sqlQueryExecutor.executeDMLStatement(sqlNodeAndOptions, firstHeaderValues(httpHeaders))
                        .toJsonString();
            default:
                throw new UnsupportedOperationException("Unsupported SQL type - " + sqlType);
        }
    }

    /**
     * Forwards a multi-stage query to a broker. When the query references tables, only brokers tagged
     * for the broker tenant of every referenced table are candidates; otherwise any broker is.
     *
     * @throws WebApplicationException with status FORBIDDEN when the endpoint-level READ check fails
     */
    private String getMultiStageQueryResponse(String query, String queryOptions, HttpHeaders httpHeaders,
            String endpointUrl, String traceEnabled) {
        // NOTE(review): only an endpoint-level check is done here; unlike getQueryResponse there is no
        // per-table hasDataAccess check - confirm the broker enforces table-level access.
        AccessControl accessControl = _accessControlFactory.create();
        if (!accessControl.hasAccess(null, AccessType.READ, httpHeaders, endpointUrl)) {
            throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
        }
        List<String> tableNames = newQueryEnvironment().getTableNamesForQuery(query);
        List<String> instanceIds;
        if (tableNames.isEmpty()) {
            instanceIds = _pinotHelixResourceManager.getAllBrokerInstances();
            LOGGER.error("Unable to find table name from SQL {} thus dispatching to random broker.", query);
        } else {
            List<TableConfig> tableConfigList = getListTableConfigs(tableNames);
            if (tableConfigList.isEmpty()) {
                return QueryException.getException(QueryException.TABLE_DOES_NOT_EXIST_ERROR,
                        new Exception("Unable to find table in cluster, table does not exist")).toString();
            }
            Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList);
            if (brokerTenantsUnion.isEmpty()) {
                return QueryException.getException(QueryException.BROKER_REQUEST_SEND_ERROR,
                        new Exception(String.format("Unable to dispatch multistage query for tables: [%s]",
                                tableNames))).toString();
            }
            instanceIds = findCommonBrokerInstances(brokerTenantsUnion);
        }
        return dispatchToRandomOnlineBroker(instanceIds, query, traceEnabled, queryOptions, httpHeaders);
    }

    /**
     * Forwards a single-stage query to a broker serving the table referenced by the query, after
     * checking data access on that table.
     *
     * @param sqlNode parsed statement; when {@code null} the raw query string is compiled instead
     * @return the broker response, or an error payload (parsing error, access denied, no broker)
     */
    private String getQueryResponse(String query, @Nullable SqlNode sqlNode, String traceEnabled,
            String queryOptions, HttpHeaders httpHeaders) {
        String tableName;
        try {
            String inputTableName = sqlNode != null
                    ? RequestUtils.getTableNames(CalciteSqlParser.compileSqlNodeToPinotQuery(sqlNode)).iterator().next()
                    : CalciteSqlCompiler.compileToBrokerRequest(query).getQuerySource().getTableName();
            tableName = _pinotHelixResourceManager.getActualTableName(inputTableName);
        } catch (Exception e) {
            LOGGER.error("Caught exception while compiling query: {}", query, e);
            // Probe the multi-stage planner: if it accepts the query, tell the user to switch engines;
            // if it fails too, report the original single-stage failure.
            try {
                LOGGER.info("Trying to compile query {} using multi-stage engine", query);
                newQueryEnvironment().getTableNamesForQuery(query);
                LOGGER.info("Successfully compiled query using multi-stage engine: {}", query);
                return QueryException.getException(QueryException.SQL_PARSING_ERROR,
                        new Exception("It seems that the query is only supported by the multi-stage engine, please try it by checking the \"Use Multi-Stage Engine\" box above")).toString();
            } catch (Exception multiStageException) {
                LOGGER.error("Caught exception while compiling query using multi-stage engine: {}", query,
                        multiStageException);
                return QueryException.getException(QueryException.SQL_PARSING_ERROR, e).toString();
            }
        }
        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
        AccessControl accessControl = _accessControlFactory.create();
        if (!accessControl.hasDataAccess(httpHeaders, rawTableName)) {
            return QueryException.ACCESS_DENIED_ERROR.toString();
        }
        List<String> instanceIds = _pinotHelixResourceManager.getBrokerInstancesFor(rawTableName);
        return dispatchToRandomOnlineBroker(instanceIds, query, traceEnabled, queryOptions, httpHeaders);
    }

    /** Builds the query environment used to extract table names with the multi-stage planner. */
    private QueryEnvironment newQueryEnvironment() {
        return new QueryEnvironment(new TypeFactory(new TypeSystem()),
                CalciteSchemaBuilder.asRootSchema(new PinotCatalog(_pinotHelixResourceManager.getTableCache())),
                null, null);
    }

    /**
     * Collects the realtime and offline table configs of every given table.
     *
     * @return all configs found, or an empty list as soon as one table has neither a realtime nor an
     *         offline config
     */
    private List<TableConfig> getListTableConfigs(List<String> tableNames) {
        List<TableConfig> allTableConfigs = new ArrayList<>();
        for (String tableName : tableNames) {
            boolean found = false;
            if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) {
                allTableConfigs.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(tableName)));
                found = true;
            }
            if (_pinotHelixResourceManager.hasOfflineTable(tableName)) {
                allTableConfigs.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(tableName)));
                found = true;
            }
            if (!found) {
                // One unknown table invalidates the whole query.
                return new ArrayList<>();
            }
        }
        return allTableConfigs;
    }

    /**
     * Picks a random broker among the candidates that are currently online and forwards the query to it.
     *
     * @param candidateInstanceIds broker instance ids eligible for the query; not modified
     * @return the broker response, {@code BROKER_RESOURCE_MISSING_ERROR} when there is no candidate, or
     *         {@code BROKER_INSTANCE_MISSING_ERROR} when no candidate is online
     */
    private String dispatchToRandomOnlineBroker(List<String> candidateInstanceIds, String query,
            String traceEnabled, String queryOptions, HttpHeaders httpHeaders) {
        if (candidateInstanceIds.isEmpty()) {
            return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
        }
        // Filter a copy: the candidate list is owned by the caller / resource manager.
        List<String> onlineInstanceIds = new ArrayList<>(candidateInstanceIds);
        onlineInstanceIds.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
        if (onlineInstanceIds.isEmpty()) {
            return QueryException.BROKER_INSTANCE_MISSING_ERROR.toString();
        }
        String instanceId = onlineInstanceIds.get(RANDOM.nextInt(onlineInstanceIds.size()));
        return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, httpHeaders);
    }

    /** Returns the names of the broker instances that carry the broker tag of every given tenant. */
    private List<String> findCommonBrokerInstances(Set<String> brokerTenants) {
        List<String> requiredTags =
                brokerTenants.stream().map(TagNameUtils::getBrokerTagForTenant).collect(Collectors.toList());
        return _pinotHelixResourceManager.getAllBrokerInstanceConfigs().stream()
                .filter(instanceConfig -> requiredTags.stream().allMatch(instanceConfig::containsTag))
                .map(InstanceConfig::getInstanceName)
                .collect(Collectors.toList());
    }

    /** Returns the distinct broker tenants configured on the given tables. */
    private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList) {
        Set<String> tableBrokerTenants = new HashSet<>();
        for (TableConfig tableConfig : tableConfigList) {
            tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
        }
        return tableBrokerTenants;
    }

    /**
     * Resolves the broker's host and port from its Helix instance config and posts the query to it.
     *
     * @return the broker response, or {@code INTERNAL_ERROR} when the instance config cannot be found
     */
    private String sendRequestToBroker(String query, String instanceId, String traceEnabled,
            String queryOptions, HttpHeaders httpHeaders) {
        InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
        if (instanceConfig == null) {
            LOGGER.error("Instance {} not found", instanceId);
            return QueryException.INTERNAL_ERROR.toString();
        }
        String hostName = instanceConfig.getHostName();
        if (hostName.startsWith("Broker_")) {
            // Strip the broker instance prefix to obtain the plain host name.
            hostName = hostName.substring(CommonConstants.Helix.BROKER_INSTANCE_PREFIX_LENGTH);
        }
        String protocol = _controllerConf.getControllerBrokerProtocol();
        int portOverride = _controllerConf.getControllerBrokerPortOverride();
        int port = portOverride > 0 ? portOverride : Integer.parseInt(instanceConfig.getPort());
        String url = getQueryURL(protocol, hostName, port);
        ObjectNode requestJson = getRequestJson(query, traceEnabled, queryOptions);
        return sendRequestRaw(url, query, requestJson, firstHeaderValues(httpHeaders));
    }

    /**
     * Flattens the request headers to a single value per header name (the first one); headers with an
     * empty value list are skipped.
     */
    private static Map<String, String> firstHeaderValues(HttpHeaders httpHeaders) {
        return httpHeaders.getRequestHeaders().entrySet().stream()
                .filter(entry -> !entry.getValue().isEmpty())
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0)));
    }

    /** Builds the JSON body sent to the broker; {@code trace} and {@code queryOptions} are added only when non-empty. */
    private ObjectNode getRequestJson(String query, String traceEnabled, String queryOptions) {
        ObjectNode requestJson = JsonUtils.newObjectNode();
        requestJson.put("sql", query);
        if (traceEnabled != null && !traceEnabled.isEmpty()) {
            requestJson.put("trace", traceEnabled);
        }
        if (queryOptions != null && !queryOptions.isEmpty()) {
            requestJson.put("queryOptions", queryOptions);
        }
        return requestJson;
    }

    /** Formats the URL of the {@code /query/sql} endpoint on the given host. */
    private String getQueryURL(String protocol, String hostName, int port) {
        return String.format("%s://%s:%d/query/sql", protocol, hostName, port);
    }

    /**
     * Posts {@code requestStr} (UTF-8) to {@code urlStr} and returns the response body decoded as UTF-8.
     *
     * @param urlStr target URL
     * @param requestStr request body
     * @param headers extra request headers, may be {@code null}
     * @return the response body of a 200 response
     * @throws WebApplicationException with status FORBIDDEN when the target answers 403
     */
    public String sendPostRaw(String urlStr, String requestStr, Map<String, String> headers) {
        HttpURLConnection conn = null;
        try {
            LOGGER.info("url string passed is : {}", urlStr);
            conn = (HttpURLConnection) new URL(urlStr).openConnection();
            conn.setDoOutput(true);
            conn.setRequestMethod("POST");
            // NOTE(review): gzip is advertised but the response is read without decompression below -
            // confirm the target never answers with Content-Encoding: gzip.
            conn.setRequestProperty("Accept-Encoding", "gzip");
            byte[] requestBytes = requestStr.getBytes(StandardCharsets.UTF_8);
            conn.setRequestProperty("Content-Length", String.valueOf(requestBytes.length));
            conn.setRequestProperty("http.keepAlive", String.valueOf(true));
            conn.setRequestProperty("default", String.valueOf(true));
            if (headers != null) {
                for (Map.Entry<String, String> entry : headers.entrySet()) {
                    conn.setRequestProperty(entry.getKey(), entry.getValue());
                }
            }
            // close() flushes the buffered bytes, also when write() fails.
            try (OutputStream os = new BufferedOutputStream(conn.getOutputStream())) {
                os.write(requestBytes);
            }
            int responseCode = conn.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_FORBIDDEN) {
                throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
            }
            if (responseCode != HttpURLConnection.HTTP_OK) {
                String rootCause;
                // getErrorStream() may return null; try-with-resources skips a null resource.
                try (InputStream errorStream = conn.getErrorStream()) {
                    rootCause = errorStream != null ? IOUtils.toString(errorStream, StandardCharsets.UTF_8) : "Unknown";
                }
                throw new IOException("Failed : HTTP error code : " + responseCode + ". Root Cause: " + rootCause);
            }
            return new String(drain(new BufferedInputStream(conn.getInputStream())), StandardCharsets.UTF_8);
        } catch (Exception ex) {
            LOGGER.error("Caught exception while sending query request", ex);
            Utils.rethrowException(ex);
            throw new AssertionError("Should not reach this");
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /**
     * Reads the stream until {@code read} stops returning data and closes it.
     *
     * @param inputStream stream to consume; always closed on return
     * @return all bytes read
     * @throws IOException when reading or closing fails
     */
    byte[] drain(InputStream inputStream) throws IOException {
        try (InputStream in = inputStream) {
            byte[] buf = new byte[1024];
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            int len;
            while ((len = in.read(buf)) > 0) {
                byteArrayOutputStream.write(buf, 0, len);
            }
            return byteArrayOutputStream.toByteArray();
        }
    }

    /**
     * Posts the JSON request to the given URL and logs the elapsed wall-clock time in milliseconds.
     *
     * @param url target URL
     * @param query SQL text, used for logging only
     * @param requestJson request body
     * @param headers extra request headers, may be {@code null}
     * @return the response body
     */
    public String sendRequestRaw(String url, String query, ObjectNode requestJson, Map<String, String> headers) {
        try {
            long startTime = System.currentTimeMillis();
            String pinotResultString = sendPostRaw(url, requestJson.toString(), headers);
            long queryTime = System.currentTimeMillis() - startTime;
            LOGGER.info("Query: {} Time: {}", query, queryTime);
            return pinotResultString;
        } catch (Exception ex) {
            LOGGER.error("Caught exception in sendQueryRaw", ex);
            Utils.rethrowException(ex);
            throw new AssertionError("Should not reach this");
        }
    }
}

