/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineplugin.nebula.executor;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.linkis.common.exception.ErrorException;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.common.conf.EngineConnConf;
import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration;
import org.apache.linkis.engineplugin.nebula.conf.NebulaEngineConf;
import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary;
import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException;
import org.apache.linkis.engineplugin.nebula.exception.NebulaExecuteError;
import org.apache.linkis.governance.common.paser.CodeParser;
import org.apache.linkis.governance.common.paser.SQLCodeParser;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.LoadResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.Resource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
import org.apache.linkis.storage.domain.Column;
import org.apache.linkis.storage.domain.DataType;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.apache.linkis.storage.resultset.table.TableRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import scala.Tuple2;

public class NebulaEngineConnExecutor
extends ConcurrentComputationExecutor {
    private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class);
    private int id;
    private List<Label<?>> executorLabels = new ArrayList(2);
    private Map<String, Session> sessionCache = new ConcurrentHashMap<String, Session>();
    private Map<String, String> configMap = new HashMap<String, String>();
    private Cache<String, NebulaPool> nebulaPoolCache = CacheBuilder.newBuilder().expireAfterAccess(Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()).longValue(), TimeUnit.MILLISECONDS).maximumSize((long)EngineConnConstant.MAX_TASK_NUM()).build();

    public NebulaEngineConnExecutor(int outputPrintLimit, int id) {
        super(outputPrintLimit);
        this.id = id;
    }

    public void init() {
        this.setCodeParser((CodeParser)new SQLCodeParser());
        super.init();
    }

    public ExecuteResponse execute(EngineConnTask engineConnTask) {
        Optional<Label> userCreatorLabelOp = Arrays.stream(engineConnTask.getLables()).filter(label -> label instanceof UserCreatorLabel).findFirst();
        Optional<Label> engineTypeLabelOp = Arrays.stream(engineConnTask.getLables()).filter(label -> label instanceof EngineTypeLabel).findFirst();
        Map configMap = null;
        if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) {
            UserCreatorLabel userCreatorLabel = (UserCreatorLabel)userCreatorLabelOp.get();
            EngineTypeLabel engineTypeLabel = (EngineTypeLabel)engineTypeLabelOp.get();
            configMap = new NebulaEngineConf().getCacheMap(new Tuple2((Object)userCreatorLabel, (Object)engineTypeLabel));
        }
        this.nebulaPoolCache.put((Object)engineConnTask.getTaskId(), (Object)this.getNebulaPool(engineConnTask.getProperties(), configMap));
        return super.execute(engineConnTask);
    }

    public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
        String realCode = StringUtils.isBlank((CharSequence)code) ? "SHOW SPACES" : code.trim();
        logger.info("Nebula client begins to run ngql code:\n {}", (Object)realCode);
        String taskId = (String)engineExecutorContext.getJobId().get();
        NebulaPool nebulaPool = (NebulaPool)this.nebulaPoolCache.getIfPresent((Object)taskId);
        Session session = this.getSession(taskId, nebulaPool);
        this.initialStatusUpdates(taskId, engineExecutorContext, session);
        ResultSet resultSet = null;
        try {
            resultSet = session.execute(code);
        }
        catch (Exception e) {
            logger.error("Nebula executor error.");
            throw new NebulaExecuteError(NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(), NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc());
        }
        if (resultSet.isSucceeded() && !resultSet.isEmpty()) {
            this.queryOutput(taskId, engineExecutorContext, resultSet);
        }
        ErrorExecuteResponse errorResponse = null;
        try {
            errorResponse = this.verifyServerError(taskId, engineExecutorContext, resultSet);
        }
        catch (ErrorException e) {
            logger.error("Nebula execute failed (#{}): {}", (Object)e.getErrCode(), (Object)e.getMessage());
        }
        if (errorResponse == null) {
            return new SuccessExecuteResponse();
        }
        return errorResponse;
    }

    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutorContext, String code, String completedLine) {
        return null;
    }

    public float progress(String taskID) {
        return 0.0f;
    }

    public JobProgressInfo[] getProgressInfo(String taskID) {
        return new JobProgressInfo[0];
    }

    public void killTask(String taskId) {
        Session session = this.sessionCache.remove(taskId);
        if (null != session) {
            session.release();
        }
        super.killTask(taskId);
    }

    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    public void setExecutorLabels(List<Label<?>> labels) {
        if (!CollectionUtils.isEmpty(labels)) {
            this.executorLabels.clear();
            this.executorLabels.addAll(labels);
        }
    }

    public boolean supportCallBackLogs() {
        return false;
    }

    public NodeResource requestExpectedResource(NodeResource expectedResource) {
        return null;
    }

    public NodeResource getCurrentNodeResource() {
        NodeResourceUtils.appendMemoryUnitIfMissing((Map)EngineConnObject.getEngineCreationContext().getOptions());
        CommonNodeResource resource = new CommonNodeResource();
        LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
        resource.setUsedResource((Resource)usedResource);
        return resource;
    }

    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + "_" + this.id;
    }

    public int getConcurrentLimit() {
        return (Integer)NebulaConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
    }

    private NebulaPool getNebulaPool(Map<String, Object> taskParams, Map<String, String> cacheMap) {
        if (!CollectionUtils.isEmpty(cacheMap)) {
            this.configMap.putAll(cacheMap);
        }
        taskParams.entrySet().stream().filter(entry -> entry.getValue() != null).forEach(entry -> this.configMap.put((String)entry.getKey(), String.valueOf(entry.getValue())));
        String host = (String)NebulaConfiguration.NEBULA_HOST.getValue(this.configMap);
        Integer port = (Integer)NebulaConfiguration.NEBULA_PORT.getValue(this.configMap);
        Integer maxConnSize = (Integer)NebulaConfiguration.NEBULA_MAX_CONN_SIZE.getValue(this.configMap);
        NebulaPool nebulaPool = new NebulaPool();
        Boolean initResult = false;
        try {
            NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
            nebulaPoolConfig.setMaxConnSize(maxConnSize.intValue());
            List<HostAddress> addresses = Arrays.asList(new HostAddress(host, port.intValue()));
            initResult = nebulaPool.init(addresses, nebulaPoolConfig);
        }
        catch (Exception e) {
            logger.error("NebulaPool initialization failed.");
            throw new NebulaClientException(NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
        }
        if (!initResult.booleanValue()) {
            logger.error("NebulaPool initialization failed.");
            throw new NebulaClientException(NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
        }
        return nebulaPool;
    }

    private Session getSession(String taskId, NebulaPool nebulaPool) {
        if (this.sessionCache.containsKey(taskId) && this.sessionCache.get(taskId) != null && this.sessionCache.get(taskId).ping()) {
            return this.sessionCache.get(taskId);
        }
        Session session = null;
        String username = (String)NebulaConfiguration.NEBULA_USER_NAME.getValue(this.configMap);
        String password = (String)NebulaConfiguration.NEBULA_PASSWORD.getValue(this.configMap);
        Boolean reconnect = (Boolean)NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(this.configMap);
        String space = (String)NebulaConfiguration.NEBULA_SPACE.getValue(this.configMap);
        try {
            session = nebulaPool.getSession(username, password, reconnect.booleanValue());
            if (StringUtils.isNotBlank((CharSequence)space)) {
                session.execute("use " + space);
            }
        }
        catch (Exception e) {
            logger.error("Nebula Session initialization failed.");
            throw new NebulaClientException(NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
        }
        this.sessionCache.put(taskId, session);
        return session;
    }

    private void initialStatusUpdates(String taskId, EngineExecutionContext engineExecutorContext, Session session) {
        if (session.ping()) {
            engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
        }
    }

    private void queryOutput(String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet) {
        int columnCount = 0;
        ResultSetWriter resultSetWriter = engineExecutorContext.createResultSetWriter("2");
        try {
            List colNames = resultSet.keys();
            if (CollectionUtils.isEmpty((Collection)colNames)) {
                throw new RuntimeException("Nebula columns is null.");
            }
            List<Column> columns = colNames.stream().map(column -> new Column(column, DataType.toDataType((String)"string"), "")).collect(Collectors.toList());
            columnCount = columns.size();
            resultSetWriter.addMetaData((MetaData)new TableMetaData(columns.toArray(new Column[0])));
            if (!resultSet.isEmpty()) {
                for (int i = 0; i < resultSet.rowsSize(); ++i) {
                    ResultSet.Record record = resultSet.rowValues(i);
                    if (record == null) continue;
                    Object[] rowArray = (String[])record.values().stream().map(x -> {
                        try {
                            return x.toString();
                        }
                        catch (Exception e) {
                            return "";
                        }
                    }).toArray(String[]::new);
                    resultSetWriter.addRecord((Record)new TableRecord(rowArray));
                }
                engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
            }
        }
        catch (Exception e) {
            IOUtils.closeQuietly((Closeable)resultSetWriter);
        }
        String message = String.format("Fetched %d col(s) : %d row(s) in Nebula", columnCount, resultSet.rowsSize());
        logger.info(message);
        engineExecutorContext.appendStdout(LogUtils.generateInfo((String)message));
        engineExecutorContext.sendResultSet(resultSetWriter);
    }

    private ErrorExecuteResponse verifyServerError(String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet) throws ErrorException {
        engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
        if (!resultSet.isSucceeded() || resultSet.getErrorCode() != ErrorCode.SUCCEEDED.getValue()) {
            logger.error("Nebula execute failed (#{}): {}", (Object)resultSet.getErrorCode(), (Object)resultSet.getErrorMessage());
            engineExecutorContext.appendStdout(LogUtils.generateERROR((String)resultSet.getErrorMessage()));
            return new ErrorExecuteResponse(resultSet.getErrorMessage(), null);
        }
        return null;
    }

    public void killAll() {
        for (Session session : this.sessionCache.values()) {
            if (session == null) continue;
            session.release();
        }
        this.sessionCache.clear();
    }

    public void close() {
        this.killAll();
        super.close();
    }
}

