/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineplugin.repl.executor;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.repl.conf.ReplConfiguration;
import org.apache.linkis.engineplugin.repl.conf.ReplEngineConf;
import org.apache.linkis.engineplugin.repl.errorcode.ReplErrorCodeSummary;
import org.apache.linkis.engineplugin.repl.exception.ReplException;
import org.apache.linkis.engineplugin.repl.executor.ReplAdapter;
import org.apache.linkis.engineplugin.repl.executor.ReplAdapterFactory;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.LoadResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.Resource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
import org.apache.linkis.storage.LineMetaData;
import org.apache.linkis.storage.LineRecord;
import org.apache.linkis.storage.resultset.ResultSetFactory$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import scala.Tuple2;

/**
 * Computation executor that hands submitted code to a {@link ReplAdapter}.
 *
 * <p>For every task, {@link #execute(EngineConnTask)} merges configuration into {@code configMap}
 * and creates the adapter for the configured REPL type; {@link #executeLine} then runs the code
 * while capturing whatever is written to {@code System.out} and stores that text as the result.
 *
 * <p>NOTE(review): {@code configMap}, {@code replAdapter} and the {@code System.out} redirection
 * are shared by all tasks of this executor instance. The class extends a "concurrent" executor,
 * so overlapping tasks may observe each other's configuration/output - confirm whether that is
 * acceptable for the deployment.
 */
public class ReplEngineConnExecutor
extends ConcurrentComputationExecutor {
    private static final Logger logger = LoggerFactory.getLogger(ReplEngineConnExecutor.class);
    private int id;
    private ReplAdapter replAdapter;
    private List<Label<?>> executorLabels = new ArrayList<>(2);
    /** Threads currently running a task, keyed by task id; used to interrupt on kill. */
    private Map<String, Thread> threadCache = new ConcurrentHashMap<String, Thread>();
    /** Effective configuration: cached engine config overlaid with task properties. */
    private Map<String, String> configMap = new HashMap<String, String>();

    /**
     * @param outputPrintLimit passed through to the parent executor
     * @param id numeric suffix used by {@link #getId()}
     */
    public ReplEngineConnExecutor(int outputPrintLimit, int id) {
        super(outputPrintLimit);
        this.id = id;
    }

    /** Delegates to the parent initialisation. */
    public void init() {
        super.init();
    }

    /**
     * Prepares configuration and the REPL adapter for the task, then delegates to the parent.
     *
     * <p>If the task carries both a {@link UserCreatorLabel} and an {@link EngineTypeLabel}, the
     * cached configuration for that pair is merged into {@code configMap}. Non-null task
     * properties are then written on top (as strings), and the adapter is created from the
     * resulting REPL type.
     *
     * @param engineConnTask the task to run
     * @return the response produced by the parent {@code execute}
     */
    public ExecuteResponse execute(EngineConnTask engineConnTask) {
        Map taskParams;
        Optional<Label> userCreatorLabelOp = Arrays.stream(engineConnTask.getLables()).filter(label -> label instanceof UserCreatorLabel).findFirst();
        Optional<Label> engineTypeLabelOp = Arrays.stream(engineConnTask.getLables()).filter(label -> label instanceof EngineTypeLabel).findFirst();
        if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) {
            UserCreatorLabel userCreatorLabel = (UserCreatorLabel)userCreatorLabelOp.get();
            EngineTypeLabel engineTypeLabel = (EngineTypeLabel)engineTypeLabelOp.get();
            Map cacheMap = new ReplEngineConf().getCacheMap(new Tuple2((Object)userCreatorLabel, (Object)engineTypeLabel));
            if (MapUtils.isNotEmpty((Map)cacheMap)) {
                this.configMap.putAll(cacheMap);
            }
        }
        if (MapUtils.isNotEmpty((Map)(taskParams = engineConnTask.getProperties()))) {
            // Task-level properties override cached values; null values are skipped.
            taskParams.entrySet().stream().filter(entry -> entry.getValue() != null).forEach(entry -> this.configMap.put((String)entry.getKey(), String.valueOf(entry.getValue())));
        }
        String replType = (String)ReplConfiguration.REPL_TYPE.getValue(this.configMap);
        this.replAdapter = ReplAdapterFactory.create(replType);
        return super.execute(engineConnTask);
    }

    /**
     * Runs one piece of code through the REPL adapter and stores the captured standard output as
     * a text result set.
     *
     * <p>The running thread is registered in {@code threadCache} for the duration of the call so
     * that {@link #killTask(String)} can interrupt it, and is always unregistered afterwards.
     *
     * @param engineExecutorContext context used for progress, stdout and result-set output
     * @param code the code to run; must not be blank
     * @return {@link SuccessExecuteResponse} on success, or an {@link ErrorExecuteResponse}
     *     carrying the stack trace when the adapter throws
     * @throws ReplException if {@code code} is blank
     */
    public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
        if (StringUtils.isBlank((CharSequence)code)) {
            throw new ReplException(ReplErrorCodeSummary.REPL_CODE_IS_NOT_BLANK.getErrorCode(), ReplErrorCodeSummary.REPL_CODE_IS_NOT_BLANK.getErrorDesc());
        }
        String realCode = code.trim();
        logger.info("Repl engine begins to run code:\n {}", (Object)realCode);
        String taskId = (String)engineExecutorContext.getJobId().get();
        this.initialStatusUpdates(taskId, engineExecutorContext);
        String classpathDir = (String)ReplConfiguration.CLASSPATH_DIR.getValue(this.configMap);
        String methodName = (String)ReplConfiguration.METHOD_NAME.getValue(this.configMap);
        this.threadCache.put(taskId, Thread.currentThread());
        try {
            String message;
            try {
                message = this.runCapturingStdout(realCode, classpathDir, methodName);
            }
            catch (Exception e) {
                String errorMessage = ExceptionUtils.getStackTrace((Throwable)e);
                logger.error("Repl engine execute failed : {}", (Object)errorMessage);
                engineExecutorContext.appendStdout(LogUtils.generateERROR((String)errorMessage));
                return new ErrorExecuteResponse(errorMessage, null);
            }
            engineExecutorContext.appendStdout(message);
            ResultSetWriter resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory$.MODULE$.TEXT_TYPE());
            try {
                resultSetWriter.addMetaData((MetaData)new LineMetaData(null));
                resultSetWriter.addRecord((Record)new LineRecord(message));
            }
            catch (IOException e) {
                // Best effort: the task itself succeeded, so only report the write failure.
                logger.error("Failed to get the task result", (Throwable)e);
            }
            finally {
                IOUtils.closeQuietly((Closeable)resultSetWriter);
            }
            return new SuccessExecuteResponse();
        }
        finally {
            // Unregister so a later kill cannot interrupt this thread while it runs other work.
            this.threadCache.remove(taskId);
        }
    }

    /**
     * Executes the code with {@code System.out} temporarily redirected to an in-memory buffer.
     * The previous stream is restored whether or not the adapter throws.
     *
     * @return everything written to {@code System.out} during the call
     */
    private String runCapturingStdout(String realCode, String classpathDir, String methodName) throws Exception {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);
        PrintStream cacheStream = new PrintStream(outputStream);
        PrintStream oldStream = System.out;
        System.setOut(cacheStream);
        try {
            this.replAdapter.executorCode(realCode, classpathDir, methodName);
            cacheStream.flush();
            return outputStream.toString();
        }
        finally {
            System.setOut(oldStream);
            cacheStream.close();
        }
    }

    /** Not implemented; always returns {@code null}. */
    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutorContext, String code, String completedLine) {
        return null;
    }

    /** Progress is not tracked; always returns {@code 0}. */
    public float progress(String taskID) {
        return 0.0f;
    }

    /** Progress details are not tracked; always returns an empty array. */
    public JobProgressInfo[] getProgressInfo(String taskID) {
        return new JobProgressInfo[0];
    }

    /**
     * Interrupts the thread registered for the task (if any), then delegates to the parent.
     *
     * @param taskId id of the task to kill
     */
    public void killTask(String taskId) {
        Thread thread = this.threadCache.remove(taskId);
        if (null != thread) {
            thread.interrupt();
        }
        super.killTask(taskId);
    }

    /** Returns the live list of labels held by this executor. */
    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    /**
     * Replaces the executor labels with the given ones. An empty or {@code null} list is ignored
     * and leaves the current labels untouched.
     */
    public void setExecutorLabels(List<Label<?>> labels) {
        if (!CollectionUtils.isEmpty(labels)) {
            this.executorLabels.clear();
            this.executorLabels.addAll(labels);
        }
    }

    /** Always {@code false}. */
    public boolean supportCallBackLogs() {
        return false;
    }

    /** Not implemented; always returns {@code null}. */
    public NodeResource requestExpectedResource(NodeResource expectedResource) {
        return null;
    }

    /**
     * Builds a resource description whose used resource is a {@link LoadResource} created from
     * {@code OverloadUtils.getProcessMaxMemory()} and the constant {@code 1}.
     */
    public NodeResource getCurrentNodeResource() {
        NodeResourceUtils.appendMemoryUnitIfMissing((Map)EngineConnObject.getEngineCreationContext().getOptions());
        CommonNodeResource resource = new CommonNodeResource();
        LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
        resource.setUsedResource((Resource)usedResource);
        return resource;
    }

    /** Returns {@code <service instance>_<id>}. */
    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + "_" + this.id;
    }

    /** Returns the configured {@code ENGINE_CONCURRENT_LIMIT} value. */
    public int getConcurrentLimit() {
        return (Integer)ReplConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
    }

    /** Pushes the (constant) initial progress for the task to the execution context. */
    private void initialStatusUpdates(String taskId, EngineExecutionContext engineExecutorContext) {
        engineExecutorContext.pushProgress(this.progress(taskId), this.getProgressInfo(taskId));
    }

    /** Interrupts every registered task thread and clears the registry. */
    public void killAll() {
        for (Thread thread : this.threadCache.values()) {
            if (thread == null) continue;
            thread.interrupt();
        }
        this.threadCache.clear();
    }

    /** Kills all running tasks, then closes the parent executor. */
    public void close() {
        this.killAll();
        super.close();
    }
}

