/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorNotFound;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Props$;
import grizzled.slf4j.Logger;
import grizzled.slf4j.Logger$;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils$;
import org.apache.flink.runtime.akka.DefaultQuarantineHandler;
import org.apache.flink.runtime.akka.QuarantineMonitor;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManagerCliOptions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.NetUtils;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scopt.OptionParser;
import scopt.Read$;

public final class TaskManager$ {
    /** Singleton instance; assigned by the private constructor, which the static initializer below invokes. */
    public static TaskManager$ MODULE$;
    /** Logger obtained for {@code TaskManager.class} in the constructor. */
    private final Logger LOG;
    /** Assigned 1 in the constructor; {@code main} passes it to {@code System.exit} when startup fails. */
    private final int STARTUP_FAILURE_RETURN_CODE;
    /** Assigned 2 in the constructor; handed to the process reaper and the quarantine handler. */
    private final int RUNTIME_FAILURE_RETURN_CODE;
    /** Assigned 120000 in the constructor; presumably milliseconds (unit is not stated here, and the value is not read in this class). */
    private final long MAX_STARTUP_CONNECT_TIME;
    /** Assigned 10000 in the constructor; presumably milliseconds (unit is not stated here, and the value is not read in this class). */
    private final long STARTUP_CONNECT_LOG_SUPPRESS;
    /** Assigned a duration of 5000 milliseconds in the constructor. */
    private final FiniteDuration HEARTBEAT_INTERVAL;

    static {
        // Constructing the instance has the side effect of publishing it through MODULE$.
        new TaskManager$();
    }

    /**
     * Returns the logger this object uses.
     *
     * @return the logger created for {@code TaskManager.class} in the constructor
     */
    public Logger LOG() {
        return LOG;
    }

    /**
     * Returns the exit status used when the TaskManager fails during startup.
     *
     * @return the startup failure code (the constructor assigns 1)
     */
    public int STARTUP_FAILURE_RETURN_CODE() {
        return STARTUP_FAILURE_RETURN_CODE;
    }

    /**
     * Returns the exit status passed to the process reaper and the quarantine handler.
     *
     * @return the runtime failure code (the constructor assigns 2)
     */
    public int RUNTIME_FAILURE_RETURN_CODE() {
        return RUNTIME_FAILURE_RETURN_CODE;
    }

    /**
     * Returns the maximum startup connect time constant.
     *
     * @return the stored value (the constructor assigns 120000; presumably milliseconds)
     */
    public long MAX_STARTUP_CONNECT_TIME() {
        return MAX_STARTUP_CONNECT_TIME;
    }

    /**
     * Returns the startup connect log-suppression constant.
     *
     * @return the stored value (the constructor assigns 10000; presumably milliseconds)
     */
    public long STARTUP_CONNECT_LOG_SUPPRESS() {
        return STARTUP_CONNECT_LOG_SUPPRESS;
    }

    /**
     * Returns the heartbeat interval constant.
     *
     * @return the stored duration (the constructor assigns 5000 milliseconds)
     */
    public FiniteDuration HEARTBEAT_INTERVAL() {
        return HEARTBEAT_INTERVAL;
    }

    /**
     * Entry point of the TaskManager process.
     *
     * <p>Logs environment information, registers the signal handler and the JVM shutdown
     * safeguard, reports the open-file-descriptor limit, loads the configuration, installs the
     * security configuration and finally runs the TaskManager inside the installed security
     * context. Any failure while loading the configuration or running the TaskManager is logged
     * and terminates the JVM with {@link #STARTUP_FAILURE_RETURN_CODE()}.
     *
     * @param args command line arguments, forwarded to {@link #parseArgsAndLoadConfig(String[])}
     */
    public void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo(this.LOG().logger(), "TaskManager", args);
        SignalHandler.register(this.LOG().logger());
        JvmShutdownSafeguard.installAsShutdownHook(this.LOG().logger());
        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
        if (maxOpenFileHandles != -1L) {
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("Maximum number of open file descriptors is ").append(maxOpenFileHandles).toString());
        } else {
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Cannot determine the maximum number of open file descriptors");
        }
        Configuration loadedConfiguration;
        try {
            loadedConfiguration = this.parseArgsAndLoadConfig(args);
        }
        catch (Throwable t) {
            this.LOG().error((Function0 & Serializable & scala.Serializable)() -> t.getMessage(), (Function0 & Serializable & scala.Serializable)() -> t);
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
            // Only reached if System.exit returns; needed for definite assignment.
            loadedConfiguration = null;
        }
        // Final copies so the anonymous Callable below can capture them.
        final Configuration configuration = loadedConfiguration;
        final ResourceID resourceId = ResourceID.generate();
        SecurityUtils.install(new SecurityConfiguration(configuration));
        try {
            SecurityUtils.getInstalledContext().runSecured(new Callable<BoxedUnit>(){

                @Override
                public BoxedUnit call() throws Exception {
                    TaskManager$.MODULE$.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, TaskManager.class);
                    return BoxedUnit.UNIT;
                }
            });
        }
        catch (Throwable t) {
            this.LOG().error((Function0 & Serializable & scala.Serializable)() -> "Failed to run TaskManager.", (Function0 & Serializable & scala.Serializable)() -> t);
            System.exit(this.STARTUP_FAILURE_RETURN_CODE());
        }
    }

    /**
     * Parses the command line arguments and loads the Flink configuration from the configuration
     * directory they specify.
     *
     * <p>After loading, the file system layer is initialized from the configuration via
     * {@code FileSystem.initialize}.
     *
     * @param args command line arguments; the parser defined here recognizes the {@code configDir} option
     * @return the loaded configuration
     * @throws Exception if the arguments cannot be parsed, if loading the configuration fails
     *         ("Could not load configuration"), or if initializing the file system fails with an
     *         {@link IOException}
     */
    public Configuration parseArgsAndLoadConfig(String[] args) throws Exception {
        Configuration configuration;
        // Command line parser with a single "configDir" option that is stored in the CLI options object.
        OptionParser<TaskManagerCliOptions> parser = new OptionParser<TaskManagerCliOptions>(){
            {
                this.head((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Flink TaskManager"}));
                this.opt("configDir", Read$.MODULE$.stringRead()).action((Function2 & Serializable & scala.Serializable)(param, conf) -> {
                    conf.setConfigDir((String)param);
                    return conf;
                }).text("Specify configuration directory.");
            }

            // NOTE(review): compiler-generated lambda deserialization hook; the MethodHandle array
            // contents are decompiler output and are not valid Java as written.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$new$1(java.lang.String org.apache.flink.runtime.taskmanager.TaskManagerCliOptions )}, serializedLambda);
            }
        };
        // If parsing yields no result, the fallback throws an Exception that includes the usage text.
        TaskManagerCliOptions cliConfig = (TaskManagerCliOptions)parser.parse((Seq)Predef$.MODULE$.wrapRefArray((Object[])args), (Object)new TaskManagerCliOptions()).getOrElse(() -> TaskManager$.$anonfun$parseArgsAndLoadConfig$1(args, (OptionParser)parser));
        try {
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(27).append("Loading configuration from ").append(cliConfig.getConfigDir()).toString());
            configuration = GlobalConfiguration.loadConfiguration((String)cliConfig.getConfigDir());
        }
        catch (Exception e) {
            // Wrap with context while keeping the original failure as the cause.
            throw new Exception("Could not load configuration", e);
        }
        Configuration conf = configuration;
        try {
            FileSystem.initialize((Configuration)conf);
        }
        catch (IOException e) {
            throw new Exception("Error while setting the default filesystem scheme from configuration.", e);
        }
        return conf;
    }

    /**
     * Creates the high availability services, determines the TaskManager hostname and actor
     * system port range, and runs the TaskManager.
     *
     * <p>The high availability services are closed when this method exits, whether the
     * TaskManager ran to completion or hostname/port selection or startup failed. A failure
     * while closing them is logged as a warning and not propagated.
     *
     * @param configuration the Flink configuration
     * @param resourceID the resource ID of the TaskManager to start
     * @param taskManagerClass the actor class to instantiate for the TaskManager
     * @throws Exception if creating the high availability services, selecting the network
     *         interface or running the TaskManager fails
     */
    public void selectNetworkInterfaceAndRunTaskManager(Configuration configuration, ResourceID resourceID, Class<? extends TaskManager> taskManagerClass) throws Exception {
        HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
        try {
            // Selection happens inside the try so that a failure here still closes the HA services.
            Tuple2<String, Iterator<Integer>> hostAndPortRange = this.selectNetworkInterfaceAndPortRange(configuration, highAvailabilityServices);
            if (hostAndPortRange == null) {
                throw new MatchError(hostAndPortRange);
            }
            String taskManagerHostname = hostAndPortRange._1();
            Iterator<Integer> actorSystemPortRange = hostAndPortRange._2();
            this.runTaskManager(taskManagerHostname, resourceID, actorSystemPortRange, configuration, highAvailabilityServices, taskManagerClass);
        }
        finally {
            try {
                highAvailabilityServices.close();
            }
            catch (Throwable t) {
                this.LOG().warn((Function0 & Serializable & scala.Serializable)() -> "Could not properly stop the high availability services.", (Function0 & Serializable & scala.Serializable)() -> t);
            }
        }
    }

    /**
     * Determines the hostname the TaskManager should use and the port range for its actor system.
     *
     * <p>If {@code TaskManagerOptions.HOST} is configured, that value is used as-is. Otherwise
     * the address is looked up with {@code LeaderRetrievalUtils.findConnectingAddress} against
     * the JobManager leader retriever, using the lookup timeout from the configuration, and its
     * host name is used. The port range is parsed from {@code TaskManagerOptions.RPC_PORT}.
     *
     * @param configuration the Flink configuration
     * @param highAvailabilityServices services used to obtain the JobManager leader retriever
     * @return a pair of the selected hostname and an iterator over the candidate ports
     * @throws IOException declared by the original signature; presumably raised by the address lookup
     * @throws IllegalConfigurationException if the configured port range cannot be parsed
     */
    public Tuple2<String, Iterator<Integer>> selectNetworkInterfaceAndPortRange(Configuration configuration, HighAvailabilityServices highAvailabilityServices) throws IOException, IllegalConfigurationException {
        String configuredHostname = configuration.getString(TaskManagerOptions.HOST);
        // Final so the logging lambdas below can capture it.
        final String taskManagerHostname;
        if (configuredHostname != null) {
            taskManagerHostname = configuredHostname;
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Using configured hostname/address for TaskManager: " + taskManagerHostname);
        } else {
            FiniteDuration lookupTimeout = AkkaUtils$.MODULE$.getLookupTimeout(configuration);
            InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), lookupTimeout);
            taskManagerHostname = taskManagerAddress.getHostName();
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "TaskManager will use hostname/address '" + taskManagerHostname + "' " + "(" + taskManagerAddress.getHostAddress() + ") for communication.");
        }
        String portRange = configuration.getString(TaskManagerOptions.RPC_PORT);
        Iterator<Integer> portRangeIterator;
        try {
            portRangeIterator = NetUtils.getPortRangeFromString(portRange);
        }
        catch (NumberFormatException e) {
            // Keep the parse failure as the cause so the offending token is not lost.
            throw new IllegalConfigurationException("Invalid value for '" + TaskManagerOptions.RPC_PORT.key() + "' (port for the TaskManager actor system) : " + portRange + " - Leave config parameter empty or use 0 to let the system choose a port automatically.", e);
        }
        return new Tuple2<String, Iterator<Integer>>(taskManagerHostname, portRangeIterator);
    }

    /**
     * Runs a TaskManager of the default {@code TaskManager} class on the given host and port.
     *
     * <p>Delegates to the overload that takes an explicit TaskManager class.
     *
     * @param taskManagerHostname hostname the actor system binds to
     * @param resourceID the resource ID of the TaskManager
     * @param actorSystemPort port for the actor system
     * @param configuration the Flink configuration
     * @param highAvailabilityServices the high availability services handed to the TaskManager
     * @throws Exception if the delegate fails
     */
    public void runTaskManager(String taskManagerHostname, ResourceID resourceID, int actorSystemPort, Configuration configuration, HighAvailabilityServices highAvailabilityServices) throws Exception {
        Class<TaskManager> defaultTaskManagerClass = TaskManager.class;
        runTaskManager(taskManagerHostname, resourceID, actorSystemPort, configuration, highAvailabilityServices, defaultTaskManagerClass);
    }

    /**
     * Starts the TaskManager actor system on the given host and port, starts the TaskManager
     * actor together with its helper actors, and blocks until the actor system terminates.
     *
     * <p>Besides the TaskManager actor this starts a process reaper, a quarantine monitor (only
     * when {@code TaskManagerOptions.EXIT_ON_FATAL_AKKA_ERROR} is set) and the memory logger
     * (via {@code MemoryLogger.startIfConfigured}). If startup fails, the error is logged,
     * termination of the actor system is requested and the error is rethrown. The metric
     * registry is shut down on both the normal and the failure path; a failure while shutting
     * it down is logged and not propagated.
     *
     * @param taskManagerHostname hostname the actor system binds to
     * @param resourceID the resource ID of the TaskManager
     * @param actorSystemPort port for the actor system
     * @param configuration the Flink configuration
     * @param highAvailabilityServices the high availability services handed to the TaskManager
     * @param taskManagerClass the actor class to instantiate for the TaskManager
     * @throws Exception if the actor system or any TaskManager component fails to start
     */
    public void runTaskManager(String taskManagerHostname, ResourceID resourceID, int actorSystemPort, Configuration configuration, HighAvailabilityServices highAvailabilityServices, Class<? extends TaskManager> taskManagerClass) throws Exception {
        this.LOG().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(38).append("Starting TaskManager with ResourceID: ").append(resourceID).toString());
        this.LOG().info((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(39).append("Starting TaskManager actor system at ").append(taskManagerHostname).append(":").append(actorSystemPort).append(".").toString());
        ActorSystem taskManagerSystem = BootstrapTools.startActorSystem(configuration, taskManagerHostname, actorSystemPort, this.LOG().logger());
        MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
        metricRegistry.startQueryService(taskManagerSystem, resourceID);
        try {
            this.LOG().info((Function0 & Serializable & scala.Serializable)() -> "Starting TaskManager actor");
            ActorRef taskManager = this.startTaskManagerComponentsAndActor(configuration, resourceID, taskManagerSystem, highAvailabilityServices, metricRegistry, taskManagerHostname, (Option<String>)new Some((Object)"taskmanager"), false, taskManagerClass);
            this.LOG().debug((Function0 & Serializable & scala.Serializable)() -> "Starting TaskManager process reaper");
            taskManagerSystem.actorOf(Props$.MODULE$.apply(ProcessReaper.class, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{taskManager, this.LOG().logger(), BoxesRunTime.boxToInteger((int)this.RUNTIME_FAILURE_RETURN_CODE())})), "TaskManager_Process_Reaper");
            if (configuration.getBoolean(TaskManagerOptions.EXIT_ON_FATAL_AKKA_ERROR)) {
                DefaultQuarantineHandler quarantineHandler = new DefaultQuarantineHandler(Time.milliseconds((long)AkkaUtils$.MODULE$.getTimeout(configuration).toMillis()), this.RUNTIME_FAILURE_RETURN_CODE(), this.LOG().logger());
                this.LOG().debug((Function0 & Serializable & scala.Serializable)() -> "Starting TaskManager quarantine monitor");
                taskManagerSystem.actorOf(Props$.MODULE$.apply(QuarantineMonitor.class, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{quarantineHandler, this.LOG().logger()})));
            }
            MemoryLogger.startIfConfigured(this.LOG().logger(), configuration, taskManagerSystem);
            // Block this thread for as long as the actor system is alive.
            Await$.MODULE$.ready((Awaitable)taskManagerSystem.whenTerminated(), (Duration)Duration$.MODULE$.Inf());
        }
        catch (Throwable t) {
            this.LOG().error((Function0 & Serializable & scala.Serializable)() -> "Error while starting up taskManager", (Function0 & Serializable & scala.Serializable)() -> t);
            // Request termination without waiting; a failed termination is only logged.
            taskManagerSystem.terminate().onComplete((Function1 & Serializable & scala.Serializable)x0$5 -> {
                TaskManager$.$anonfun$runTaskManager$8(x0$5);
                return BoxedUnit.UNIT;
            }, Executors.directExecutionContext());
            throw t;
        }
        finally {
            // Runs on the failure path too, so the registry is not leaked when startup throws.
            try {
                metricRegistry.shutdown().get();
            }
            catch (Throwable t) {
                this.LOG().error((Function0 & Serializable & scala.Serializable)() -> "Could not properly shut down the metric registry.", (Function0 & Serializable & scala.Serializable)() -> t);
            }
        }
    }

    /**
     * Runs the TaskManager on a port taken from the given port range.
     *
     * <p>Each attempt opens a server socket on a port from the range to find a usable port,
     * closes that socket, and then starts the TaskManager on the discovered port. The attempt
     * is wrapped in {@code AkkaUtils.retryOnBindException}, which is given a stop condition
     * that becomes true once the port range is exhausted, plus the value {@code 5000L}.
     *
     * @param taskManagerHostname hostname the actor system binds to
     * @param resourceID the resource ID of the TaskManager
     * @param actorSystemPortRange candidate ports for the actor system
     * @param configuration the Flink configuration
     * @param highAvailabilityServices the high availability services handed to the TaskManager
     * @param taskManagerClass the actor class to instantiate for the TaskManager
     * @throws Exception the failure reported by the retry helper, if any
     */
    public void runTaskManager(String taskManagerHostname, ResourceID resourceID, Iterator<Integer> actorSystemPortRange, Configuration configuration, HighAvailabilityServices highAvailabilityServices, Class<? extends TaskManager> taskManagerClass) throws Exception {
        Try outcome = AkkaUtils$.MODULE$.retryOnBindException((JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            ServerSocket probeSocket = NetUtils.createSocketFromPorts((Iterator)actorSystemPortRange, (NetUtils.SocketFactory)new NetUtils.SocketFactory(){

                public ServerSocket createSocket(int port) {
                    return new ServerSocket(port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress()));
                }
            });
            if (probeSocket == null) {
                throw new BindException("Unable to allocate port for TaskManager.");
            }
            // The socket is only used to discover a port; release it before the actor system binds.
            final int selectedPort;
            try {
                selectedPort = probeSocket.getLocalPort();
            }
            finally {
                probeSocket.close();
            }
            MODULE$.runTaskManager(taskManagerHostname, resourceID, selectedPort, configuration, highAvailabilityServices, taskManagerClass);
        }, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> !actorSystemPortRange.hasNext(), 5000L);
        if (outcome instanceof Failure) {
            throw ((Failure)outcome).exception();
        }
    }

    /**
     * Builds the TaskManager services from the configuration and starts the TaskManager actor
     * in the given actor system.
     *
     * @param configuration the Flink configuration
     * @param resourceID the resource ID of the TaskManager
     * @param actorSystem the actor system in which the actor is created
     * @param highAvailabilityServices the high availability services handed to the TaskManager
     * @param metricRegistry registry used to create the TaskManager metric group
     * @param taskManagerHostname hostname that is resolved to the TaskManager address
     * @param taskManagerActorName optional actor name; when empty the actor is created without an explicit name
     * @param localTaskManagerCommunication not read by this method
     * @param taskManagerClass the actor class to instantiate
     * @return reference to the started TaskManager actor
     * @throws IllegalConfigurationException declared by the original signature
     * @throws IOException declared by the original signature
     * @throws Exception declared by the original signature
     */
    public ActorRef startTaskManagerComponentsAndActor(Configuration configuration, ResourceID resourceID, ActorSystem actorSystem, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricRegistry, String taskManagerHostname, Option<String> taskManagerActorName, boolean localTaskManagerCommunication, Class<? extends TaskManager> taskManagerClass) throws IllegalConfigurationException, IOException, Exception {
        InetAddress address = InetAddress.getByName(taskManagerHostname);
        TaskManagerServicesConfiguration servicesConfig = TaskManagerServicesConfiguration.fromConfiguration(configuration, address, false);
        TaskManagerConfiguration managerConfig = TaskManagerConfiguration.fromConfiguration(configuration);
        TaskManagerServices services = TaskManagerServices.fromConfiguration(servicesConfig, resourceID, (Executor)actorSystem.dispatcher(), EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), EnvironmentInformation.getMaxJvmHeapMemory());
        TaskManagerMetricGroup metricGroup = MetricUtils.instantiateTaskManagerMetricGroup(metricRegistry, services.getTaskManagerLocation(), services.getNetworkEnvironment(), servicesConfig.getSystemResourceMetricsProbingInterval());
        Props props = this.getTaskManagerProps(taskManagerClass, managerConfig, resourceID, services.getTaskManagerLocation(), services.getMemoryManager(), services.getIOManager(), services.getNetworkEnvironment(), services.getTaskManagerStateStore(), highAvailabilityServices, metricGroup);
        // Named actor when a name is supplied, anonymous actor for None.
        if (taskManagerActorName instanceof Some) {
            String actorName = (String)((Some)taskManagerActorName).value();
            return actorSystem.actorOf(props, actorName);
        }
        if (None$.MODULE$.equals(taskManagerActorName)) {
            return actorSystem.actorOf(props);
        }
        // Neither Some nor None (for example null).
        throw new MatchError(taskManagerActorName);
    }

    /**
     * Creates the actor {@code Props} for the given TaskManager class.
     *
     * <p>The constructor arguments are passed in this order: configuration, resource ID,
     * location, memory manager, I/O manager, network environment, state store manager, number
     * of slots (read from the configuration), high availability services, metric group.
     *
     * @param taskManagerClass the actor class to instantiate
     * @param taskManagerConfig the TaskManager configuration; also supplies the slot count
     * @param resourceID the resource ID of the TaskManager
     * @param taskManagerLocation the TaskManager location
     * @param memoryManager the memory manager
     * @param ioManager the I/O manager
     * @param networkEnvironment the network environment
     * @param taskStateManager the local state stores manager
     * @param highAvailabilityServices the high availability services
     * @param taskManagerMetricGroup the TaskManager metric group
     * @return the props describing how to create the TaskManager actor
     */
    public Props getTaskManagerProps(Class<? extends TaskManager> taskManagerClass, TaskManagerConfiguration taskManagerConfig, ResourceID resourceID, TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, TaskExecutorLocalStateStoresManager taskStateManager, HighAvailabilityServices highAvailabilityServices, TaskManagerMetricGroup taskManagerMetricGroup) {
        Object[] constructorArgs = new Object[]{
            taskManagerConfig,
            resourceID,
            taskManagerLocation,
            memoryManager,
            ioManager,
            networkEnvironment,
            taskStateManager,
            BoxesRunTime.boxToInteger((int)taskManagerConfig.getNumberSlots()),
            highAvailabilityServices,
            taskManagerMetricGroup
        };
        Seq wrappedArgs = (Seq)Predef$.MODULE$.genericWrapArray((Object)constructorArgs);
        return Props$.MODULE$.apply(taskManagerClass, wrappedArgs);
    }

    /**
     * Resolves the actor reference of a TaskManager at the given URL, waiting up to the timeout.
     *
     * @param taskManagerUrl URL of the TaskManager actor
     * @param system actor system used for the lookup
     * @param timeout timeout for both the lookup and the wait on its result
     * @return the resolved actor reference
     * @throws IOException if the actor is not found, the lookup times out, or the lookup itself
     *         fails with an {@link IOException}; the original failure is kept as the cause
     */
    public ActorRef getTaskManagerRemoteReference(String taskManagerUrl, ActorSystem system, FiniteDuration timeout) throws IOException {
        try {
            Future<ActorRef> lookup = AkkaUtils$.MODULE$.getActorRefFuture(taskManagerUrl, system, timeout);
            return (ActorRef)Await$.MODULE$.result(lookup, (Duration)timeout);
        }
        catch (Throwable failure) {
            if (failure instanceof ActorNotFound || failure instanceof TimeoutException) {
                throw new IOException("TaskManager at " + taskManagerUrl + " not reachable. " + "Please make sure that the TaskManager is running and its port is reachable.", failure);
            }
            if (failure instanceof IOException) {
                throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, (IOException)failure);
            }
            // Anything else is propagated unchanged.
            throw failure;
        }
    }

    /**
     * Synthetic fallback used when the command line cannot be parsed: always throws.
     *
     * @param args$1 the raw command line arguments, echoed space-separated in the message
     * @param parser$1 the parser whose usage text is appended to the message
     * @return never returns normally
     */
    public static final /* synthetic */ Nothing$ $anonfun$parseArgsAndLoadConfig$1(String[] args$1, OptionParser parser$1) {
        String joinedArgs = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])args$1)).mkString(" ");
        String usageText = parser$1.usage();
        throw new Exception("Invalid command line arguments: " + joinedArgs + ". Usage: " + usageText);
    }

    /**
     * Synthetic completion callback for the actor system termination requested after a failed
     * startup: a failed termination is logged as a warning, a successful one is ignored.
     *
     * @param x0$5 the outcome of the termination
     */
    public static final /* synthetic */ void $anonfun$runTaskManager$8(Try x0$5) {
        if (x0$5 instanceof Failure) {
            Throwable shutdownError = ((Failure)x0$5).exception();
            MODULE$.LOG().warn((Function0 & Serializable & scala.Serializable)() -> "Could not cleanly shut down actor system", (Function0 & Serializable & scala.Serializable)() -> shutdownError);
            return;
        }
        if (!(x0$5 instanceof Success)) {
            // Neither Success nor Failure (for example null).
            throw new MatchError((Object)x0$5);
        }
    }

    /**
     * Private constructor, invoked once from the static initializer. Publishes the instance
     * through {@code MODULE$} and assigns the logger and the constant values.
     */
    private TaskManager$() {
        // Publish the singleton first; the remaining fields are assigned afterwards.
        MODULE$ = this;
        this.LOG = Logger$.MODULE$.apply(TaskManager.class);
        // Exit codes: 1 is used by main on startup failure, 2 by the process reaper and quarantine handler.
        this.STARTUP_FAILURE_RETURN_CODE = 1;
        this.RUNTIME_FAILURE_RETURN_CODE = 2;
        // NOTE(review): the two values below look like milliseconds (2 min / 10 s) — confirm against callers.
        this.MAX_STARTUP_CONNECT_TIME = 120000L;
        this.STARTUP_CONNECT_LOG_SUPPRESS = 10000L;
        // 5000 milliseconds.
        this.HEARTBEAT_INTERVAL = new package.DurationInt(package$.MODULE$.DurationInt(5000)).milliseconds();
    }
}

