/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;

/**
 * An {@link RpcService} for tests that lets gateways be registered by address.
 *
 * <p>A {@code connect} call for an address that has a registered gateway is answered from the
 * local registry; the resulting future is produced by a replaceable "gateway future function" so
 * that tests can control how the connection future behaves. Every other call is delegated to a
 * backing {@link RpcService} obtained from the loaded {@link RpcSystem}.
 */
public class TestingRpcService
implements RpcService {
    private static final RpcSystem RPC_SYSTEM_SINGLETON = RpcSystem.load();

    /** Default gateway future function: an already completed future holding the gateway. */
    private static final Function<RpcGateway, CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture;

    /** Gateways registered by tests, keyed by the address they are reachable under. */
    private final ConcurrentHashMap<String, RpcGateway> registeredConnections;

    /** Turns a registered gateway into the future returned from {@code connect}. */
    private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;

    /** Service that handles everything not answered from the local registry. */
    private final RpcService backingRpcService;

    /**
     * Creates the testing service and starts its backing local RPC service with a default
     * {@link Configuration}.
     *
     * @throws RuntimeException wrapping any exception thrown while creating the backing service
     */
    public TestingRpcService() {
        try {
            this.backingRpcService = RPC_SYSTEM_SINGLETON.localServiceBuilder(new Configuration()).createAndStart();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.registeredConnections = new ConcurrentHashMap<>();
    }

    /**
     * Closes the backing RPC service and clears all registered gateways afterwards.
     *
     * @return a future that completes with the outcome of closing the backing service, after the
     *     gateway registry has been cleared
     */
    public CompletableFuture<Void> closeAsync() {
        // Hand out the dependent stage rather than the raw termination future: this guarantees the
        // registry is already empty when callers observe completion.
        return this.backingRpcService
                .closeAsync()
                .whenComplete((ignored, throwable) -> this.registeredConnections.clear());
    }

    /**
     * Registers a gateway under the given address.
     *
     * @param address address to register the gateway under; must not be null
     * @param gateway gateway to hand out for that address; must not be null
     * @throws IllegalStateException if a gateway is already registered under {@code address}
     */
    public void registerGateway(String address, RpcGateway gateway) {
        Preconditions.checkNotNull((Object)address);
        Preconditions.checkNotNull((Object)gateway);
        if (this.registeredConnections.putIfAbsent(address, gateway) != null) {
            throw new IllegalStateException("a gateway is already registered under " + address);
        }
    }

    /**
     * Removes the gateway registered under the given address.
     *
     * @param address address whose gateway should be removed; must not be null
     * @throws IllegalStateException if no gateway is registered under {@code address}
     */
    public void unregisterGateway(String address) {
        Preconditions.checkNotNull((Object)address);
        if (this.registeredConnections.remove(address) == null) {
            throw new IllegalStateException("no gateway is registered under " + address);
        }
    }

    /**
     * Applies the current gateway future function to the given gateway.
     *
     * @param gateway gateway to wrap
     * @return the future produced by the gateway future function
     */
    @SuppressWarnings("unchecked")
    private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
        // Unchecked: the function is declared on RpcGateway, so the compiler cannot prove that the
        // future it returns carries a C. The default function returns the very object passed in.
        return (CompletableFuture<C>) this.rpcGatewayFutureFunction.apply(gateway);
    }

    /**
     * Produces the connection future for a gateway found in the local registry.
     *
     * @param address address the gateway was looked up under (used for the error message)
     * @param gateway the registered gateway
     * @param clazz gateway type requested by the caller
     * @return the gateway future, or an exceptionally completed future if the registered gateway
     *     is not an instance of {@code clazz}
     */
    private <C extends RpcGateway> CompletableFuture<C> connectToRegisteredGateway(String address, RpcGateway gateway, Class<C> clazz) {
        if (!clazz.isInstance(gateway)) {
            return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
        }
        return this.getRpcGatewayFuture(clazz.cast(gateway));
    }

    /**
     * Connects to the gateway at the given address, preferring locally registered gateways.
     *
     * @param address address to connect to
     * @param clazz requested gateway type
     * @return a future for the registered gateway if one exists under {@code address} (completed
     *     exceptionally on a type mismatch); otherwise the backing service's connection future
     */
    public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
        final RpcGateway gateway = this.registeredConnections.get(address);
        if (gateway == null) {
            return this.backingRpcService.connect(address, clazz);
        }
        return this.connectToRegisteredGateway(address, gateway, clazz);
    }

    /**
     * Fenced variant of {@link #connect(String, Class)}.
     *
     * <p>The fencing token is only passed on when the call is delegated to the backing service;
     * it is not consulted when the address resolves to a locally registered gateway.
     *
     * @param address address to connect to
     * @param fencingToken fencing token forwarded to the backing service
     * @param clazz requested gateway type
     * @return a future for the registered gateway if one exists under {@code address} (completed
     *     exceptionally on a type mismatch); otherwise the backing service's connection future
     */
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
        final RpcGateway gateway = this.registeredConnections.get(address);
        if (gateway == null) {
            return this.backingRpcService.connect(address, fencingToken, clazz);
        }
        return this.connectToRegisteredGateway(address, gateway, clazz);
    }

    /** Removes all registered gateways. */
    public void clearGateways() {
        this.registeredConnections.clear();
    }

    /** Restores the default gateway future function (an already completed future). */
    public void resetRpcGatewayFutureFunction() {
        this.rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
    }

    /**
     * Replaces the function that turns a registered gateway into its connection future.
     *
     * @param rpcGatewayFutureFunction the new function; must not be null
     * @throws NullPointerException if {@code rpcGatewayFutureFunction} is null
     */
    public void setRpcGatewayFutureFunction(Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
        // Fail here instead of with an opaque NullPointerException inside a later connect() call.
        this.rpcGatewayFutureFunction = Preconditions.checkNotNull(rpcGatewayFutureFunction);
    }

    /** Returns the address reported by the backing RPC service. */
    public String getAddress() {
        return this.backingRpcService.getAddress();
    }

    /** Returns the port reported by the backing RPC service. */
    public int getPort() {
        return this.backingRpcService.getPort();
    }

    /** Delegates to the backing RPC service. */
    public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer) {
        return this.backingRpcService.getSelfGateway(selfGatewayType, rpcServer);
    }

    /**
     * Starts an RPC server for the given endpoint on the backing RPC service.
     *
     * @param rpcEndpoint endpoint to start a server for
     * @param loggingContext logging context forwarded to the backing service; {@code null} is
     *     treated as an empty context
     * @return the server created by the backing service
     */
    public <C extends RpcEndpoint> RpcServer startServer(C rpcEndpoint, Map<String, String> loggingContext) {
        // Forward the caller's context instead of discarding it; keep the empty map as the
        // fallback so callers that passed null keep working.
        final Map<String, String> effectiveContext = loggingContext != null ? loggingContext : Collections.emptyMap();
        return this.backingRpcService.startServer(rpcEndpoint, effectiveContext);
    }

    /** Delegates to the backing RPC service. */
    public void stopServer(RpcServer selfGateway) {
        this.backingRpcService.stopServer(selfGateway);
    }

    /** Returns the scheduled executor of the backing RPC service. */
    public ScheduledExecutor getScheduledExecutor() {
        return this.backingRpcService.getScheduledExecutor();
    }
}

