/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineplugin.elasticsearch.executor;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.linkis.common.io.MetaData;
import org.apache.linkis.common.io.Record;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.common.conf.EngineConnConf;
import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchConfiguration;
import org.apache.linkis.engineplugin.elasticsearch.conf.ElasticSearchEngineConsoleConf;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchErrorResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchExecutor;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchJsonResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.ElasticSearchTableResponse;
import org.apache.linkis.engineplugin.elasticsearch.executor.client.impl.ElasticSearchExecutorImpl;
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.LoadResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.common.entity.resource.Resource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.AliasOutputExecuteResponse;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.storage.LineRecord;
import org.apache.linkis.storage.resultset.ResultSetFactory$;
import org.apache.linkis.storage.resultset.table.TableMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * Concurrent engine-conn executor that runs Elasticsearch code.
 *
 * <p>For every submitted task a dedicated {@link ElasticSearchExecutor} is opened and kept in
 * {@link #elasticSearchExecutorCache}, keyed by task id. The entry is dropped (and the client
 * closed by the cache's removal listener) when the task reaches a completed status, when the
 * entry expires after not being accessed, or when the cache exceeds its maximum size.
 */
public class ElasticSearchEngineConnExecutor extends ConcurrentComputationExecutor {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchEngineConnExecutor.class);

    private final int id;
    private final String runType;
    private final List<Label<?>> executorLabels = new ArrayList<>(2);

    /** Task id -> open Elasticsearch client; removal closes the client. */
    private final Cache<String, ElasticSearchExecutor> elasticSearchExecutorCache = CacheBuilder.newBuilder()
            .expireAfterAccess(Long.parseLong(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()), TimeUnit.MILLISECONDS)
            .removalListener(new RemovalListener<String, ElasticSearchExecutor>() {

                @Override
                public void onRemoval(RemovalNotification<String, ElasticSearchExecutor> notification) {
                    ElasticSearchExecutor removedExecutor = notification.getValue();
                    if (removedExecutor != null) {
                        removedExecutor.close();
                    }
                    String taskId = notification.getKey();
                    if (taskId == null) {
                        return;
                    }
                    // The task lookup may yield nothing (e.g. the task is no longer tracked);
                    // only a still-running task needs to be killed.
                    EngineConnTask task = ElasticSearchEngineConnExecutor.this.getTaskById(taskId);
                    if (task != null && !ExecutionNodeStatus.isCompleted(task.getStatus())) {
                        ElasticSearchEngineConnExecutor.this.killTask(taskId);
                    }
                }
            })
            .maximumSize(EngineConnConstant.MAX_TASK_NUM())
            .build();

    /**
     * @param outputPrintLimit forwarded to the parent executor
     * @param id               numeric suffix used by {@link #getId()}
     * @param runType          run type handed to every {@link ElasticSearchExecutorImpl} created
     */
    public ElasticSearchEngineConnExecutor(int outputPrintLimit, int id, String runType) {
        super(outputPrintLimit);
        this.id = id;
        this.runType = runType;
    }

    @Override
    public void init() {
        super.init();
    }

    /**
     * Opens an Elasticsearch client for the task, registers it in the cache and delegates the
     * actual execution to the parent class.
     *
     * @return the parent's response, or an {@link ErrorExecuteResponse} if the client cannot be opened
     */
    @Override
    public ExecuteResponse execute(EngineConnTask engineConnTask) {
        Map<String, String> properties = this.buildRuntimeParams(engineConnTask);
        // NOTE(review): this logs every runtime property verbatim; confirm no credentials are among them.
        logger.info("The elasticsearch properties is: {}", properties);
        ElasticSearchExecutorImpl elasticSearchExecutor = new ElasticSearchExecutorImpl(this.runType, properties);
        try {
            elasticSearchExecutor.open();
        } catch (Exception e) {
            logger.error("Execute es code failed, reason:", e);
            return new ErrorExecuteResponse("run es failed", e);
        }
        this.elasticSearchExecutorCache.put(engineConnTask.getTaskId(), elasticSearchExecutor);
        return super.execute(engineConnTask);
    }

    /**
     * Runs one piece of code on the client cached for the current job and converts the response
     * into a result set.
     *
     * @return an {@link AliasOutputExecuteResponse} for table/JSON responses, otherwise an
     *         {@link ErrorExecuteResponse}
     * @throws RuntimeException if a record cannot be written to the result set
     */
    @Override
    public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
        String taskId = engineExecutorContext.getJobId().get();
        ElasticSearchExecutor elasticSearchExecutor = this.elasticSearchExecutorCache.getIfPresent(taskId);
        if (elasticSearchExecutor == null) {
            // The entry can be gone already (expired, evicted or invalidated); fail cleanly instead of with an NPE.
            logger.warn("No elasticsearch executor cached for task {}", taskId);
            return new ErrorExecuteResponse("es executeLine failed, no executor found for task " + taskId, null);
        }
        ElasticSearchResponse elasticSearchResponse = elasticSearchExecutor.executeLine(code);
        try {
            if (elasticSearchResponse instanceof ElasticSearchTableResponse) {
                return this.writeTableResult(engineExecutorContext, (ElasticSearchTableResponse) elasticSearchResponse);
            }
            if (elasticSearchResponse instanceof ElasticSearchJsonResponse) {
                return this.writeJsonResult(engineExecutorContext, (ElasticSearchJsonResponse) elasticSearchResponse);
            }
            if (elasticSearchResponse instanceof ElasticSearchErrorResponse) {
                ElasticSearchErrorResponse errorResponse = (ElasticSearchErrorResponse) elasticSearchResponse;
                return new ErrorExecuteResponse(errorResponse.message(), errorResponse.cause());
            }
        } catch (IOException e) {
            logger.warn("es addMetaData failed", e);
            return new ErrorExecuteResponse("es addMetaData failed", e);
        }
        return new ErrorExecuteResponse("es executeLine failed", null);
    }

    /** Writes a table response (metadata plus rows) into a table result set and returns its alias. */
    private ExecuteResponse writeTableResult(EngineExecutionContext engineExecutorContext, ElasticSearchTableResponse tableResponse) throws IOException {
        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
                engineExecutorContext.createResultSetWriter(ResultSetFactory$.MODULE$.TABLE_TYPE());
        try {
            resultSetWriter.addMetaData(new TableMetaData(tableResponse.columns()));
            for (Record record : tableResponse.records()) {
                addRecordOrFail(resultSetWriter, record);
            }
            // The output string is taken before the writer is closed by the finally block.
            return new AliasOutputExecuteResponse(null, resultSetWriter.toString());
        } finally {
            IOUtils.closeQuietly(resultSetWriter);
        }
    }

    /** Writes a JSON response, one line per record, into a text result set and returns its alias. */
    private ExecuteResponse writeJsonResult(EngineExecutionContext engineExecutorContext, ElasticSearchJsonResponse jsonResponse) throws IOException {
        // Line records with null metadata belong to the text result set, not the table one.
        ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter =
                engineExecutorContext.createResultSetWriter(ResultSetFactory$.MODULE$.TEXT_TYPE());
        try {
            resultSetWriter.addMetaData(null);
            for (String line : jsonResponse.value().split("\\n")) {
                addRecordOrFail(resultSetWriter, new LineRecord(line));
            }
            // The output string is taken before the writer is closed by the finally block.
            return new AliasOutputExecuteResponse(null, resultSetWriter.toString());
        } finally {
            IOUtils.closeQuietly(resultSetWriter);
        }
    }

    /** Adds one record, converting an I/O failure into an unchecked exception. */
    private static void addRecordOrFail(ResultSetWriter<? extends MetaData, ? extends Record> resultSetWriter, Record record) {
        try {
            resultSetWriter.addRecord(record);
        } catch (IOException e) {
            logger.warn("es addRecord failed", e);
            throw new RuntimeException("es addRecord failed", e);
        }
    }

    /**
     * Merges the console configuration for the task's labels with the task's own properties;
     * task properties win on key collisions. Property values are stringified, {@code null}
     * values stay {@code null}.
     *
     * @return a fresh mutable map, never {@code null}
     */
    private Map<String, String> buildRuntimeParams(EngineConnTask engineConnTask) {
        Map<String, String> runtimeParams = new HashMap<>();
        Map<String, String> globalConfig = new ElasticSearchEngineConsoleConf().getCacheMap(engineConnTask.getLables());
        if (MapUtils.isNotEmpty(globalConfig)) {
            // Copy rather than mutate: the returned map may be shared by the configuration cache.
            runtimeParams.putAll(globalConfig);
        }
        Map<String, ?> taskProperties = engineConnTask.getProperties();
        if (MapUtils.isNotEmpty(taskProperties)) {
            // A plain loop is used because Collectors.toMap rejects null values.
            for (Map.Entry<String, ?> entry : taskProperties.entrySet()) {
                runtimeParams.put(entry.getKey(), Objects.toString(entry.getValue(), null));
            }
        }
        return runtimeParams;
    }

    /** Not supported by this executor; always returns {@code null}. */
    @Override
    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutorContext, String code, String completedLine) {
        return null;
    }

    /** Progress is not tracked; always {@code 0}. */
    @Override
    public float progress(String taskId) {
        return 0.0f;
    }

    /** Progress is not tracked; always an empty array. */
    @Override
    public JobProgressInfo[] getProgressInfo(String taskId) {
        return new JobProgressInfo[0];
    }

    @Override
    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    /** Replaces the executor labels; a {@code null} or empty list leaves the current labels untouched. */
    @Override
    public void setExecutorLabels(List<Label<?>> labels) {
        // Passing back the list obtained from getExecutorLabels() must not wipe it.
        if (CollectionUtils.isEmpty(labels) || labels == this.executorLabels) {
            return;
        }
        this.executorLabels.clear();
        this.executorLabels.addAll(labels);
    }

    /** Resource negotiation is not supported; always returns {@code null}. */
    @Override
    public NodeResource requestExpectedResource(NodeResource expectedResource) {
        return null;
    }

    /** Reports the process max memory and one instance as the used resource. */
    @Override
    public NodeResource getCurrentNodeResource() {
        NodeResourceUtils.appendMemoryUnitIfMissing((Map)EngineConnObject.getEngineCreationContext().getOptions());
        CommonNodeResource resource = new CommonNodeResource();
        LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1);
        resource.setUsedResource(usedResource);
        return resource;
    }

    /** @return the service instance name followed by {@code _} and this executor's numeric id */
    @Override
    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + "_" + this.id;
    }

    @Override
    public int getConcurrentLimit() {
        return (Integer)ElasticSearchConfiguration.ENGINE_CONCURRENT_LIMIT.getValue();
    }

    /** Closes the task's Elasticsearch client, if one is cached, then lets the parent kill the task. */
    @Override
    public void killTask(String taskId) {
        ElasticSearchExecutor elasticSearchExecutor = this.elasticSearchExecutorCache.getIfPresent(taskId);
        if (elasticSearchExecutor != null) {
            elasticSearchExecutor.close();
        }
        super.killTask(taskId);
    }

    /** Closes every cached Elasticsearch client. */
    @Override
    public void killAll() {
        this.elasticSearchExecutorCache.asMap().values().forEach(ElasticSearchExecutor::close);
    }

    /** Applies the status change and drops the task's client from the cache once the task is completed. */
    @Override
    public void transformTaskStatus(EngineConnTask task, ExecutionNodeStatus newStatus) {
        super.transformTaskStatus(task, newStatus);
        if (ExecutionNodeStatus.isCompleted(newStatus)) {
            this.elasticSearchExecutorCache.invalidate(task.getTaskId());
        }
    }

    @Override
    public boolean supportCallBackLogs() {
        return false;
    }
}

