/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineplugin.presto.executor;

import com.facebook.presto.client.ClientSession;
import com.facebook.presto.client.QueryError;
import com.facebook.presto.client.QueryStatusInfo;
import com.facebook.presto.client.SocketChannelSocketFactory;
import com.facebook.presto.client.StatementClient;
import com.facebook.presto.client.StatementClientFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.airlift.units.Duration;
import java.io.Closeable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.SocketFactory;
import okhttp3.OkHttpClient;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.linkis.common.exception.ErrorException;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.common.conf.EngineConnConf;
import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.presto.conf.PrestoConfiguration;
import org.apache.linkis.engineplugin.presto.conf.PrestoEngineConf;
import org.apache.linkis.engineplugin.presto.errorcode.PrestoErrorCodeSummary;
import org.apache.linkis.engineplugin.presto.exception.PrestoClientException;
import org.apache.linkis.engineplugin.presto.exception.PrestoStateInvalidException;
import org.apache.linkis.engineplugin.presto.utils.PrestoSQLHook;
import org.apache.linkis.governance.common.paser.CodeParser;
import org.apache.linkis.governance.common.paser.SQLCodeParser;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.LoadResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.Resource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
import org.apache.linkis.storage.domain.Column;
import org.apache.linkis.storage.domain.DataType;
import org.apache.linkis.storage.resultset.ResultSetFactory$;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.apache.linkis.storage.resultset.table.TableRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import scala.Tuple2;

public class PrestoEngineConnExecutor
extends ConcurrentComputationExecutor {
    private static final Logger logger = LoggerFactory.getLogger(PrestoEngineConnExecutor.class);
    private static OkHttpClient okHttpClient = new OkHttpClient.Builder().socketFactory((SocketFactory)new SocketChannelSocketFactory()).connectTimeout(((Long)PrestoConfiguration.PRESTO_HTTP_CONNECT_TIME_OUT.getValue()).longValue(), TimeUnit.SECONDS).readTimeout(((Long)PrestoConfiguration.PRESTO_HTTP_READ_TIME_OUT.getValue()).longValue(), TimeUnit.SECONDS).build();
    private int id;
    private List<Label<?>> executorLabels = new ArrayList(2);
    private Map<String, StatementClient> statementClientCache = new ConcurrentHashMap<String, StatementClient>();
    private Cache<String, ClientSession> clientSessionCache = CacheBuilder.newBuilder().expireAfterAccess(Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()).longValue(), TimeUnit.MILLISECONDS).maximumSize((long)EngineConnConstant.MAX_TASK_NUM()).build();

    public PrestoEngineConnExecutor(int outputPrintLimit, int id) {
        super(outputPrintLimit);
        this.id = id;
    }

    public void init() {
        this.setCodeParser((CodeParser)new SQLCodeParser());
        super.init();
    }

    public ExecuteResponse execute(EngineConnTask engineConnTask) {
        String user = this.getUserCreatorLabel(engineConnTask.getLables()).getUser();
        Optional<Label> userCreatorLabelOp = Arrays.stream(engineConnTask.getLables()).filter(label -> label instanceof UserCreatorLabel).findFirst();
        Optional<Label> engineTypeLabelOp = Arrays.stream(engineConnTask.getLables()).filter(label -> label instanceof EngineTypeLabel).findFirst();
        Map configMap = null;
        if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) {
            UserCreatorLabel userCreatorLabel = (UserCreatorLabel)userCreatorLabelOp.get();
            EngineTypeLabel engineTypeLabel = (EngineTypeLabel)engineTypeLabelOp.get();
            configMap = new PrestoEngineConf().getCacheMap(new Tuple2((Object)userCreatorLabel, (Object)engineTypeLabel));
        }
        this.clientSessionCache.put((Object)engineConnTask.getTaskId(), (Object)this.getClientSession(user, engineConnTask.getProperties(), configMap));
        return super.execute(engineConnTask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
        boolean enableSqlHook = (Boolean)PrestoConfiguration.PRESTO_SQL_HOOK_ENABLED.getValue();
        String realCode = StringUtils.isBlank((CharSequence)code) ? "SELECT 1" : (enableSqlHook ? PrestoSQLHook.preExecuteHook(code.trim()) : code.trim());
        logger.info("presto client begins to run psql code:\n {}", (Object)realCode);
        String taskId = (String)engineExecutorContext.getJobId().get();
        ClientSession clientSession = (ClientSession)this.clientSessionCache.getIfPresent((Object)taskId);
        StatementClient statement = StatementClientFactory.newStatementClient((OkHttpClient)okHttpClient, (ClientSession)clientSession, (String)realCode);
        this.statementClientCache.put(taskId, statement);
        try {
            this.initialStatusUpdates(taskId, engineExecutorContext, statement);
            if (statement.isRunning() || statement.isFinished() && statement.finalStatusInfo().getError() == null) {
                this.queryOutput(taskId, engineExecutorContext, statement);
            }
            ErrorExecuteResponse errorResponse = null;
            try {
                errorResponse = this.verifyServerError(taskId, engineExecutorContext, statement);
            }
            catch (ErrorException e) {
                logger.error("Presto execute failed (#{}): {}", (Object)e.getErrCode(), (Object)e.getMessage());
            }
            if (errorResponse == null) {
                this.clientSessionCache.put((Object)taskId, (Object)this.updateSession(clientSession, statement));
                SuccessExecuteResponse successExecuteResponse = new SuccessExecuteResponse();
                return successExecuteResponse;
            }
            ErrorExecuteResponse errorExecuteResponse = errorResponse;
            return errorExecuteResponse;
        }
        finally {
            this.statementClientCache.remove(taskId);
        }
    }

    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutorContext, String code, String completedLine) {
        return null;
    }

    public float progress(String taskID) {
        return 0.0f;
    }

    public JobProgressInfo[] getProgressInfo(String taskID) {
        return new JobProgressInfo[0];
    }

    public void killTask(String taskId) {
        StatementClient statement = this.statementClientCache.remove(taskId);
        if (null != statement) {
            statement.cancelLeafStage();
        }
        super.killTask(taskId);
    }

    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    public void setExecutorLabels(List<Label<?>> labels) {
        if (!CollectionUtils.isEmpty(labels)) {
            this.executorLabels.clear();
            this.executorLabels.addAll(labels);
        }
    }

    public boolean supportCallBackLogs() {
        return false;
    }

    public NodeResource requestExpectedResource(NodeResource expectedResource) {
        return null;
    }

    public NodeResource getCurrentNodeResource() {
        NodeResourceUtils.appendMemoryUnitIfMissing((Map)EngineConnObject.getEngineCreationContext().getOptions());
        CommonNodeResource resource = new CommonNodeResource();
        LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
        resource.setUsedResource((Resource)usedResource);
        return resource;
    }

    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + "_" + this.id;
    }

    public int getConcurrentLimit() {
        return (Integer)PrestoConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
    }

    private ClientSession getClientSession(String user, Map<String, Object> taskParams, Map<String, String> cacheMap) {
        HashMap<String, String> configMap = new HashMap<String, String>();
        if (!CollectionUtils.isEmpty(cacheMap)) {
            configMap.putAll(cacheMap);
        }
        taskParams.entrySet().stream().filter(entry -> entry.getValue() != null).forEach(entry -> configMap.put((String)entry.getKey(), String.valueOf(entry.getValue())));
        URI httpUri = URI.create((String)PrestoConfiguration.PRESTO_URL.getValue(configMap));
        String source = (String)PrestoConfiguration.PRESTO_SOURCE.getValue(configMap);
        String catalog = (String)PrestoConfiguration.PRESTO_CATALOG.getValue(configMap);
        String schema = (String)PrestoConfiguration.PRESTO_SCHEMA.getValue(configMap);
        Map<String, String> properties = configMap.entrySet().stream().filter(entry -> ((String)entry.getKey()).startsWith("presto.session.")).collect(Collectors.toMap(entry -> ((String)entry.getKey()).substring("presto.session.".length()), Map.Entry::getValue));
        String clientInfo = "Linkis";
        String transactionId = null;
        Optional traceToken = Optional.empty();
        Set clientTags = Collections.emptySet();
        String timeZonId = TimeZone.getDefault().getID();
        Locale locale = Locale.getDefault();
        Map resourceEstimates = Collections.emptyMap();
        Map preparedStatements = Collections.emptyMap();
        Map roles = Collections.emptyMap();
        Map extraCredentials = Collections.emptyMap();
        Duration clientRequestTimeout = new Duration(0.0, TimeUnit.MILLISECONDS);
        return new ClientSession(httpUri, user, source, traceToken, clientTags, clientInfo, catalog, schema, timeZonId, locale, resourceEstimates, properties, preparedStatements, roles, extraCredentials, transactionId, clientRequestTimeout);
    }

    private UserCreatorLabel getUserCreatorLabel(Label<?>[] labels) {
        return (UserCreatorLabel)Arrays.stream(labels).filter(label -> label instanceof UserCreatorLabel).findFirst().get();
    }

    private void initialStatusUpdates(String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) {
        while (statement.isRunning() && (statement.currentData().getData() == null || statement.currentStatusInfo().getUpdateType() != null)) {
            engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
            statement.advance();
        }
    }

    private void queryOutput(String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) {
        int columnCount = 0;
        int rows = 0;
        ResultSetWriter resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory$.MODULE$.TABLE_TYPE());
        try {
            QueryStatusInfo results = null;
            results = statement.isRunning() ? statement.currentStatusInfo() : statement.finalStatusInfo();
            if (results.getColumns() == null) {
                throw new RuntimeException("presto columns is null.");
            }
            List<Column> columns = results.getColumns().stream().map(column -> new Column(column.getName(), DataType.toDataType((String)column.getType()), "")).collect(Collectors.toList());
            columnCount = columns.size();
            resultSetWriter.addMetaData((MetaData)new TableMetaData(columns.toArray(new Column[0])));
            while (statement.isRunning()) {
                Iterable data = statement.currentData().getData();
                if (data != null) {
                    for (List row : data) {
                        Object[] rowArray = (String[])row.stream().map(r -> String.valueOf(r)).toArray(String[]::new);
                        resultSetWriter.addRecord((Record)new TableRecord(rowArray));
                        ++rows;
                    }
                }
                engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
                statement.advance();
            }
        }
        catch (Exception e) {
            IOUtils.closeQuietly((Closeable)resultSetWriter);
        }
        String message = String.format("Fetched %d col(s) : %d row(s) in presto", columnCount, rows);
        logger.info(message);
        engineExecutorContext.appendStdout(LogUtils.generateInfo((String)message));
        engineExecutorContext.sendResultSet(resultSetWriter);
    }

    private ErrorExecuteResponse verifyServerError(String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) throws ErrorException {
        engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
        if (statement.isFinished()) {
            QueryStatusInfo info = statement.finalStatusInfo();
            if (info.getError() != null) {
                QueryError error = Objects.requireNonNull(info.getError());
                logger.error("Presto execute failed (#{}): {}", (Object)info.getId(), (Object)error.getMessage());
                RuntimeException cause = null;
                if (error.getFailureInfo() != null) {
                    cause = error.getFailureInfo().toException();
                }
                engineExecutorContext.appendStdout(LogUtils.generateERROR((String)ExceptionUtils.getStackTrace(cause)));
                return new ErrorExecuteResponse(ExceptionUtils.getMessage((Throwable)cause), (Throwable)cause);
            }
            return null;
        }
        if (statement.isClientAborted()) {
            logger.warn("Presto statement is killed.");
            return null;
        }
        if (statement.isClientError()) {
            throw new PrestoClientException(PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorCode(), PrestoErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorDesc());
        }
        throw new PrestoStateInvalidException(PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorCode(), PrestoErrorCodeSummary.PRESTO_STATE_INVALID.getErrorDesc());
    }

    private ClientSession updateSession(ClientSession clientSession, StatementClient statement) {
        ClientSession newSession = clientSession;
        if (statement.getSetCatalog().isPresent() || statement.getSetSchema().isPresent()) {
            newSession = ClientSession.builder((ClientSession)newSession).withCatalog(statement.getSetCatalog().orElse(newSession.getCatalog())).withSchema(statement.getSetSchema().orElse(newSession.getSchema())).build();
        }
        if (statement.isClearTransactionId()) {
            newSession = ClientSession.stripTransactionId((ClientSession)newSession);
        }
        ClientSession.Builder builder = ClientSession.builder((ClientSession)newSession);
        if (statement.getStartedTransactionId() != null) {
            builder = builder.withTransactionId(statement.getStartedTransactionId());
        }
        if (!statement.getSetSessionProperties().isEmpty() || !statement.getResetSessionProperties().isEmpty()) {
            HashMap sessionProperties = new HashMap(newSession.getProperties());
            sessionProperties.putAll(statement.getSetSessionProperties());
            sessionProperties.keySet().removeAll(statement.getResetSessionProperties());
            builder = builder.withProperties(sessionProperties);
        }
        if (!statement.getSetRoles().isEmpty()) {
            HashMap roles = new HashMap(newSession.getRoles());
            roles.putAll(statement.getSetRoles());
            builder = builder.withRoles(roles);
        }
        if (!statement.getAddedPreparedStatements().isEmpty() || !statement.getDeallocatedPreparedStatements().isEmpty()) {
            HashMap preparedStatements = new HashMap(newSession.getPreparedStatements());
            preparedStatements.putAll(statement.getAddedPreparedStatements());
            preparedStatements.keySet().removeAll(statement.getDeallocatedPreparedStatements());
            builder = builder.withPreparedStatements(preparedStatements);
        }
        return builder.build();
    }

    public void killAll() {
        for (StatementClient statement : this.statementClientCache.values()) {
            if (statement == null) continue;
            statement.cancelLeafStage();
        }
        this.statementClientCache.clear();
    }

    public void close() {
        this.killAll();
        super.close();
    }
}

