/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetTemplateRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponse;
import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.Tag;
import com.aliyun.teaopenapi.models.Config;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.AliyunServerlessSparkConstants;
import org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark.AliyunServerlessSparkParameters;
import org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark.AliyunServerlessSparkTaskException;
import org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark.RunState;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.api.utils.RetryUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AliyunServerlessSparkTask
extends AbstractRemoteTask {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AliyunServerlessSparkTask.class);
    private final TaskExecutionContext taskExecutionContext;
    private Client aliyunServerlessSparkClient;
    private AliyunServerlessSparkParameters aliyunServerlessSparkParameters;
    private AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam;
    private String templateConf;
    private String templateDisplayReleaseVersion;
    private Boolean templateFusion;
    private String jobRunId;
    private RunState currentState;
    private String accessKeyId;
    private String accessKeySecret;
    private String regionId;
    private String endpoint;
    private RetryUtils.RetryPolicy retryPolicy = new RetryUtils.RetryPolicy(10, 1000L);

    protected AliyunServerlessSparkTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
    }

    public void init() {
        String taskParams = this.taskExecutionContext.getTaskParams();
        this.aliyunServerlessSparkParameters = (AliyunServerlessSparkParameters)((Object)JSONUtils.parseObject((String)taskParams, AliyunServerlessSparkParameters.class));
        log.info("aliyunServerlessSparkParameters - {}", (Object)this.aliyunServerlessSparkParameters);
        if (this.aliyunServerlessSparkParameters == null || !this.aliyunServerlessSparkParameters.checkParameters()) {
            throw new AliyunServerlessSparkTaskException("Aliyun-Serverless-Spark task parameters are not valid!");
        }
        ResourceParametersHelper resourceParametersHelper = this.taskExecutionContext.getResourceParametersHelper();
        DataSourceParameters dataSourceParameters = (DataSourceParameters)resourceParametersHelper.getResourceParameters(ResourceType.DATASOURCE, Integer.valueOf(this.aliyunServerlessSparkParameters.getDatasource()));
        this.aliyunServerlessSparkConnectionParam = (AliyunServerlessSparkConnectionParam)DataSourceUtils.buildConnectionParams((DbType)DbType.valueOf((String)this.aliyunServerlessSparkParameters.getType()), (String)dataSourceParameters.getConnectionParams());
        this.accessKeyId = this.aliyunServerlessSparkConnectionParam.getAccessKeyId();
        this.accessKeySecret = this.aliyunServerlessSparkConnectionParam.getAccessKeySecret();
        this.regionId = this.aliyunServerlessSparkConnectionParam.getRegionId();
        this.endpoint = this.aliyunServerlessSparkConnectionParam.getEndpoint();
        try {
            this.aliyunServerlessSparkClient = this.buildAliyunServerlessSparkClient(this.accessKeyId, this.accessKeySecret, this.regionId, this.endpoint);
        }
        catch (Exception e) {
            log.error("Failed to build Aliyun-Serverless-Spark client!", (Throwable)e);
            throw new AliyunServerlessSparkTaskException("Failed to build Aliyun-Serverless-Spark client!");
        }
        this.currentState = RunState.Submitted;
    }

    public void handle(TaskCallBack taskCallBack) throws TaskException {
        GetTemplateResponse getTemplateResponse = (GetTemplateResponse)RetryUtils.retryFunction(() -> {
            try {
                return this.aliyunServerlessSparkClient.getTemplate(this.aliyunServerlessSparkParameters.getWorkspaceId(), this.buildGetTemplateRequest());
            }
            catch (Exception e) {
                throw new TaskException("Failed to get template info", (Throwable)e);
            }
        }, (RetryUtils.RetryPolicy)this.retryPolicy);
        if (getTemplateResponse != null) {
            this.templateConf = getTemplateResponse.getBody().getData().getSparkConf().stream().map(item -> "--conf " + item.getKey() + "=" + item.getValue()).collect(Collectors.joining(" "));
            this.templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
            this.templateFusion = getTemplateResponse.getBody().getData().getFusion();
        }
        StartJobRunRequest startJobRunRequest = this.buildStartJobRunRequest(this.aliyunServerlessSparkParameters);
        StartJobRunResponse startJobRunResponse = (StartJobRunResponse)RetryUtils.retryFunction(() -> {
            try {
                return this.aliyunServerlessSparkClient.startJobRun(this.aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest);
            }
            catch (Exception e) {
                throw new AliyunServerlessSparkTaskException("Failed to start job run!");
            }
        }, (RetryUtils.RetryPolicy)this.retryPolicy);
        this.jobRunId = startJobRunResponse.getBody().getJobRunId();
        this.setAppIds(this.jobRunId);
        log.info("Successfully submitted serverless spark job, jobRunId - {}", (Object)this.jobRunId);
        while (!RunState.isFinal(this.currentState)) {
            GetJobRunRequest getJobRunRequest = this.buildGetJobRunRequest();
            GetJobRunResponse getJobRunResponse = (GetJobRunResponse)RetryUtils.retryFunction(() -> {
                try {
                    return this.aliyunServerlessSparkClient.getJobRun(this.aliyunServerlessSparkParameters.getWorkspaceId(), this.jobRunId, getJobRunRequest);
                }
                catch (Exception e) {
                    throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
                }
            }, (RetryUtils.RetryPolicy)this.retryPolicy);
            this.currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
            log.info("job - {} state - {}", (Object)this.jobRunId, (Object)this.currentState);
            try {
                Thread.sleep(10000L);
            }
            catch (InterruptedException e) {
                break;
            }
        }
        this.setExitStatusCode(this.mapFinalStateToExitCode(this.currentState));
    }

    public void submitApplication() throws TaskException {
    }

    public void trackApplicationStatus() throws TaskException {
    }

    protected int mapFinalStateToExitCode(RunState state) {
        switch (state) {
            case Success: {
                return 0;
            }
            case Failed: {
                return -1;
            }
        }
        return -1;
    }

    public AbstractParameters getParameters() {
        return this.aliyunServerlessSparkParameters;
    }

    public void cancelApplication() throws TaskException {
        CancelJobRunRequest cancelJobRunRequest = this.buildCancelJobRunRequest();
        RetryUtils.retryFunction(() -> {
            try {
                return this.aliyunServerlessSparkClient.cancelJobRun(this.aliyunServerlessSparkParameters.getWorkspaceId(), this.jobRunId, cancelJobRunRequest);
            }
            catch (Exception e) {
                throw new AliyunServerlessSparkTaskException("Failed to cancel job run!");
            }
        }, (RetryUtils.RetryPolicy)this.retryPolicy);
    }

    public List<String> getApplicationIds() throws TaskException {
        return Collections.emptyList();
    }

    protected Client buildAliyunServerlessSparkClient(String accessKeyId, String accessKeySecret, String regionId, String endpoint) throws Exception {
        if (StringUtils.isEmpty((CharSequence)endpoint)) {
            endpoint = String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, regionId);
        }
        Config config = new Config().setEndpoint(endpoint).setAccessKeyId(accessKeyId).setAccessKeySecret(accessKeySecret);
        return new Client(config);
    }

    protected StartJobRunRequest buildStartJobRunRequest(AliyunServerlessSparkParameters aliyunServerlessSparkParameters) {
        if (this.templateConf != null) {
            aliyunServerlessSparkParameters.setSparkSubmitParameters(this.templateConf + " " + aliyunServerlessSparkParameters.getSparkSubmitParameters());
        }
        StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
        startJobRunRequest.setClientToken(this.genereteClientToken());
        startJobRunRequest.setRegionId(this.regionId);
        startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
        startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
        startJobRunRequest.setName(aliyunServerlessSparkParameters.getJobName());
        String engineReleaseVersion = aliyunServerlessSparkParameters.getEngineReleaseVersion();
        if (engineReleaseVersion != null && !engineReleaseVersion.isEmpty()) {
            startJobRunRequest.setReleaseVersion(engineReleaseVersion);
        } else if (this.templateDisplayReleaseVersion != null && this.templateFusion != null) {
            startJobRunRequest.setDisplayReleaseVersion(this.templateDisplayReleaseVersion);
            startJobRunRequest.setFusion(this.templateFusion);
        }
        Tag envTag = new Tag();
        envTag.setKey(AliyunServerlessSparkConstants.ENV_KEY);
        String envType = aliyunServerlessSparkParameters.isProduction() ? AliyunServerlessSparkConstants.ENV_PROD : AliyunServerlessSparkConstants.ENV_DEV;
        envTag.setValue(envType);
        Tag workflowTag = new Tag();
        workflowTag.setKey(AliyunServerlessSparkConstants.WORKFLOW_KEY);
        workflowTag.setValue(AliyunServerlessSparkConstants.WORKFLOW_VALUE);
        startJobRunRequest.setTags(Arrays.asList(envTag, workflowTag));
        List entryPointArguments = StringUtils.isEmpty((CharSequence)aliyunServerlessSparkParameters.getEntryPointArguments()) ? Collections.emptyList() : Arrays.asList(aliyunServerlessSparkParameters.getEntryPointArguments().split(AliyunServerlessSparkConstants.ENTRY_POINT_ARGUMENTS_DELIMITER));
        JobDriver.JobDriverSparkSubmit jobDriverSparkSubmit = new JobDriver.JobDriverSparkSubmit().setEntryPoint(aliyunServerlessSparkParameters.getEntryPoint()).setEntryPointArguments(entryPointArguments).setSparkSubmitParameters(aliyunServerlessSparkParameters.getSparkSubmitParameters());
        JobDriver jobDriver = new JobDriver().setSparkSubmit(jobDriverSparkSubmit);
        startJobRunRequest.setJobDriver(jobDriver);
        return startJobRunRequest;
    }

    protected GetJobRunRequest buildGetJobRunRequest() {
        GetJobRunRequest getJobRunRequest = new GetJobRunRequest();
        getJobRunRequest.setRegionId(this.regionId);
        return getJobRunRequest;
    }

    protected CancelJobRunRequest buildCancelJobRunRequest() {
        CancelJobRunRequest cancelJobRunRequest = new CancelJobRunRequest();
        cancelJobRunRequest.setRegionId(this.regionId);
        return cancelJobRunRequest;
    }

    protected GetTemplateRequest buildGetTemplateRequest() {
        GetTemplateRequest getTemplateRequest = new GetTemplateRequest();
        if (this.aliyunServerlessSparkParameters.getTemplateId() != null && !this.aliyunServerlessSparkParameters.getTemplateId().isEmpty()) {
            getTemplateRequest.setTemplateBizId(this.aliyunServerlessSparkParameters.getTemplateId());
        }
        return getTemplateRequest;
    }

    protected String genereteClientToken() {
        return this.taskExecutionContext.getTaskInstanceId() + "-" + UUID.randomUUID();
    }
}

