/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobAttachmentClientActor;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.client.JobRetrievalException;
import org.apache.flink.runtime.client.JobSubmissionClientActor;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.JobTimeoutException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class JobClient {
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    public static ActorSystem startJobClientActorSystem(Configuration config, String hostname) throws Exception {
        LOG.info("Starting JobClient actor system");
        ActorSystem system = BootstrapTools.startActorSystem(config, hostname, 0, LOG);
        return system;
    }

    public static JobListeningContext submitJob(ActorSystem actorSystem, Configuration config, HighAvailabilityServices highAvailabilityServices, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates, ClassLoader classLoader) {
        Preconditions.checkNotNull((Object)actorSystem, (String)"The actorSystem must not be null.");
        Preconditions.checkNotNull((Object)highAvailabilityServices, (String)"The high availability services must not be null.");
        Preconditions.checkNotNull((Object)jobGraph, (String)"The jobGraph must not be null.");
        Preconditions.checkNotNull((Object)timeout, (String)"The timeout must not be null.");
        Props jobClientActorProps = JobSubmissionClientActor.createActorProps(highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), timeout, sysoutLogUpdates, config);
        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
        Future submissionFuture = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.SubmitJobAndWait(jobGraph), (Timeout)new Timeout(AkkaUtils.INF_TIMEOUT()));
        return new JobListeningContext(jobGraph.getJobID(), (Future<Object>)submissionFuture, jobClientActor, timeout, classLoader, highAvailabilityServices);
    }

    public static JobListeningContext attachToRunningJob(JobID jobID, Configuration configuration, ActorSystem actorSystem, HighAvailabilityServices highAvailabilityServices, FiniteDuration timeout, boolean sysoutLogUpdates) {
        Preconditions.checkNotNull((Object)jobID, (String)"The jobID must not be null.");
        Preconditions.checkNotNull((Object)configuration, (String)"The configuration must not be null.");
        Preconditions.checkNotNull((Object)actorSystem, (String)"The actorSystem must not be null.");
        Preconditions.checkNotNull((Object)highAvailabilityServices, (String)"The high availability services must not be null.");
        Preconditions.checkNotNull((Object)timeout, (String)"The timeout must not be null.");
        Props jobClientActorProps = JobAttachmentClientActor.createActorProps(highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), timeout, sysoutLogUpdates);
        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
        Future attachmentFuture = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.AttachToJobAndWait(jobID), (Timeout)new Timeout(AkkaUtils.INF_TIMEOUT()));
        return new JobListeningContext(jobID, (Future<Object>)attachmentFuture, jobClientActor, timeout, actorSystem, configuration, highAvailabilityServices);
    }

    public static ClassLoader retrieveClassLoader(JobID jobID, JobManagerGateway jobManager, Configuration config, HighAvailabilityServices highAvailabilityServices, Time timeout) throws JobRetrievalException {
        Optional<JobManagerMessages.ClassloadingProps> optProps;
        CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> clPropsFuture = jobManager.requestClassloadingProps(jobID, timeout);
        try {
            optProps = clPropsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            throw new JobRetrievalException(jobID, "Could not retrieve the class loading properties from JobManager.", e);
        }
        if (optProps.isPresent()) {
            PermanentBlobCache permanentBlobCache;
            JobManagerMessages.ClassloadingProps props = optProps.get();
            InetSocketAddress serverAddress = new InetSocketAddress(jobManager.getHostname(), (int)props.blobManagerPort());
            try {
                permanentBlobCache = new PermanentBlobCache(config, highAvailabilityServices.createBlobStore(), serverAddress);
            }
            catch (IOException e) {
                throw new JobRetrievalException(jobID, "Failed to setup BlobCache.", e);
            }
            Collection<PermanentBlobKey> requiredJarFiles = props.requiredJarFiles();
            Collection<URL> requiredClasspaths = props.requiredClasspaths();
            URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
            int pos = 0;
            for (PermanentBlobKey blobKey : props.requiredJarFiles()) {
                try {
                    allURLs[pos++] = permanentBlobCache.getFile(jobID, blobKey).toURI().toURL();
                }
                catch (Exception e) {
                    try {
                        permanentBlobCache.close();
                    }
                    catch (IOException ioe) {
                        LOG.warn("Could not properly close the BlobClient.", (Throwable)ioe);
                    }
                    throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e);
                }
            }
            for (URL url : requiredClasspaths) {
                allURLs[pos++] = url;
            }
            return FlinkUserCodeClassLoaders.parentFirst(allURLs, JobClient.class.getClassLoader());
        }
        throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    }

    public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
        Object answer;
        JobID jobID = listeningContext.getJobID();
        ActorRef jobClientActor = listeningContext.getJobClientActor();
        Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
        FiniteDuration askTimeout = listeningContext.getTimeout();
        ClassLoader classLoader = listeningContext.getClassLoader();
        while (!jobSubmissionFuture.isCompleted()) {
            try {
                Await.ready(jobSubmissionFuture, (Duration)askTimeout);
            }
            catch (InterruptedException e) {
                throw new JobExecutionException(jobID, "Interrupted while waiting for job completion.");
            }
            catch (TimeoutException e) {
                try {
                    Await.result((Awaitable)Patterns.ask((ActorRef)jobClientActor, (Object)new Identify((Object)true), (Timeout)Timeout.durationToTimeout((FiniteDuration)askTimeout)), (Duration)askTimeout);
                }
                catch (Exception eInner) {
                    if (jobSubmissionFuture.isCompleted()) continue;
                    throw new JobExecutionException(jobID, "JobClientActor seems to have died before the JobExecutionResult could be retrieved.", eInner);
                }
            }
        }
        try {
            answer = Await.result(jobSubmissionFuture, (Duration)Duration.Zero());
        }
        catch (Throwable throwable) {
            throw new JobExecutionException(jobID, "Couldn't retrieve the JobExecutionResult from the JobManager.", throwable);
        }
        finally {
            jobClientActor.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
        }
        if (answer instanceof JobManagerMessages.JobResultSuccess) {
            LOG.info("Job execution complete");
            SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess)answer).result();
            if (result != null) {
                try {
                    return result.toJobExecutionResult(classLoader);
                }
                catch (Throwable t) {
                    throw new JobExecutionException(jobID, "Job was successfully executed but JobExecutionResult could not be deserialized.");
                }
            }
            throw new JobExecutionException(jobID, "Job was successfully executed but result contained a null JobExecutionResult.");
        }
        if (answer instanceof JobManagerMessages.JobResultFailure) {
            LOG.info("Job execution failed");
            SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure)answer).cause();
            if (serThrowable != null) {
                Throwable cause = serThrowable.deserializeError(classLoader);
                if (cause instanceof JobExecutionException) {
                    throw (JobExecutionException)((Object)cause);
                }
                throw new JobExecutionException(jobID, "Job execution failed", cause);
            }
            throw new JobExecutionException(jobID, "Job execution failed with null as failure cause.");
        }
        if (answer instanceof JobManagerMessages.JobNotFound) {
            throw new JobRetrievalException(((JobManagerMessages.JobNotFound)answer).jobID(), "Couldn't retrieve Job " + jobID + " because it was not running.");
        }
        throw new JobExecutionException(jobID, "Unknown answer from JobManager after submitting the job: " + answer);
    }

    public static JobExecutionResult submitJobAndWait(ActorSystem actorSystem, Configuration config, HighAvailabilityServices highAvailabilityServices, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates, ClassLoader classLoader) throws JobExecutionException {
        JobListeningContext jobListeningContext = JobClient.submitJob(actorSystem, config, highAvailabilityServices, jobGraph, timeout, sysoutLogUpdates, classLoader);
        return JobClient.awaitJobResult(jobListeningContext);
    }

    public static void submitJobDetached(JobManagerGateway jobManagerGateway, Configuration config, JobGraph jobGraph, Time timeout, ClassLoader classLoader) throws JobExecutionException {
        InetSocketAddress blobServerAddress;
        Preconditions.checkNotNull((Object)jobManagerGateway, (String)"The jobManagerGateway must not be null.");
        Preconditions.checkNotNull((Object)jobGraph, (String)"The jobGraph must not be null.");
        Preconditions.checkNotNull((Object)timeout, (String)"The timeout must not be null.");
        LOG.info("Checking and uploading JAR files");
        CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(jobManagerGateway, timeout);
        try {
            blobServerAddress = blobServerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            throw new JobSubmissionException(jobGraph.getJobID(), "Could not retrieve BlobServer address.", e);
        }
        try {
            ClientUtils.extractAndUploadJobGraphFiles(jobGraph, (SupplierWithException<BlobClient, IOException>)((SupplierWithException)() -> new BlobClient(blobServerAddress, config)));
        }
        catch (FlinkException e) {
            throw new JobSubmissionException(jobGraph.getJobID(), "Could not upload job files.", e);
        }
        CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);
        try {
            submissionFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "JobManager did not respond within " + timeout, e);
        }
        catch (Throwable throwable) {
            Throwable stripped = ExceptionUtils.stripExecutionException((Throwable)throwable);
            try {
                ExceptionUtils.tryDeserializeAndThrow((Throwable)stripped, (ClassLoader)classLoader);
            }
            catch (JobExecutionException jee) {
                throw jee;
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobGraph.getJobID(), "JobSubmission failed.", t);
            }
        }
    }

    public static CompletableFuture<InetSocketAddress> retrieveBlobServerAddress(JobManagerGateway jobManagerGateway, Time timeout) {
        CompletableFuture<Integer> futureBlobPort = jobManagerGateway.requestBlobServerPort(timeout);
        String jmHostname = jobManagerGateway.getHostname();
        return futureBlobPort.thenApply(blobPort -> new InetSocketAddress(jmHostname, (int)blobPort));
    }
}

