/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcGatewayUtils;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.runtime.rpc.pekko.ControlMessages;
import org.apache.flink.runtime.rpc.pekko.PekkoBasedEndpoint;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.runtime.rpc.pekko.RpcSerializedValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.pattern.AskTimeoutException;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PekkoInvocationHandler
implements InvocationHandler,
PekkoBasedEndpoint,
RpcServer {
    private static final Logger LOG = LoggerFactory.getLogger(PekkoInvocationHandler.class);
    private final String address;
    private final String hostname;
    private final ActorRef rpcEndpoint;
    private final ClassLoader flinkClassLoader;
    protected final boolean isLocal;
    protected final boolean forceRpcInvocationSerialization;
    private final Duration timeout;
    private final long maximumFramesize;
    @Nullable
    private final CompletableFuture<Void> terminationFuture;
    private final boolean captureAskCallStack;

    PekkoInvocationHandler(String address, String hostname, ActorRef rpcEndpoint, Duration timeout, long maximumFramesize, boolean forceRpcInvocationSerialization, @Nullable CompletableFuture<Void> terminationFuture, boolean captureAskCallStack, ClassLoader flinkClassLoader) {
        this.address = (String)Preconditions.checkNotNull((Object)address);
        this.hostname = (String)Preconditions.checkNotNull((Object)hostname);
        this.rpcEndpoint = (ActorRef)Preconditions.checkNotNull((Object)rpcEndpoint);
        this.flinkClassLoader = (ClassLoader)Preconditions.checkNotNull((Object)flinkClassLoader);
        this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
        this.timeout = (Duration)Preconditions.checkNotNull((Object)timeout);
        this.maximumFramesize = maximumFramesize;
        this.forceRpcInvocationSerialization = forceRpcInvocationSerialization;
        this.terminationFuture = terminationFuture;
        this.captureAskCallStack = captureAskCallStack;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object result;
        Class<?> declaringClass = method.getDeclaringClass();
        if (declaringClass.equals(PekkoBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) {
            result = method.invoke((Object)this, args);
        } else {
            if (declaringClass.equals(FencedRpcGateway.class)) {
                throw new UnsupportedOperationException("InvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to retrieve a properly FencedRpcGateway.");
            }
            result = this.invokeRpc(method, args);
        }
        return result;
    }

    @Override
    public ActorRef getActorRef() {
        return this.rpcEndpoint;
    }

    public void runAsync(Runnable runnable) {
        this.scheduleRunAsync(runnable, 0L);
    }

    public void scheduleRunAsync(Runnable runnable, long delayMillis) {
        Preconditions.checkNotNull((Object)runnable, (String)"runnable");
        Preconditions.checkArgument((delayMillis >= 0L ? 1 : 0) != 0, (Object)"delay must be zero or greater");
        if (!this.isLocal) {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " + String.valueOf(this.rpcEndpoint.path()) + ". This is not supported.");
        }
        long atTimeNanos = delayMillis == 0L ? 0L : System.nanoTime() + delayMillis * 1000000L;
        this.tell(new RunAsync(runnable, atTimeNanos));
    }

    public <V> CompletableFuture<V> callAsync(Callable<V> callable, Duration callTimeout) {
        if (this.isLocal) {
            CompletableFuture<?> resultFuture = this.ask(new CallAsync(callable), callTimeout);
            return resultFuture;
        }
        throw new RuntimeException("Trying to send a Callable to a remote actor at " + String.valueOf(this.rpcEndpoint.path()) + ". This is not supported.");
    }

    public void start() {
        this.rpcEndpoint.tell((Object)ControlMessages.START, ActorRef.noSender());
    }

    public void stop() {
        this.rpcEndpoint.tell((Object)ControlMessages.STOP, ActorRef.noSender());
    }

    private Object invokeRpc(Method method, Object[] args) throws Exception {
        CompletableFuture result;
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null;
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        Duration futureTimeout = RpcGatewayUtils.extractRpcTimeout((Annotation[][])parameterAnnotations, (Object[])args, (Duration)this.timeout);
        RpcInvocation rpcInvocation = this.createRpcInvocationMessage(method.getDeclaringClass().getSimpleName(), methodName, isLocalRpcInvocation, parameterTypes, args);
        Class<?> returnType = method.getReturnType();
        if (Objects.equals(returnType, Void.TYPE)) {
            this.tell(rpcInvocation);
            result = null;
        } else {
            Throwable callStackCapture = this.captureAskCallStack ? new Throwable() : null;
            CompletionStage resultFuture = this.ask(rpcInvocation, futureTimeout).thenApply(resultValue -> PekkoInvocationHandler.deserializeValueIfNeeded(resultValue, method, this.flinkClassLoader));
            CompletableFuture completableFuture = new CompletableFuture();
            ((CompletableFuture)resultFuture).whenComplete((resultValue, failure) -> {
                if (failure != null) {
                    completableFuture.completeExceptionally(PekkoInvocationHandler.resolveTimeoutException(ExceptionUtils.stripCompletionException((Throwable)failure), callStackCapture, this.address, rpcInvocation));
                } else {
                    completableFuture.complete(resultValue);
                }
            });
            if (Objects.equals(returnType, CompletableFuture.class)) {
                result = completableFuture;
            } else {
                try {
                    result = completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);
                }
                catch (ExecutionException ee) {
                    throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException((Throwable)ee));
                }
            }
        }
        return result;
    }

    private RpcInvocation createRpcInvocationMessage(String declaringClassName, String methodName, boolean isLocalRpcInvocation, Class<?>[] parameterTypes, Object[] args) throws IOException {
        Object rpcInvocation = this.isLocal && (!this.forceRpcInvocationSerialization || isLocalRpcInvocation) ? new LocalRpcInvocation(declaringClassName, methodName, (Class[])parameterTypes, args) : new RemoteRpcInvocation(declaringClassName, methodName, (Class[])parameterTypes, args);
        return rpcInvocation;
    }

    protected void tell(Object message) {
        this.rpcEndpoint.tell(message, ActorRef.noSender());
    }

    protected CompletableFuture<?> ask(Object message, Duration timeout) {
        CompletableFuture response = ScalaFutureUtils.toJava(Patterns.ask(this.rpcEndpoint, message, timeout.toMillis()));
        return ClassLoadingUtils.guardCompletionWithContextClassLoader(response, (ClassLoader)this.flinkClassLoader);
    }

    public String getAddress() {
        return this.address;
    }

    public String getHostname() {
        return this.hostname;
    }

    public CompletableFuture<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    private static Object deserializeValueIfNeeded(Object o, Method method, ClassLoader flinkClassLoader) {
        if (o instanceof RpcSerializedValue) {
            try {
                return ((RpcSerializedValue)o).deserializeValue(flinkClassLoader);
            }
            catch (IOException | ClassNotFoundException e) {
                throw new CompletionException((Throwable)new RpcException("Could not deserialize the serialized payload of RPC method : " + method.getName(), (Throwable)e));
            }
        }
        return o;
    }

    static Throwable resolveTimeoutException(Throwable exception, @Nullable Throwable callStackCapture, String recipient, RpcInvocation rpcInvocation) {
        if (!(exception instanceof AskTimeoutException)) {
            return exception;
        }
        Object newException = PekkoRpcServiceUtils.isRecipientTerminatedException(exception) ? new RecipientUnreachableException("unknown", recipient, rpcInvocation.toString()) : new TimeoutException(String.format("Invocation of [%s] at recipient [%s] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase %s.", rpcInvocation, recipient, RpcOptions.ASK_TIMEOUT_DURATION.key()));
        newException.initCause(exception);
        if (callStackCapture != null) {
            StackTraceElement[] stackTrace = callStackCapture.getStackTrace();
            newException.setStackTrace(Arrays.copyOfRange(stackTrace, 3, stackTrace.length));
        }
        return newException;
    }
}

