/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineconnplugin.flink.client.deployment;

import java.io.Closeable;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.linkis.common.conf.TimeType;
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.JobExecutionException;
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base adapter that talks to a Flink cluster on behalf of a single submitted job.
 *
 * <p>It keeps the job id, the cluster identifiers and the (lazily retrieved) YARN
 * {@link ClusterClient}, and funnels every client call through
 * {@link #bridgeClientRequest(ExecutionContext, JobID, Supplier, boolean)} so that
 * all requests share the same timeout and error handling.
 */
public abstract class ClusterDescriptorAdapter
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterDescriptorAdapter.class);

    /** Timeout, in milliseconds, applied to every request sent through the cluster client. */
    public static final long CLIENT_REQUEST_TIMEOUT = ((TimeType)FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue()).toLong();

    public final ExecutionContext executionContext;

    // volatile: getJobStatus() sleeps and then re-reads this field, which only makes
    // sense if another thread may set it in the meantime (presumably the submitting
    // thread -- confirm against callers).
    private volatile JobID jobId;
    protected ApplicationId clusterID;
    protected String kubernetesClusterID;
    protected ClusterClient<ApplicationId> clusterClient;
    protected ClusterClient<String> kubernetesClusterClient;
    private YarnClusterDescriptor clusterDescriptor;
    protected String webInterfaceUrl;

    /**
     * Creates an adapter bound to the given execution context.
     *
     * @param executionContext context used to create cluster descriptors and run savepoint operations
     */
    public ClusterDescriptorAdapter(ExecutionContext executionContext) {
        this.executionContext = executionContext;
    }

    public void setJobId(JobID jobId) {
        this.jobId = jobId;
    }

    public JobID getJobId() {
        return this.jobId;
    }

    public ApplicationId getClusterID() {
        return this.clusterID;
    }

    public String getKubernetesClusterID() {
        return this.kubernetesClusterID;
    }

    public String getWebInterfaceUrl() {
        return this.webInterfaceUrl;
    }

    /**
     * Queries the cluster for the current status of the job.
     *
     * <p>If no job id is known yet, the call waits three seconds once before giving up,
     * to give a concurrent submission the chance to publish the id.
     *
     * @return the status reported by the cluster client
     * @throws JobExecutionException if no job id is available or the request fails
     */
    public JobStatus getJobStatus() throws JobExecutionException {
        if (this.jobId == null) {
            try {
                LOG.info("flink getJobStatus jobId is null,sleep three seconds");
                Thread.sleep(3000L);
            }
            catch (InterruptedException interrupted) {
                // Keep the interrupt visible to callers instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }
        // Snapshot the field so the null check and the request use the same value.
        final JobID currentJobId = this.jobId;
        LOG.info("flink getJobStatus jobId:{}", currentJobId);
        if (currentJobId == null) {
            throw new JobExecutionException(FlinkErrorCodeSummary.NO_JOB_SUBMITTED.getErrorDesc());
        }
        return this.bridgeClientRequest(this.executionContext, currentJobId, () -> this.clusterClient.getJobStatus(currentJobId), false);
    }

    /**
     * Cancels the job if one has been submitted; failures of the cancel request are ignored.
     *
     * @throws JobExecutionException if the cluster client cannot be retrieved
     */
    public void cancelJob() throws JobExecutionException {
        final JobID currentJobId = this.jobId;
        if (currentJobId == null) {
            LOG.info("No job has been submitted, ignore the method of cancelJob.");
            return;
        }
        LOG.info("Start to cancel job {}.", currentJobId);
        this.bridgeClientRequest(this.executionContext, currentJobId, () -> this.clusterClient.cancel(currentJobId), true);
    }

    /**
     * Runs a savepoint operation against the job.
     *
     * @param savepoint target savepoint path
     * @param mode one of {@code "trigger"}, {@code "cancel"} or {@code "stop"}
     * @return the value produced by the savepoint request
     * @throws JobExecutionException if {@code mode} is {@code null} or unknown, or the request fails
     */
    public String doSavepoint(String savepoint, String mode) throws JobExecutionException {
        LOG.info("try to {} savepoint in path {}.", mode, savepoint);
        if (mode == null) {
            // A null mode would otherwise fail with a NullPointerException in the string switch.
            throw new JobExecutionException(FlinkErrorCodeSummary.NOT_SAVEPOINT_MODE.getErrorDesc() + mode);
        }
        final Supplier<CompletableFuture<String>> function;
        switch (mode) {
            case "trigger": {
                function = () -> this.executionContext.triggerSavepoint(this.clusterClient, this.jobId, savepoint);
                break;
            }
            case "cancel": {
                function = () -> this.executionContext.cancelWithSavepoint(this.clusterClient, this.jobId, savepoint);
                break;
            }
            case "stop": {
                function = () -> this.executionContext.stopWithSavepoint(this.clusterClient, this.jobId, false, savepoint);
                break;
            }
            default: {
                throw new JobExecutionException(FlinkErrorCodeSummary.NOT_SAVEPOINT_MODE.getErrorDesc() + mode);
            }
        }
        return this.bridgeClientRequest(this.executionContext, this.jobId, function, false);
    }

    /**
     * Executes a cluster-client request, retrieving the YARN cluster client first if needed,
     * and waits at most {@link #CLIENT_REQUEST_TIMEOUT} milliseconds for the result.
     *
     * @param executionContext context used to create the cluster descriptor when no client exists yet
     * @param jobId job id, used only for log and error messages here
     * @param function supplies the asynchronous request; evaluated after the client is available
     * @param ignoreError when {@code true}, a failed request yields {@code null} instead of an exception
     * @param <R> result type of the request
     * @return the request result, or {@code null} when the request failed and {@code ignoreError} is set
     * @throws JobExecutionException if no cluster id is known, the cluster cannot be retrieved,
     *     or the request fails while {@code ignoreError} is {@code false}
     */
    protected <R> R bridgeClientRequest(ExecutionContext executionContext, JobID jobId, Supplier<CompletableFuture<R>> function, boolean ignoreError) throws JobExecutionException {
        if (this.clusterClient == null) {
            if (this.clusterID == null) {
                LOG.error("Cluster information don't exist.");
                throw new JobExecutionException(FlinkErrorCodeSummary.CLUSTER_NOT_EXIST.getErrorDesc());
            }
            // Reuse a descriptor left over from an earlier failed retrieval rather than
            // overwriting (and thereby leaking) it.
            if (this.clusterDescriptor == null) {
                this.clusterDescriptor = executionContext.createClusterDescriptor();
            }
            try {
                this.clusterClient = this.clusterDescriptor.retrieve(this.clusterID).getClusterClient();
            }
            catch (ClusterRetrieveException e) {
                String message = String.format("Job: %s could not retrieve or create a cluster.", jobId);
                LOG.error(message, e);
                throw new JobExecutionException(message, e);
            }
        }
        try {
            return function.get().get(CLIENT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch also absorbs interruption; restore the flag for callers.
                Thread.currentThread().interrupt();
            }
            if (ignoreError) {
                LOG.warn("Job: {} operation failed, the error is ignored.", jobId, e);
                return null;
            }
            String message = String.format("Job: %s operation failed!", jobId);
            LOG.error(message, e);
            throw new JobExecutionException(message, e);
        }
    }

    /**
     * Makes sure the configuration of the {@link StreamExecutionEnvironment} carries a YARN
     * application id.
     *
     * <p>An id already present in that configuration is left untouched. Otherwise the id from
     * the Flink config of the execution context is used, falling back to {@link #clusterID}.
     *
     * @throws JobExecutionException if the environment's configuration cannot be accessed
     *     reflectively, or no application id is available from any source
     */
    protected void bindApplicationId() throws JobExecutionException {
        // getConfiguration is looked up reflectively and forced accessible below.
        Method method;
        try {
            method = StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
        }
        catch (NoSuchMethodException e) {
            throw new JobExecutionException(FlinkErrorCodeSummary.NOT_FLINK_VERSION.getErrorDesc(), e);
        }
        method.setAccessible(true);
        Configuration configuration;
        try {
            configuration = (Configuration)method.invoke(this.executionContext.getStreamExecutionEnvironment());
        }
        catch (Exception e) {
            throw new JobExecutionException(FlinkErrorCodeSummary.EXECUTE_FAILED.getErrorDesc(), e);
        }
        String applicationId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
        if (StringUtils.isNotBlank(applicationId)) {
            LOG.info("The applicationId {} is exists in StreamExecutionEnvironment, ignore to bind applicationId to StreamExecutionEnvironment.", applicationId);
            return;
        }
        applicationId = this.executionContext.getFlinkConfig().getString(YarnConfigOptions.APPLICATION_ID);
        if (StringUtils.isBlank(applicationId) && this.clusterID == null) {
            throw new JobExecutionException(FlinkErrorCodeSummary.APPLICATIONID_NOT_EXIST.getErrorDesc());
        }
        if (StringUtils.isNotBlank(applicationId)) {
            configuration.setString(YarnConfigOptions.APPLICATION_ID, applicationId);
            LOG.info("Bind applicationId {} to StreamExecutionEnvironment.", applicationId);
        } else {
            configuration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(this.clusterID));
            LOG.info("Bind applicationId {} to StreamExecutionEnvironment.", this.clusterID);
        }
    }

    @Override
    public String toString() {
        return "ClusterDescriptorAdapter{jobId=" + this.jobId + ", clusterID=" + this.clusterID + '}';
    }

    /**
     * Shuts down the cluster through the YARN cluster client (if one was retrieved), then
     * closes the client and the cluster descriptor.
     *
     * <p>Each later step runs even when an earlier one throws, so a failing shutdown does
     * not leave the client or the descriptor open.
     */
    @Override
    public void close() {
        try {
            if (this.clusterClient != null) {
                try {
                    this.clusterClient.shutDownCluster();
                }
                finally {
                    this.clusterClient.close();
                }
            }
        }
        finally {
            if (this.clusterDescriptor != null) {
                this.clusterDescriptor.close();
            }
        }
    }

    /**
     * Reports whether the job has reached a globally terminal state.
     *
     * @return {@code true} if the job is in a globally terminal state
     * @throws JobExecutionException if the state cannot be determined
     */
    public abstract boolean isGloballyTerminalState() throws JobExecutionException;

    /**
     * Determines terminal state from the job status, treating a stopped YARN application
     * as terminal.
     *
     * @return {@code true} if the job status is globally terminal, or the status request
     *     failed because the YARN application no longer runs
     * @throws JobExecutionException if the status request fails for any other reason
     */
    public boolean isGloballyTerminalStateByYarn() throws JobExecutionException {
        try {
            return this.getJobStatus().isGloballyTerminalState();
        }
        catch (JobExecutionException e) {
            if (this.isYarnApplicationStopped(e)) {
                // The application is gone, so the job cannot make further progress.
                return true;
            }
            throw e;
        }
    }

    /**
     * Walks the cause chain of {@code e} looking for the message that signals the YARN
     * application for {@link #clusterID} is no longer running.
     *
     * @param e the failure to inspect; must not be {@code null}
     * @return {@code true} if any throwable in the chain carries exactly that message
     */
    private boolean isYarnApplicationStopped(Throwable e) {
        final String stoppedMessage = "The Yarn application " + this.clusterID + " doesn't run anymore.";
        for (Throwable current = e; current != null; current = current.getCause()) {
            if (StringUtils.equals(current.getMessage(), stoppedMessage)) {
                LOG.info("{} is stopped.", this.clusterID);
                return true;
            }
        }
        return false;
    }
}

