/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.exceptions.RpcRuntimeException;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.pekko.ControlMessages;
import org.apache.flink.runtime.rpc.pekko.DeadLettersActor;
import org.apache.flink.runtime.rpc.pekko.FencedPekkoInvocationHandler;
import org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor;
import org.apache.flink.runtime.rpc.pekko.PekkoBasedEndpoint;
import org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcActor;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.runtime.rpc.pekko.SupervisorActor;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.Props;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.reflect.ClassTag$;

/**
 * {@link RpcService} implementation that is backed by a Pekko {@link ActorSystem}.
 *
 * <p>Every {@link RpcEndpoint} started through {@link #startServer(RpcEndpoint, Map)} is
 * registered as an actor via the supervisor actor and exposed to callers as a dynamic
 * {@link Proxy} whose {@link InvocationHandler} forwards calls to that actor. Remote endpoints
 * are reached through the {@code connect} methods, which resolve the actor address and perform a
 * version handshake before handing out a gateway proxy.
 *
 * <p>The map of started actors is guarded by {@link #lock}; the {@code stopped} flag is
 * {@code volatile} and only flipped while holding the lock.
 */
@ThreadSafe
public class PekkoRpcService implements RpcService {
    private static final Logger LOG = LoggerFactory.getLogger(PekkoRpcService.class);

    /** RPC version returned by {@link #getVersion()} and sent with the remote handshake. */
    static final int VERSION = 2;

    private final Object lock = new Object();
    private final ActorSystem actorSystem;
    private final PekkoRpcServiceConfiguration configuration;
    private final ClassLoader flinkClassLoader;

    /** Actors started by this service, keyed by their reference. */
    @GuardedBy(value="lock")
    private final Map<ActorRef, RpcEndpoint> actors = CollectionUtil.newHashMapWithExpectedSize(4);

    /** Host of the actor system address, or the empty string if the address has no host. */
    private final String address;

    /** Port of the actor system address, or {@code -1} if the address has no port. */
    private final int port;

    private final boolean captureAskCallstacks;
    private final ScheduledExecutor internalScheduledExecutor;

    /** Completed once the RPC actors, the supervisor and the actor system have terminated. */
    private final CompletableFuture<Void> terminationFuture;

    private final Supervisor supervisor;
    private volatile boolean stopped;

    /**
     * Creates the service using the class loader of {@link PekkoRpcService} as the Flink class
     * loader.
     *
     * @param actorSystem actor system to run the RPC actors in
     * @param configuration RPC service configuration
     */
    @VisibleForTesting
    public PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration configuration) {
        this(actorSystem, configuration, PekkoRpcService.class.getClassLoader());
    }

    /**
     * Creates the service, derives its address and port from the actor system, and starts the
     * supervisor and dead-letters actors.
     *
     * @param actorSystem actor system to run the RPC actors in
     * @param configuration RPC service configuration
     * @param flinkClassLoader class loader handed to the executors, actors and invocation
     *     handlers created by this service
     * @throws NullPointerException if any argument is {@code null}
     */
    PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration configuration, ClassLoader flinkClassLoader) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system");
        this.configuration = Preconditions.checkNotNull(configuration, "pekko rpc service configuration");
        this.flinkClassLoader = Preconditions.checkNotNull(flinkClassLoader, "flinkClassLoader");

        final Address actorSystemAddress = PekkoUtils.getAddress(actorSystem);
        if (actorSystemAddress.host().isDefined()) {
            this.address = actorSystemAddress.host().get();
        } else {
            this.address = "";
        }
        if (actorSystemAddress.port().isDefined()) {
            this.port = (Integer) actorSystemAddress.port().get();
        } else {
            this.port = -1;
        }

        this.captureAskCallstacks = configuration.captureAskCallStack();
        this.internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem, flinkClassLoader);
        this.terminationFuture = new CompletableFuture<>();
        this.stopped = false;
        this.supervisor = startSupervisorActor();
        startDeadLettersActor();
    }

    /** Starts the {@code deadLettersActor} and subscribes it to {@link DeadLetter} events. */
    private void startDeadLettersActor() {
        final ActorRef deadLettersActor = actorSystem.actorOf(DeadLettersActor.getProps(), "deadLettersActor");
        actorSystem.eventStream().subscribe(deadLettersActor, DeadLetter.class);
    }

    /**
     * Starts the supervisor actor together with the single-threaded executor that is passed to
     * it (wrapped with the Flink class loader).
     *
     * @return handle bundling the supervisor actor and its executor
     */
    private Supervisor startSupervisorActor() {
        final ExecutorService terminationFutureExecutor =
                Executors.newSingleThreadExecutor(
                        new ExecutorThreadFactory("RpcService-Supervisor-Termination-Future-Executor"));
        final ActorRef supervisorActor =
                SupervisorActor.startSupervisorActor(
                        actorSystem,
                        ClassLoadingUtils.withContextClassLoader(terminationFutureExecutor, flinkClassLoader));
        return Supervisor.create(supervisorActor, terminationFutureExecutor);
    }

    /** Returns the actor system this service runs on. */
    public ActorSystem getActorSystem() {
        return actorSystem;
    }

    /** Returns the RPC version used for actor creation and the connect handshake. */
    protected int getVersion() {
        return VERSION;
    }

    /** Returns the host of the actor system address, or the empty string if undefined. */
    public String getAddress() {
        return address;
    }

    /** Returns the port of the actor system address, or {@code -1} if undefined. */
    public int getPort() {
        return port;
    }

    /**
     * Returns the given server viewed as the requested gateway type.
     *
     * @param selfGatewayType gateway interface the server is expected to implement
     * @param rpcServer server to view as a gateway
     * @return {@code rpcServer} cast to {@code C}
     * @throws ClassCastException if {@code rpcServer} is not an instance of
     *     {@code selfGatewayType}
     */
    public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer) {
        if (selfGatewayType.isInstance(rpcServer)) {
            @SuppressWarnings("unchecked") // guarded by the isInstance check above
            final C selfGateway = (C) rpcServer;
            return selfGateway;
        }
        throw new ClassCastException("RpcEndpoint does not implement the RpcGateway interface of type " + String.valueOf(selfGatewayType) + ".");
    }

    /**
     * Connects to the RPC endpoint under the given address and returns a gateway proxy for it.
     *
     * @param address address of the remote endpoint
     * @param clazz gateway interface the proxy should implement
     * @return future completed with the gateway once address resolution and handshake succeeded
     * @throws IllegalStateException if the service has been stopped
     */
    public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
        return connectInternal(
                address,
                clazz,
                actorRef -> {
                    final Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
                    return new PekkoInvocationHandler(
                            addressHostname.f0,
                            addressHostname.f1,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            null, // no termination future is passed for a remote endpoint
                            captureAskCallstacks,
                            flinkClassLoader);
                });
    }

    /**
     * Connects to the fenced RPC endpoint under the given address and returns a gateway proxy
     * that uses the supplied fencing token.
     *
     * @param address address of the remote endpoint
     * @param fencingToken fencing token supplied to the invocation handler
     * @param clazz gateway interface the proxy should implement
     * @return future completed with the gateway once address resolution and handshake succeeded
     * @throws IllegalStateException if the service has been stopped
     */
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
        return connectInternal(
                address,
                clazz,
                actorRef -> {
                    final Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
                    return new FencedPekkoInvocationHandler<F>(
                            addressHostname.f0,
                            addressHostname.f1,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            null, // no termination future is passed for a remote endpoint
                            () -> fencingToken,
                            captureAskCallstacks,
                            flinkClassLoader);
                });
    }

    /**
     * Registers an actor for the given endpoint and returns a proxy that implements the
     * endpoint's RPC gateways plus {@link RpcServer} and {@link PekkoBasedEndpoint}.
     *
     * <p>A {@link FencedRpcEndpoint} is served by a {@link FencedPekkoInvocationHandler} bound to
     * the endpoint's current fencing token; every other endpoint is served by a plain
     * {@link PekkoInvocationHandler}.
     *
     * @param rpcEndpoint endpoint to start a server for
     * @param loggingContext logging context handed to the RPC actor
     * @return proxy acting as the endpoint's RPC server
     * @throws NullPointerException if {@code rpcEndpoint} is {@code null}
     * @throws IllegalStateException if the service has been stopped
     * @throws RpcRuntimeException if the RPC actor could not be created
     */
    public <C extends RpcEndpoint> RpcServer startServer(C rpcEndpoint, Map<String, String> loggingContext) {
        Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");

        final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint, loggingContext);
        final ActorRef actorRef = actorRegistration.getActorRef();
        final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();

        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

        final Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
        final String rpcUrl = addressHostname.f0;
        final String hostname = addressHostname.f1;

        // The raw cast keeps this independent of the exact element type declared by RpcUtils;
        // the set only ever holds Class objects.
        @SuppressWarnings({"rawtypes", "unchecked"})
        final Set<Class<?>> implementedRpcGateways = (Set) RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass());
        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(PekkoBasedEndpoint.class);

        final InvocationHandler invocationHandler;
        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // A fenced endpoint needs a handler that attaches the endpoint's fencing token.
            invocationHandler =
                    new FencedPekkoInvocationHandler<Serializable>(
                            rpcUrl,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            () -> ((FencedRpcEndpoint<?>) rpcEndpoint).getFencingToken(),
                            captureAskCallstacks,
                            flinkClassLoader);
        } else {
            // Non-fenced endpoints have no fencing token and use the plain handler.
            invocationHandler =
                    new PekkoInvocationHandler(
                            rpcUrl,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            captureAskCallstacks,
                            flinkClassLoader);
        }

        // The proxy is defined in the class loader that loaded this class.
        final ClassLoader classLoader = getClass().getClassLoader();
        return (RpcServer) Proxy.newProxyInstance(
                classLoader,
                implementedRpcGateways.toArray(new Class<?>[0]),
                invocationHandler);
    }

    /**
     * Starts the RPC actor for the given endpoint through the supervisor and records it in
     * {@link #actors}. The whole registration happens while holding {@link #lock}.
     *
     * @param rpcEndpoint endpoint to create the actor for
     * @param loggingContext logging context passed to the actor
     * @return registration holding the actor reference and its termination future
     * @throws IllegalStateException if the service has been stopped
     * @throws RpcRuntimeException if the supervisor failed to start the actor
     */
    private <C extends RpcEndpoint> SupervisorActor.ActorRegistration registerRpcActor(C rpcEndpoint, Map<String, String> loggingContext) {
        final Class<?> rpcActorType =
                rpcEndpoint instanceof FencedRpcEndpoint ? FencedPekkoRpcActor.class : PekkoRpcActor.class;

        synchronized (lock) {
            Preconditions.checkState(!stopped, "RpcService is stopped");

            final SupervisorActor.StartRpcActorResponse startRpcActorResponse =
                    SupervisorActor.startRpcActor(
                            supervisor.getActor(),
                            actorTerminationFuture ->
                                    Props.create(
                                            rpcActorType,
                                            rpcEndpoint,
                                            actorTerminationFuture,
                                            getVersion(),
                                            configuration.getMaximumFramesize(),
                                            configuration.isForceRpcInvocationSerialization(),
                                            flinkClassLoader,
                                            loggingContext),
                            rpcEndpoint.getEndpointId());

            final SupervisorActor.ActorRegistration actorRegistration =
                    startRpcActorResponse.orElseThrow(
                            cause ->
                                    new RpcRuntimeException(
                                            String.format(
                                                    "Could not create the %s for %s.",
                                                    PekkoRpcActor.class.getSimpleName(),
                                                    rpcEndpoint.getEndpointId()),
                                            cause));

            actors.put(actorRegistration.getActorRef(), rpcEndpoint);
            return actorRegistration;
        }
    }

    /**
     * Stops the RPC actor behind the given server if it was started by this service.
     *
     * <p>Does nothing if the server is not a {@link PekkoBasedEndpoint} or if the service has
     * already been stopped. The terminate message is sent outside of the lock.
     *
     * @param selfGateway server whose actor should be terminated
     */
    public void stopServer(RpcServer selfGateway) {
        if (!(selfGateway instanceof PekkoBasedEndpoint)) {
            return;
        }

        final PekkoBasedEndpoint client = (PekkoBasedEndpoint) selfGateway;
        final RpcEndpoint rpcEndpoint;

        synchronized (lock) {
            if (stopped) {
                return;
            }
            rpcEndpoint = actors.remove(client.getActorRef());
        }

        if (rpcEndpoint != null) {
            terminateRpcActor(client.getActorRef(), rpcEndpoint);
        } else {
            LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress());
        }
    }

    /**
     * Stops the service: terminates all registered RPC actors, then closes the supervisor, then
     * terminates the actor system. Repeated calls return the same future without repeating the
     * shutdown.
     *
     * @return future that is completed with the outcome of the actor system termination chain
     */
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> rpcActorsTerminationFuture;

        synchronized (lock) {
            if (stopped) {
                return terminationFuture;
            }

            LOG.info("Stopping Pekko RPC service.");
            stopped = true;
            rpcActorsTerminationFuture = terminateRpcActors();
        }

        final CompletableFuture<Void> supervisorTerminationFuture =
                FutureUtils.composeAfterwards(rpcActorsTerminationFuture, supervisor::closeAsync);

        final CompletableFuture<Void> actorSystemTerminationFuture =
                FutureUtils.composeAfterwards(
                        supervisorTerminationFuture,
                        () -> ScalaFutureUtils.toJava(actorSystem.terminate()));

        actorSystemTerminationFuture.whenComplete(
                (ignored, throwable) -> {
                    // Forward the result with the Flink class loader as context class loader.
                    ClassLoadingUtils.runWithContextClassLoader(
                            () -> FutureUtils.doForward(ignored, throwable, terminationFuture),
                            flinkClassLoader);
                    LOG.info("Stopped Pekko RPC service.");
                });

        return terminationFuture;
    }

    /**
     * Sends the terminate message to every registered actor and clears the registry.
     *
     * @return future that completes once all endpoint termination futures have completed
     */
    @Nonnull
    @GuardedBy(value="lock")
    private CompletableFuture<Void> terminateRpcActors() {
        final ArrayList<CompletableFuture<Void>> rpcActorTerminationFutures = new ArrayList<>(actors.size());

        for (Map.Entry<ActorRef, RpcEndpoint> entry : actors.entrySet()) {
            rpcActorTerminationFutures.add(terminateRpcActor(entry.getKey(), entry.getValue()));
        }
        actors.clear();

        return FutureUtils.waitForAll(rpcActorTerminationFutures);
    }

    /**
     * Tells the given actor to terminate.
     *
     * @param rpcActorRef actor to send {@link ControlMessages#TERMINATE} to
     * @param rpcEndpoint endpoint served by that actor
     * @return the endpoint's termination future
     */
    private CompletableFuture<Void> terminateRpcActor(ActorRef rpcActorRef, RpcEndpoint rpcEndpoint) {
        rpcActorRef.tell(ControlMessages.TERMINATE, ActorRef.noSender());
        return rpcEndpoint.getTerminationFuture();
    }

    /** Returns the scheduled executor that is backed by the actor system. */
    public ScheduledExecutor getScheduledExecutor() {
        return internalScheduledExecutor;
    }

    /**
     * Determines the RPC URL and host name of the given actor.
     *
     * @param actorRef2 actor to inspect
     * @return tuple of (RPC URL, host name); the host name is {@code "localhost"} if the actor
     *     path's address carries no host
     */
    private Tuple2<String, String> extractAddressHostname(ActorRef actorRef2) {
        final String actorAddress = PekkoUtils.getRpcURL(actorSystem, actorRef2);
        final Option<String> host = actorRef2.path().address().host();
        final String hostname = host.isEmpty() ? "localhost" : host.get();
        return Tuple2.of(actorAddress, hostname);
    }

    /**
     * Resolves the actor under the given address, performs the version handshake and, once both
     * succeeded, builds a gateway proxy on the actor system's dispatcher.
     *
     * @param address address of the remote endpoint
     * @param clazz gateway interface the proxy should implement
     * @param invocationHandlerFactory creates the proxy's invocation handler from the resolved
     *     actor reference
     * @return future of the gateway, guarded so that it completes with the Flink class loader
     * @throws IllegalStateException if the service has been stopped
     */
    private <C extends RpcGateway> CompletableFuture<C> connectInternal(String address, Class<C> clazz, Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
        Preconditions.checkState(!stopped, "RpcService is stopped");

        LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", address, clazz.getName());

        final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);

        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
                actorRefFuture.thenCompose(
                        actorRef ->
                                ScalaFutureUtils.toJava(
                                        Patterns.ask(
                                                        actorRef,
                                                        new RemoteHandshakeMessage(clazz, getVersion()),
                                                        configuration.getTimeout().toMillis())
                                                .<HandshakeSuccessMessage>mapTo(
                                                        ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(
                                                                HandshakeSuccessMessage.class))));

        final CompletableFuture<C> gatewayFuture =
                actorRefFuture.thenCombineAsync(
                        handshakeFuture,
                        (actorRef, ignored) -> {
                            final InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

                            // The proxy is defined in the class loader that loaded this class.
                            final ClassLoader classLoader = getClass().getClassLoader();

                            @SuppressWarnings("unchecked") // the proxy implements clazz
                            final C proxy = (C) Proxy.newProxyInstance(classLoader, new Class<?>[] {clazz}, invocationHandler);
                            return proxy;
                        },
                        actorSystem.dispatcher());

        return ClassLoadingUtils.guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
    }

    /**
     * Resolves the actor under the given address within the configured timeout.
     *
     * @param address actor address to resolve
     * @return future of the actor reference; on failure it completes exceptionally with a
     *     {@link CompletionException} wrapping an {@link RpcConnectionException}
     */
    private CompletableFuture<ActorRef> resolveActorAddress(String address) {
        final ActorSelection actorSel = actorSystem.actorSelection(address);

        return actorSel
                .resolveOne(configuration.getTimeout())
                .toCompletableFuture()
                .exceptionally(
                        error -> {
                            throw new CompletionException(
                                    new RpcConnectionException(
                                            String.format("Could not connect to rpc endpoint under address %s.", address),
                                            error));
                        });
    }

    /** Bundles the supervisor actor with the executor that was handed to it at start-up. */
    private static final class Supervisor implements AutoCloseableAsync {
        private final ActorRef actor;
        private final ExecutorService terminationFutureExecutor;

        private Supervisor(ActorRef actor, ExecutorService terminationFutureExecutor) {
            this.actor = actor;
            this.terminationFutureExecutor = terminationFutureExecutor;
        }

        /** Creates a supervisor handle for the given actor and executor. */
        private static Supervisor create(ActorRef actorRef2, ExecutorService terminationFutureExecutor) {
            return new Supervisor(actorRef2, terminationFutureExecutor);
        }

        /** Returns the supervisor actor. */
        public ActorRef getActor() {
            return actor;
        }

        /**
         * Shuts down the supervisor's executor without blocking the caller, using a 30 second
         * timeout.
         *
         * @return future of the executor shutdown
         */
        public CompletableFuture<Void> closeAsync() {
            return ExecutorUtils.nonBlockingShutdown(30L, TimeUnit.SECONDS, terminationFutureExecutor);
        }
    }
}

