/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.RemoteFencedMessage;
import org.apache.flink.util.Preconditions;

/**
 * Invocation handler that attaches a fencing token to every message it sends.
 *
 * <p>Each message passed to {@link #tell(Object)} or {@link #ask(Object, Duration)} is first
 * wrapped in a {@link FencedMessage} carrying the token currently returned by the fencing token
 * supplier, and is then handed to the corresponding {@link AkkaInvocationHandler} method.
 *
 * @param <F> type of the fencing token
 */
public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInvocationHandler
        implements MainThreadExecutable, FencedRpcGateway<F> {

    /** Source of the fencing token; queried anew for every message and token lookup. */
    private final Supplier<F> fencingTokenSupplier;

    /**
     * Creates a fenced invocation handler.
     *
     * <p>All arguments except {@code fencingTokenSupplier} are forwarded unchanged to the
     * {@link AkkaInvocationHandler} constructor.
     *
     * @param fencingTokenSupplier supplier of the fencing token to attach to outgoing messages;
     *     must not be {@code null}
     */
    public FencedAkkaInvocationHandler(
            String address,
            String hostname,
            ActorRef rpcEndpoint,
            Duration timeout,
            long maximumFramesize,
            boolean forceRpcInvocationSerialization,
            @Nullable CompletableFuture<Void> terminationFuture,
            Supplier<F> fencingTokenSupplier,
            boolean captureAskCallStacks,
            ClassLoader flinkClassLoader) {
        super(
                address,
                hostname,
                rpcEndpoint,
                timeout,
                maximumFramesize,
                forceRpcInvocationSerialization,
                terminationFuture,
                captureAskCallStacks,
                flinkClassLoader);
        this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
    }

    /**
     * Dispatches a proxied call.
     *
     * <p>Methods declared by {@link MainThreadExecutable} or {@link FencedRpcGateway} are invoked
     * reflectively on this handler itself; every other method is delegated to the superclass.
     *
     * <p>Because the local dispatch goes through {@link Method#invoke(Object, Object...)}, an
     * exception thrown by the target method surfaces wrapped in an
     * {@link java.lang.reflect.InvocationTargetException}.
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        if (declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(FencedRpcGateway.class)) {
            return method.invoke(this, args);
        }

        return super.invoke(proxy, method, args);
    }

    /** Wraps {@code message} together with the current fencing token, then sends it. */
    @Override
    public void tell(Object message) {
        super.tell(fenceMessage(message));
    }

    /** Wraps {@code message} together with the current fencing token, then asks with it. */
    @Override
    public CompletableFuture<?> ask(Object message, Duration timeout) {
        return super.ask(fenceMessage(message), timeout);
    }

    /**
     * Returns the fencing token currently provided by the supplier.
     *
     * @return the value of {@code fencingTokenSupplier.get()}
     */
    @Override
    public F getFencingToken() {
        return fencingTokenSupplier.get();
    }

    /**
     * Wraps the given message in a fenced message carrying the current fencing token.
     *
     * <p>If {@code isLocal} is set, a {@link LocalFencedMessage} is created and the payload may be
     * of any type. Otherwise the payload has to implement {@link Serializable} and a
     * {@link RemoteFencedMessage} is created.
     *
     * @param message payload to wrap
     * @param <P> payload type
     * @return the fenced message
     * @throws RuntimeException if the handler is not local and {@code message} is not
     *     {@link Serializable}
     */
    private <P> FencedMessage<F, P> fenceMessage(P message) {
        if (isLocal) {
            return new LocalFencedMessage<>(fencingTokenSupplier.get(), message);
        }

        if (message instanceof Serializable) {
            // The payload was just verified to be Serializable; only its static type is widened
            // for the constructor call, so viewing the result as FencedMessage<F, P> is safe.
            @SuppressWarnings("unchecked")
            FencedMessage<F, P> fencedMessage =
                    (FencedMessage<F, P>)
                            new RemoteFencedMessage<>(
                                    fencingTokenSupplier.get(), (Serializable) message);
            return fencedMessage;
        }

        throw new RuntimeException("Trying to send a non-serializable message " + message + " to a remote RpcEndpoint. Please make sure that the message implements java.io.Serializable.");
    }
}

