package org.streampipes.wrapper.flink;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/streampipes/wrapper/flink/FlinkJobController.class */
public class FlinkJobController {
    private static final FiniteDuration askTimeout = new FiniteDuration(120000, TimeUnit.MILLISECONDS);
    private static final FiniteDuration lookupTimeout = new FiniteDuration(120000, TimeUnit.MILLISECONDS);
    private final Configuration config;

    public FlinkJobController(String str, int i) {
        this.config = getConfig(str, i);
    }

    public ActorGateway getJobManagerGateway() throws Exception {
        try {
            return LeaderRetrievalUtils.retrieveLeaderGateway(HighAvailabilityServicesUtils.createHighAvailabilityServices(this.config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION).getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), AkkaUtils.createActorSystem(this.config, new Some(new Tuple2("", 0))), lookupTimeout);
        } catch (Exception e) {
            throw new IOException("Could not start actor system to communicate with JobManager", e);
        }
    }

    public JobID findJobId(ActorGateway actorGateway, String str) throws Exception {
        try {
            Object result = Await.result(actorGateway.ask(JobManagerMessages.getRequestRunningJobsStatus(), askTimeout), askTimeout);
            if (result instanceof JobManagerMessages.RunningJobsStatus) {
                for (JobStatusMessage jobStatusMessage : ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages()) {
                    if (jobStatusMessage.getJobState().equals(JobStatus.RUNNING) || jobStatusMessage.getJobState().equals(JobStatus.RESTARTING)) {
                        if (jobStatusMessage.getJobName().equals(str)) {
                            return jobStatusMessage.getJobId();
                        }
                    }
                }
            }
            throw new Exception("Could not find job");
        } catch (Exception e) {
            throw new Exception("Could not retrieve running jobs from the JobManager.", e);
        }
    }

    public boolean deleteJob(JobID jobID) {
        try {
            Await.result(getJobManagerGateway().ask(new JobManagerMessages.CancelJob(jobID), askTimeout), askTimeout);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    private final Configuration getConfig(String str, int i) {
        Configuration configuration = new Configuration();
        configuration.setString("jobmanager.rpc.address", str);
        configuration.setInteger("jobmanager.rpc.port", i);
        return configuration;
    }
}
