/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.executors;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;

/**
 * Pipeline executor that talks to a cluster looked up through a {@link ClusterClientFactory}.
 *
 * <p>Every operation follows the same steps: create a {@link ClusterDescriptor} from the given
 * configuration, read the cluster ID from that configuration, retrieve a
 * {@link ClusterClientProvider} for the ID and obtain a {@link ClusterClient} from it. The
 * descriptor is closed when the method returns; the client is closed once the returned future
 * completes.
 *
 * @param <ClusterID> type of the identifier used to look up the cluster
 * @param <ClientFactory> type of the factory producing cluster descriptors and cluster IDs
 */
@Internal
public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
        implements CacheSupportedPipelineExecutor {
    private final ClientFactory clusterClientFactory;

    /**
     * Creates an executor backed by the given factory.
     *
     * @param clusterClientFactory factory used to create cluster descriptors and to read the
     *     cluster ID from a configuration; must not be {@code null}
     * @throws NullPointerException if {@code clusterClientFactory} is {@code null}
     */
    public AbstractSessionClusterExecutor(@Nonnull ClientFactory clusterClientFactory) {
        this.clusterClientFactory = Preconditions.checkNotNull(clusterClientFactory);
    }

    /**
     * Translates the pipeline into a {@link JobGraph} and submits it to the cluster identified by
     * the configuration.
     *
     * <p>After submission, an asynchronous stage calls
     * {@link ClientUtils#waitUntilJobInitializationFinished} with suppliers for the job status and
     * the job result, and a further asynchronous stage wraps the job ID in a
     * {@link ClusterClientJobClientAdapter}.
     *
     * @param pipeline pipeline to execute
     * @param configuration configuration used to build the job graph and to locate the cluster
     * @param userCodeClassloader class loader handed to job-graph creation, the initialization
     *     wait and the resulting job client
     * @return future completing with a {@link JobClient} for the submitted job
     * @throws IllegalStateException if the configuration yields no cluster ID
     * @throws Exception if job-graph creation, cluster retrieval or closing the descriptor fails
     */
    @Override
    public CompletableFuture<JobClient> execute(@Nonnull Pipeline pipeline, @Nonnull Configuration configuration, @Nonnull ClassLoader userCodeClassloader) throws Exception {
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
        try (ClusterDescriptor<ClusterID> clusterDescriptor = this.clusterClientFactory.createClusterDescriptor(configuration)) {
            ClusterID clusterID = this.clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterID != null);
            ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
            ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

            // uncheckedFunction lets the lambda throw the checked exceptions raised by the wait
            // helper and by Future#get().
            CompletableFuture<JobID> initializedJob = clusterClient.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
                ClientUtils.waitUntilJobInitializationFinished(() -> clusterClient.getJobStatus(jobId).get(), () -> clusterClient.requestJobResult(jobId).get(), userCodeClassloader);
                return jobId;
            }));

            // The adapter receives the provider rather than this method's client, which is closed
            // as soon as the returned future completes.
            CompletableFuture<JobClient> jobClient = initializedJob.thenApplyAsync(jobId -> new ClusterClientJobClientAdapter<>(clusterClientProvider, jobId, userCodeClassloader));
            return closeWhenComplete(jobClient, clusterClient);
        }
    }

    /**
     * Asks the cluster identified by the configuration for the IDs of its completed cluster
     * datasets.
     *
     * @param configuration configuration used to locate the cluster
     * @param userCodeClassloader not used by this implementation
     * @return future completing with the IDs reported by the cluster client
     * @throws IllegalStateException if the configuration yields no cluster ID
     * @throws Exception if cluster retrieval or closing the descriptor fails
     */
    @Override
    public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds(Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
        try (ClusterDescriptor<ClusterID> clusterDescriptor = this.clusterClientFactory.createClusterDescriptor(configuration)) {
            ClusterID clusterID = this.clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterID != null);
            ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
            ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
            // Release the client once the listing is available instead of leaking it.
            return closeWhenComplete(clusterClient.listCompletedClusterDatasetIds(), clusterClient);
        }
    }

    /**
     * Asks the cluster identified by the configuration to invalidate the given cluster dataset.
     *
     * @param clusterDatasetId ID of the dataset; it is wrapped in an
     *     {@link IntermediateDataSetID} before being sent
     * @param configuration configuration used to locate the cluster
     * @param userCodeClassloader not used by this implementation
     * @return future completing with {@code null} once the cluster client's request has completed
     * @throws IllegalStateException if the configuration yields no cluster ID
     * @throws Exception if cluster retrieval or closing the descriptor fails
     */
    @Override
    public CompletableFuture<Void> invalidateClusterDataset(AbstractID clusterDatasetId, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
        try (ClusterDescriptor<ClusterID> clusterDescriptor = this.clusterClientFactory.createClusterDescriptor(configuration)) {
            ClusterID clusterID = this.clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterID != null);
            ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
            ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
            // Assigning to a typed local pins thenApply's result type to Void; in a longer call
            // chain the null-returning lambda would be inferred as producing Object.
            CompletableFuture<Void> invalidated = clusterClient.invalidateClusterDataset(new IntermediateDataSetID(clusterDatasetId)).thenApply(acknowledge -> null);
            // Release the client once the request has finished instead of leaking it.
            return closeWhenComplete(invalidated, clusterClient);
        }
    }

    /**
     * Returns a future that mirrors {@code future} and closes {@code clusterClient} when it
     * completes, whether normally or exceptionally.
     *
     * <p>The close runs through {@code whenCompleteAsync}, i.e. on the default asynchronous
     * executor rather than on the thread that completed {@code future}.
     */
    private static <T> CompletableFuture<T> closeWhenComplete(CompletableFuture<T> future, ClusterClient<?> clusterClient) {
        return future.whenCompleteAsync((ignoredResult, ignoredFailure) -> clusterClient.close());
    }
}

