/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineplugin.openlookeng.executor;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.airlift.units.Duration;
import io.prestosql.client.ClientSession;
import io.prestosql.client.QueryError;
import io.prestosql.client.QueryStatusInfo;
import io.prestosql.client.SocketChannelSocketFactory;
import io.prestosql.client.StatementClient;
import io.prestosql.client.StatementClientFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
import okhttp3.OkHttpClient;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.engineconn.common.conf.EngineConnConf;
import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.openlookeng.conf.OpenLooKengConfiguration;
import org.apache.linkis.engineplugin.openlookeng.conf.OpenLooKengEngineConfCache;
import org.apache.linkis.engineplugin.openlookeng.errorcode.OpenLooKengErrorCodeSummary;
import org.apache.linkis.engineplugin.openlookeng.exception.OpenLooKengClientException;
import org.apache.linkis.engineplugin.openlookeng.exception.OpenLooKengStateInvalidException;
import org.apache.linkis.governance.common.paser.CodeParser;
import org.apache.linkis.governance.common.paser.SQLCodeParser;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.Resource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.manager.label.utils.LabelUtil;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
import org.apache.linkis.storage.domain.Column;
import org.apache.linkis.storage.domain.DataType;
import org.apache.linkis.storage.resultset.ResultSetFactory$;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.apache.linkis.storage.resultset.table.TableRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenLooKengEngineConnExecutor
extends ConcurrentComputationExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(OpenLooKengEngineConnExecutor.class);
    private int id;
    private OkHttpClient okHttpClient = new OkHttpClient.Builder().socketFactory((SocketFactory)new SocketChannelSocketFactory()).connectTimeout(((Long)OpenLooKengConfiguration.OPENLOOKENG_HTTP_CONNECT_TIME_OUT.getValue()).longValue(), TimeUnit.SECONDS).readTimeout(((Long)OpenLooKengConfiguration.OPENLOOKENG_HTTP_READ_TIME_OUT.getValue()).longValue(), TimeUnit.SECONDS).build();
    private List<Label<?>> executorLabels = new ArrayList();
    private Cache<String, ClientSession> clientSessionCache = CacheBuilder.newBuilder().expireAfterAccess(Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()).longValue(), TimeUnit.MILLISECONDS).maximumSize((long)EngineConnConstant.MAX_TASK_NUM()).build();

    public OpenLooKengEngineConnExecutor(int outputPrintLimit, int id) {
        super(outputPrintLimit);
        this.id = id;
    }

    public void init() {
        this.setCodeParser((CodeParser)new SQLCodeParser());
        super.init();
    }

    public ExecuteResponse execute(EngineConnTask engineConnTask) {
        List<Label> labelList = Arrays.asList(engineConnTask.getLables());
        UserCreatorLabel userCreatorLabel = LabelUtil.getUserCreatorLabel(labelList);
        String user = userCreatorLabel.getUser();
        EngineTypeLabel engineTypeLabel = LabelUtil.getEngineTypeLabel(labelList);
        this.clientSessionCache.put((Object)engineConnTask.getTaskId(), (Object)this.getClientSession(user, engineConnTask.getProperties(), OpenLooKengEngineConfCache.getConfMap(userCreatorLabel, engineTypeLabel)));
        return super.execute(engineConnTask);
    }

    public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
        String taskId = (String)engineExecutorContext.getJobId().get();
        ClientSession clientSession = (ClientSession)this.clientSessionCache.getIfPresent((Object)taskId);
        StatementClient statement = StatementClientFactory.newStatementClient((OkHttpClient)this.okHttpClient, (ClientSession)clientSession, (String)code);
        this.initialStatusUpdates(taskId, engineExecutorContext, statement);
        try {
            ErrorExecuteResponse errorResponse;
            if (statement.isRunning() || statement.isFinished() && statement.finalStatusInfo().getError() == null) {
                this.queryOutput(taskId, engineExecutorContext, statement);
            }
            if ((errorResponse = this.verifyServerError(taskId, engineExecutorContext, statement)) == null) {
                this.clientSessionCache.put((Object)taskId, (Object)this.updateSession(clientSession, statement));
                return new SuccessExecuteResponse();
            }
            return errorResponse;
        }
        catch (Exception e) {
            return new ErrorExecuteResponse(e.getMessage(), (Throwable)e);
        }
    }

    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutorContext, String code, String completedLine) {
        return null;
    }

    public float progress(String taskID) {
        return 0.0f;
    }

    public JobProgressInfo[] getProgressInfo(String taskID) {
        return new JobProgressInfo[0];
    }

    public boolean supportCallBackLogs() {
        return false;
    }

    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + this.id;
    }

    public int getConcurrentLimit() {
        return (Integer)OpenLooKengConfiguration.OPENLOOKENG_CONCURRENT_LIMIT.getValue();
    }

    public void killAll() {
    }

    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    public void setExecutorLabels(List<Label<?>> labels) {
        if (null != labels && !labels.isEmpty()) {
            this.executorLabels.clear();
            this.executorLabels.addAll(labels);
        }
    }

    public NodeResource requestExpectedResource(NodeResource expectedResource) {
        return null;
    }

    public NodeResource getCurrentNodeResource() {
        CommonNodeResource resource = new CommonNodeResource();
        resource.setUsedResource((Resource)NodeResourceUtils.applyAsLoadInstanceResource((Map)EngineConnObject.getEngineCreationContext().getOptions()));
        return resource;
    }

    private ClientSession getClientSession(String user, Map<String, Object> taskParams, Map<String, String> cacheMap) {
        HashMap<String, String> configMap = new HashMap<String, String>();
        if (null != cacheMap && !cacheMap.isEmpty()) {
            configMap.putAll(cacheMap);
        }
        for (Map.Entry<String, Object> keyValue : taskParams.entrySet()) {
            configMap.put(keyValue.getKey(), String.valueOf(keyValue.getValue()));
        }
        URI httpUri = URI.create((String)OpenLooKengConfiguration.OPENLOOKENG_URL.getValue(configMap));
        String source = (String)OpenLooKengConfiguration.OPENLOOKENG_SOURCE.getValue(configMap);
        String catalog = (String)OpenLooKengConfiguration.OPENLOOKENG_CATALOG.getValue(configMap);
        String schema = (String)OpenLooKengConfiguration.OPENLOOKENG_SCHEMA.getValue(configMap);
        HashMap properties = new HashMap();
        for (Map.Entry keyValue : configMap.entrySet()) {
            if (!((String)keyValue.getKey()).startsWith("presto.session.")) continue;
            properties.put(((String)keyValue.getKey()).substring("presto.session.".length()), keyValue.getValue());
        }
        String clientInfo = "Linkis";
        String transactionId = null;
        Optional traceToken = Optional.empty();
        Set clientTags = Collections.emptySet();
        ZoneId timeZonId = TimeZone.getDefault().toZoneId();
        Locale locale = Locale.getDefault();
        Map resourceEstimates = Collections.emptyMap();
        Map preparedStatements = Collections.emptyMap();
        Map roles = Collections.emptyMap();
        Map extraCredentials = Collections.emptyMap();
        Duration clientRequestTimeout = new Duration(0.0, TimeUnit.MILLISECONDS);
        ClientSession session = new ClientSession(httpUri, user, source, traceToken, clientTags, clientInfo, catalog, schema, "", timeZonId, locale, resourceEstimates, properties, preparedStatements, roles, extraCredentials, transactionId, clientRequestTimeout);
        return session;
    }

    private void initialStatusUpdates(String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) {
        while (statement.isRunning() && (statement.currentData().getData() == null || statement.currentStatusInfo().getUpdateType() != null)) {
            engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
            statement.advance();
        }
    }

    private void queryOutput(String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) throws IOException {
        int columnCount = 0;
        int rows = 0;
        ResultSetWriter resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory$.MODULE$.TABLE_TYPE());
        try {
            QueryStatusInfo results = null;
            results = statement.isRunning() ? statement.currentStatusInfo() : statement.finalStatusInfo();
            if (results.getColumns() == null) {
                throw new RuntimeException("openlookeng columns is null.");
            }
            Column[] columns = (Column[])results.getColumns().stream().map(column -> new Column(column.getName(), DataType.toDataType((String)column.getType()), "")).toArray(Column[]::new);
            columnCount = columns.length;
            resultSetWriter.addMetaData((MetaData)new TableMetaData(columns));
            while (statement.isRunning()) {
                Iterable data = statement.currentData().getData();
                if (data != null) {
                    for (List row : data) {
                        Object[] rowArray = row.stream().map(String::valueOf).toArray();
                        resultSetWriter.addRecord((Record)new TableRecord(rowArray));
                        ++rows;
                    }
                }
                engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
                statement.advance();
            }
            LOG.warn("Fetched {} col(s) : {} row(s) in openlookeng", (Object)columnCount, (Object)rows);
            engineExecutorContext.sendResultSet(resultSetWriter);
        }
        catch (Exception e) {
            IOUtils.closeQuietly((Closeable)resultSetWriter);
            throw e;
        }
    }

    private ErrorExecuteResponse verifyServerError(String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) throws OpenLooKengClientException, OpenLooKengStateInvalidException {
        engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
        if (statement.isFinished()) {
            QueryStatusInfo info = statement.finalStatusInfo();
            if (info.getError() != null) {
                QueryError error = Objects.requireNonNull(info.getError());
                String message = "openlookeng execute failed (#" + info.getId() + "):" + error.getMessage();
                RuntimeException cause = null;
                if (error.getFailureInfo() != null) {
                    cause = error.getFailureInfo().toException();
                }
                String errorString = "";
                if (cause == null) {
                    errorString = ExceptionUtils.getStackTrace((Throwable)cause);
                }
                engineExecutorContext.appendStdout(LogUtils.generateERROR((String)errorString));
                return new ErrorExecuteResponse(ExceptionUtils.getMessage((Throwable)cause), (Throwable)cause);
            }
        } else if (statement.isClientAborted()) {
            LOG.warn("openlookeng statement is killed.");
        } else {
            if (statement.isClientError()) {
                throw new OpenLooKengClientException(OpenLooKengErrorCodeSummary.OPENLOOKENG_CLIENT_ERROR.getErrorCode(), OpenLooKengErrorCodeSummary.OPENLOOKENG_CLIENT_ERROR.getErrorDesc());
            }
            throw new OpenLooKengStateInvalidException(OpenLooKengErrorCodeSummary.OPENLOOKENG_STATUS_ERROR.getErrorCode(), OpenLooKengErrorCodeSummary.OPENLOOKENG_STATUS_ERROR.getErrorDesc());
        }
        return null;
    }

    private ClientSession updateSession(ClientSession clientSession, StatementClient statement) {
        ClientSession newSession = clientSession;
        if (statement.getSetCatalog().isPresent() || statement.getSetSchema().isPresent()) {
            newSession = ClientSession.builder((ClientSession)newSession).withCatalog(statement.getSetCatalog().orElse(newSession.getCatalog())).withSchema(statement.getSetSchema().orElse(newSession.getSchema())).build();
        }
        if (statement.isClearTransactionId()) {
            newSession = ClientSession.stripTransactionId((ClientSession)newSession);
        }
        ClientSession.Builder builder = ClientSession.builder((ClientSession)newSession);
        if (statement.getStartedTransactionId() != null) {
            builder = builder.withTransactionId(statement.getStartedTransactionId());
        }
        if (!statement.getSetSessionProperties().isEmpty() || !statement.getResetSessionProperties().isEmpty()) {
            HashMap sessionProperties = new HashMap(newSession.getProperties());
            sessionProperties.putAll(statement.getSetSessionProperties());
            sessionProperties.keySet().removeAll(statement.getResetSessionProperties());
            builder = builder.withProperties(sessionProperties);
        }
        if (!statement.getSetRoles().isEmpty()) {
            HashMap roles = new HashMap(newSession.getRoles());
            roles.putAll(statement.getSetRoles());
            builder = builder.withRoles(roles);
        }
        if (!statement.getAddedPreparedStatements().isEmpty() || !statement.getDeallocatedPreparedStatements().isEmpty()) {
            HashMap preparedStatements = new HashMap(newSession.getPreparedStatements());
            preparedStatements.putAll(statement.getAddedPreparedStatements());
            preparedStatements.keySet().removeAll(statement.getDeallocatedPreparedStatements());
            builder = builder.withPreparedStatements(preparedStatements);
        }
        return builder.build();
    }
}

