/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.akka.AkkaBootstrapTools;
import org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider;
import org.apache.flink.runtime.rpc.akka.EscalatingSupervisorStrategy;
import org.apache.flink.runtime.rpc.akka.HostAndPort;
import org.apache.flink.runtime.rpc.akka.RemoteAddressExtension;
import org.apache.flink.runtime.rpc.akka.RobustActorSystem;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that translate a Flink {@link Configuration} into Akka (HOCON) {@link Config}
 * objects and that create, address and terminate {@link ActorSystem}s.
 *
 * <p>All configuration fragments are assembled as text, one line per {@code add(...)} call, and
 * parsed by {@link AkkaConfigBuilder#build()}.
 */
class AkkaUtils {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class);
    private static final String FLINK_ACTOR_SYSTEM_NAME = "flink";

    // Kept package-private exactly as in the original; every member of this class is static.
    AkkaUtils() {
    }

    /**
     * Returns the name used for Flink actor systems.
     *
     * @return the constant actor system name {@code "flink"}
     */
    public static String getFlinkActorSystemName() {
        return FLINK_ACTOR_SYSTEM_NAME;
    }

    /**
     * Builds the base Akka configuration (logging, fatal-error handling, supervisor strategy and
     * dispatcher throughput) that every actor system created by this class uses.
     *
     * @param configuration Flink configuration providing the {@link AkkaOptions} values
     * @return the parsed base configuration
     */
    private static Config getBasicAkkaConfig(Configuration configuration) {
        int throughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT);
        String exitOnFatalError =
                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR));
        String logLifecycleEvents =
                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
        String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName();

        return new AkkaConfigBuilder()
                .add("akka {")
                .add("  daemonic = off")
                .add("  loggers = [\"akka.event.slf4j.Slf4jLogger\"]")
                .add("  logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"")
                .add("  log-config-on-start = off")
                .add("  logger-startup-timeout = 50s")
                .add("  loglevel = " + getLogLevel())
                .add("  stdout-loglevel = OFF")
                .add("  log-dead-letters = " + logLifecycleEvents)
                .add("  log-dead-letters-during-shutdown = " + logLifecycleEvents)
                .add("  jvm-exit-on-fatal-error = " + exitOnFatalError)
                .add("  serialize-messages = off")
                .add("  actor {")
                .add("    guardian-supervisor-strategy = " + supervisorStrategy)
                .add("    warn-about-java-serializer-usage = off")
                .add("    allow-java-serialization = on")
                .add("    default-dispatcher {")
                .add("      throughput = " + throughput)
                .add("    }")
                .add("    supervisor-dispatcher {")
                .add("      type = Dispatcher")
                .add("      executor = \"thread-pool-executor\"")
                .add("      thread-pool-executor {")
                .add("        core-pool-size-min = 1")
                .add("        core-pool-size-max = 1")
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}")
                .build();
    }

    /**
     * Maps the most verbose level enabled on this class's SLF4J logger to the string written to
     * the Akka {@code loglevel} setting.
     *
     * @return {@code "DEBUG"}, {@code "INFO"}, {@code "WARNING"}, {@code "ERROR"} or
     *     {@code "OFF"}
     */
    private static String getLogLevel() {
        // TRACE is deliberately reported as DEBUG, the most verbose value this method emits.
        if (LOG.isTraceEnabled() || LOG.isDebugEnabled()) {
            return "DEBUG";
        }
        if (LOG.isInfoEnabled()) {
            return "INFO";
        }
        if (LOG.isWarnEnabled()) {
            return "WARNING";
        }
        if (LOG.isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Builds a default-dispatcher configuration backed by a thread-pool executor.
     *
     * @param configuration supplies the thread priority and the min/max core pool size
     * @return the parsed dispatcher configuration
     */
    public static Config getThreadPoolExecutorConfig(RpcSystem.FixedThreadPoolExecutorConfiguration configuration) {
        int threadPriority = configuration.getThreadPriority();
        int minThreads = configuration.getMinNumThreads();
        int maxThreads = configuration.getMaxNumThreads();

        return new AkkaConfigBuilder()
                .add("akka {")
                .add("  actor {")
                .add("    default-dispatcher {")
                .add("      type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher")
                .add("      executor = thread-pool-executor")
                .add("      thread-priority = " + threadPriority)
                .add("      thread-pool-executor {")
                .add("          core-pool-size-min = " + minThreads)
                .add("          core-pool-size-max = " + maxThreads)
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}")
                .build();
    }

    /**
     * Builds a default-dispatcher configuration backed by a fork-join executor.
     *
     * @param configuration supplies the parallelism factor and the min/max parallelism
     * @return the parsed dispatcher configuration
     */
    public static Config getForkJoinExecutorConfig(RpcSystem.ForkJoinExecutorConfiguration configuration) {
        double parallelismFactor = configuration.getParallelismFactor();
        int minParallelism = configuration.getMinParallelism();
        int maxParallelism = configuration.getMaxParallelism();

        return new AkkaConfigBuilder()
                .add("akka {")
                .add("  actor {")
                .add("    default-dispatcher {")
                .add("      executor = fork-join-executor")
                .add("      fork-join-executor {")
                .add("          parallelism-factor = " + parallelismFactor)
                .add("          parallelism-min = " + minParallelism)
                .add("          parallelism-max = " + maxParallelism)
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}")
                .build();
    }

    /**
     * Builds the remoting part of the configuration: base transport settings, hostnames and SSL.
     *
     * @param configuration Flink configuration to read the options from
     * @param bindAddress hostname written to {@code bind-hostname}
     * @param port port written to {@code bind-port}
     * @param externalHostname hostname written (after normalization) to {@code hostname}
     * @param externalPort port written to {@code port}
     * @return the parsed remote configuration
     */
    private static Config getRemoteAkkaConfig(Configuration configuration, String bindAddress, int port, String externalHostname, int externalPort) {
        AkkaConfigBuilder builder = new AkkaConfigBuilder();
        // The SSL fragment substitutes ${akka.remote.classic.netty.tcp}, which the two fragments
        // before it define, so all three go into one builder and are resolved together.
        addBaseRemoteAkkaConfig(builder, configuration, port, externalPort);
        addHostnameRemoteAkkaConfig(builder, bindAddress, externalHostname);
        addSslRemoteAkkaConfig(builder, configuration);
        return builder.build();
    }

    /**
     * Appends the classic-remoting transport settings (timeouts, frame size, ports, socket worker
     * pools, failure detector) to the given builder.
     *
     * @param akkaConfigBuilder builder receiving the configuration lines
     * @param configuration Flink configuration to read the options from
     * @param port port written to {@code bind-port}
     * @param externalPort port written to {@code port}
     */
    private static void addBaseRemoteAkkaConfig(AkkaConfigBuilder akkaConfigBuilder, Configuration configuration, int port, int externalPort) {
        Duration askTimeout = (Duration)configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
        // The fallback passed for the startup timeout is ten times the ask timeout.
        String defaultStartupTimeout = TimeUtils.getStringInMillis(askTimeout.multipliedBy(10L));
        String startupTimeout =
                TimeUtils.getStringInMillis(
                        TimeUtils.parseDuration(
                                configuration.getString(AkkaOptions.STARTUP_TIMEOUT, defaultStartupTimeout)));
        String tcpTimeout =
                TimeUtils.getStringInMillis(
                        TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
        String frameSize = configuration.getString(AkkaOptions.FRAMESIZE);

        int clientPoolSizeMin = (Integer)configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
        int clientPoolSizeMax = (Integer)configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
        double clientPoolSizeFactor = (Double)configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
        int serverPoolSizeMin = (Integer)configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
        int serverPoolSizeMax = (Integer)configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
        double serverPoolSizeFactor = (Double)configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);

        String logLifecycleEvents =
                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
        long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);

        akkaConfigBuilder
                .add("akka {")
                .add("  actor {")
                .add("    provider = \"akka.remote.RemoteActorRefProvider\"")
                .add("  }")
                .add("  remote.artery.enabled = false")
                .add("  remote.startup-timeout = " + startupTimeout)
                .add("  remote.warn-about-direct-use = off")
                .add("  remote.use-unsafe-remote-features-outside-cluster = on")
                .add("  remote.classic {")
                .add("    # disable the transport failure detector by setting very high values")
                .add("    transport-failure-detector{")
                .add("      acceptable-heartbeat-pause = 6000 s")
                .add("      heartbeat-interval = 1000 s")
                .add("      threshold = 300")
                .add("    }")
                .add("    enabled-transports = [\"akka.remote.classic.netty.tcp\"]")
                .add("    netty {")
                .add("      tcp {")
                .add("        transport-class = \"akka.remote.transport.netty.NettyTransport\"")
                .add("        port = " + externalPort)
                .add("        bind-port = " + port)
                .add("        connection-timeout = " + tcpTimeout)
                .add("        maximum-frame-size = " + frameSize)
                .add("        tcp-nodelay = on")
                .add("        client-socket-worker-pool {")
                .add("          pool-size-min = " + clientPoolSizeMin)
                .add("          pool-size-max = " + clientPoolSizeMax)
                .add("          pool-size-factor = " + clientPoolSizeFactor)
                .add("        }")
                .add("        server-socket-worker-pool {")
                .add("          pool-size-min = " + serverPoolSizeMin)
                .add("          pool-size-max = " + serverPoolSizeMax)
                .add("          pool-size-factor = " + serverPoolSizeFactor)
                .add("        }")
                .add("      }")
                .add("    }")
                .add("    log-remote-lifecycle-events = " + logLifecycleEvents)
                .add("    retry-gate-closed-for = " + retryGateClosedFor + " ms")
                .add("  }")
                .add("}");
    }

    /**
     * Appends the external and bind hostnames of the TCP transport to the given builder.
     *
     * @param akkaConfigBuilder builder receiving the configuration lines
     * @param bindAddress hostname written verbatim to {@code bind-hostname}
     * @param externalHostname hostname that is normalized and written to {@code hostname}; an
     *     empty string is written when normalization yields {@code null} or an empty string
     */
    private static void addHostnameRemoteAkkaConfig(AkkaConfigBuilder akkaConfigBuilder, String bindAddress, String externalHostname) {
        String normalizedHostname = NetUtils.unresolvedHostToNormalizedString(externalHostname);
        String effectiveHostname =
                normalizedHostname == null || normalizedHostname.isEmpty() ? "" : normalizedHostname;

        akkaConfigBuilder
                .add("akka {")
                .add("  remote.classic {")
                .add("    netty {")
                .add("      tcp {")
                .add("        hostname = \"" + effectiveHostname + "\"")
                .add("        bind-hostname = \"" + bindAddress + "\"")
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}");
    }

    /**
     * Appends the SSL transport settings to the given builder. The {@code ssl} section starts as
     * a copy of the {@code tcp} section (via HOCON substitution) and is then extended with the
     * security options.
     *
     * @param akkaConfigBuilder builder receiving the configuration lines
     * @param configuration Flink configuration to read the {@link SecurityOptions} from
     */
    private static void addSslRemoteAkkaConfig(AkkaConfigBuilder akkaConfigBuilder, Configuration configuration) {
        boolean sslEnabled =
                configuration.getBoolean(AkkaOptions.SSL_ENABLED)
                        && SecurityOptions.isInternalSSLEnabled(configuration);
        String enableSsl = booleanToOnOrOff(sslEnabled);

        // Each internal-SSL option falls back to its general SSL counterpart.
        String keyStore =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_KEYSTORE,
                        configuration.getString(SecurityOptions.SSL_KEYSTORE));
        String keyStorePassword =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD,
                        configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD));
        String keyPassword =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_KEY_PASSWORD,
                        configuration.getString(SecurityOptions.SSL_KEY_PASSWORD));
        String trustStore =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_TRUSTSTORE,
                        configuration.getString(SecurityOptions.SSL_TRUSTSTORE));
        String trustStorePassword =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD,
                        configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD));

        // Comma-separated fingerprints become a HOCON list of quoted strings; absent -> empty list.
        String fingerprintString = configuration.getString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT);
        String certFingerprints =
                fingerprintString != null
                        ? Arrays.stream(fingerprintString.split(","))
                                .collect(Collectors.joining("\",\"", "[\"", "\"]"))
                        : "[]";

        String protocol = configuration.getString(SecurityOptions.SSL_PROTOCOL);
        // Comma-separated algorithms become an unquoted HOCON list.
        String algorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS);
        String enabledAlgorithms =
                Arrays.stream(algorithmsString.split(","))
                        .collect(Collectors.joining(",", "[", "]"));

        String sslEngineProviderName = CustomSSLEngineProvider.class.getCanonicalName();

        // NOTE(review): store paths and passwords are embedded between double quotes without
        // escaping; values containing '"' or '\' would presumably fail to parse - confirm whether
        // such values need to be supported.
        akkaConfigBuilder
                .add("akka {")
                .add("  remote.classic {")
                .add("    enabled-transports = [\"akka.remote.classic.netty.ssl\"]")
                .add("    netty {")
                .add("      ssl = ${akka.remote.classic.netty.tcp}")
                .add("      ssl {")
                .add("        enable-ssl = " + enableSsl)
                .add("        ssl-engine-provider = " + sslEngineProviderName)
                .add("        security {")
                .add("          key-store = \"" + keyStore + "\"")
                .add("          key-store-password = \"" + keyStorePassword + "\"")
                .add("          key-password = \"" + keyPassword + "\"")
                .add("          trust-store = \"" + trustStore + "\"")
                .add("          trust-store-password = \"" + trustStorePassword + "\"")
                .add("          protocol = " + protocol)
                .add("          enabled-algorithms = " + enabledAlgorithms)
                .add("          random-number-generator = \"\"")
                .add("          require-mutual-authentication = on")
                .add("          cert-fingerprints = " + certFingerprints)
                .add("        }")
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}");
    }

    /**
     * Creates an actor system without remoting configuration (no external address is supplied).
     *
     * @param configuration Flink configuration to derive the Akka configuration from
     * @return the created actor system
     */
    public static ActorSystem createLocalActorSystem(Configuration configuration) {
        return createActorSystem(getAkkaConfig(configuration, null));
    }

    /**
     * Creates an actor system with the Flink actor system name.
     *
     * @param akkaConfig Akka configuration to use
     * @return the created actor system
     */
    private static ActorSystem createActorSystem(Config akkaConfig) {
        return createActorSystem(getFlinkActorSystemName(), akkaConfig);
    }

    /**
     * Creates an actor system with the given name and configuration.
     *
     * @param actorSystemName name of the actor system
     * @param akkaConfig Akka configuration to use
     * @return the created actor system
     */
    public static ActorSystem createActorSystem(String actorSystemName, Config akkaConfig) {
        // Route Netty's internal logging through SLF4J before the actor system is created.
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
        return RobustActorSystem.create(actorSystemName, akkaConfig);
    }

    /**
     * Creates an actor system from the default configuration.
     *
     * @return the created actor system
     */
    @VisibleForTesting
    public static ActorSystem createDefaultActorSystem() {
        return createActorSystem(getDefaultAkkaConfig());
    }

    /**
     * Builds the Akka configuration for an empty Flink configuration and an external address
     * with an empty host and port {@code 0}.
     *
     * @return the default Akka configuration
     */
    private static Config getDefaultAkkaConfig() {
        return getAkkaConfig(new Configuration(), new HostAndPort("", 0));
    }

    /**
     * Builds the Akka configuration using no explicit bind address and the fork-join executor
     * configuration derived from the given Flink configuration.
     *
     * @param configuration Flink configuration to read the options from
     * @param externalAddress externally reachable address, or {@code null} for a local-only
     *     configuration
     * @return the Akka configuration
     */
    public static Config getAkkaConfig(Configuration configuration, @Nullable HostAndPort externalAddress) {
        Config executorConfig =
                getForkJoinExecutorConfig(AkkaBootstrapTools.getForkJoinExecutorConfiguration(configuration));
        return getAkkaConfig(configuration, externalAddress, null, executorConfig);
    }

    /**
     * Builds the Akka configuration. Remoting settings are included only when an external address
     * is given; they take precedence over the basic settings, which in turn take precedence over
     * the executor configuration.
     *
     * @param configuration Flink configuration to read the options from
     * @param externalAddress externally reachable address, or {@code null} for a local-only
     *     configuration
     * @param bindAddress address to bind to; when {@code null}, the wildcard IP address and the
     *     external port are used for binding
     * @param executorConfig dispatcher/executor configuration used as fallback
     * @return the Akka configuration
     */
    public static Config getAkkaConfig(Configuration configuration, @Nullable HostAndPort externalAddress, @Nullable HostAndPort bindAddress, Config executorConfig) {
        Config defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig);
        if (externalAddress == null) {
            return defaultConfig;
        }

        String bindHost;
        int bindPort;
        if (bindAddress != null) {
            bindHost = bindAddress.getHost();
            bindPort = bindAddress.getPort();
        } else {
            bindHost = NetUtils.getWildcardIPAddress();
            bindPort = externalAddress.getPort();
        }

        Config remoteConfig =
                getRemoteAkkaConfig(
                        configuration,
                        bindHost,
                        bindPort,
                        externalAddress.getHost(),
                        externalAddress.getPort());
        return remoteConfig.withFallback(defaultConfig);
    }

    /**
     * Returns the address of the given actor system as provided by
     * {@link RemoteAddressExtension}.
     *
     * @param system actor system to query
     * @return the address of the actor system
     */
    public static Address getAddress(ActorSystem system) {
        return ((RemoteAddressExtension.RemoteAddressExtensionImpl)RemoteAddressExtension.INSTANCE.apply(system)).getAddress();
    }

    /**
     * Returns the path of the given actor rendered with the address of the given actor system.
     *
     * @param system actor system whose address is used
     * @param actor actor whose path is rendered
     * @return the actor path including the address
     */
    public static String getAkkaURL(ActorSystem system, ActorRef actor) {
        return actor.path().toStringWithAddress(getAddress(system));
    }

    /**
     * Parses the address out of an Akka URL.
     *
     * @param akkaURL URL to parse
     * @return the parsed address
     * @throws MalformedURLException if the URL cannot be parsed
     */
    public static Address getAddressFromAkkaURL(String akkaURL) throws MalformedURLException {
        return AddressFromURIString.apply(akkaURL);
    }

    /**
     * Extracts host and port from an Akka URL.
     *
     * @param akkaURL URL to parse
     * @return socket address built from the URL's host and port
     * @throws Exception if the URL is malformed or lacks a host or a port; the underlying
     *     {@link MalformedURLException} is attached as the cause
     */
    public static InetSocketAddress getInetSocketAddressFromAkkaURL(String akkaURL) throws Exception {
        try {
            Address address = getAddressFromAkkaURL(akkaURL);
            if (address.host().isDefined() && address.port().isDefined()) {
                return new InetSocketAddress(address.host().get(), (int)((Integer)address.port().get()));
            }
            throw new MalformedURLException("Akka URL " + akkaURL + " does not contain both a host and a port.");
        }
        catch (MalformedURLException e) {
            // Keep the original failure as the cause so the reason is not lost.
            throw new Exception("Could not retrieve InetSocketAddress from Akka URL " + akkaURL, e);
        }
    }

    /**
     * Terminates the given actor system.
     *
     * @param actorSystem actor system to terminate
     * @return future that completes once termination has completed; the termination result is
     *     discarded
     */
    public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem) {
        return AkkaFutureUtils.toJava(actorSystem.terminate()).thenAccept(FunctionUtils.ignoreFn());
    }

    /**
     * Converts a boolean into the {@code on}/{@code off} literals written into the configuration.
     *
     * @param flag value to convert
     * @return {@code "on"} if {@code flag} is true, otherwise {@code "off"}
     */
    private static String booleanToOnOrOff(boolean flag) {
        return flag ? "on" : "off";
    }

    /**
     * Collects configuration text line by line and parses it into a resolved {@link Config}.
     */
    private static class AkkaConfigBuilder {
        private final StringWriter stringWriter = new StringWriter();
        private final PrintWriter printWriter = new PrintWriter(this.stringWriter);

        private AkkaConfigBuilder() {
        }

        /**
         * Appends one line of configuration text.
         *
         * @param configLine line to append
         * @return this builder, for chaining
         */
        public AkkaConfigBuilder add(String configLine) {
            printWriter.println(configLine);
            return this;
        }

        /**
         * Parses all lines added so far and resolves substitutions.
         *
         * @return the parsed and resolved configuration
         */
        public Config build() {
            return ConfigFactory.parseString(stringWriter.toString()).resolve();
        }
    }
}

