/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerService;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ResourceManagerJobMasterTest
extends TestLogger {
    private static final Time TIMEOUT = Time.seconds((long)10L);
    private TestingRpcService rpcService;
    private JobID jobId;
    private ResourceID jobMasterResourceId;
    private TestingJobMasterGateway jobMasterGateway;
    private SettableLeaderRetrievalService jobMasterLeaderRetrievalService;
    private TestingResourceManagerService resourceManagerService;
    private ResourceManagerGateway resourceManagerGateway;

    @Before
    public void setup() throws Exception {
        this.rpcService = new TestingRpcService();
        this.jobId = new JobID();
        this.jobMasterResourceId = ResourceID.generate();
        this.createAndRegisterJobMasterGateway();
        this.createAndStartResourceManagerService();
    }

    private void createAndRegisterJobMasterGateway() {
        this.jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
        this.rpcService.registerGateway(this.jobMasterGateway.getAddress(), (RpcGateway)this.jobMasterGateway);
        this.jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(this.jobMasterGateway.getAddress(), this.jobMasterGateway.getFencingToken().toUUID());
    }

    private void createAndStartResourceManagerService() throws Exception {
        TestingLeaderElection leaderElection = new TestingLeaderElection();
        this.resourceManagerService = TestingResourceManagerService.newBuilder().setRpcService(this.rpcService).setJmLeaderRetrieverFunction(requestedJobId -> {
            if (requestedJobId.equals((Object)this.jobId)) {
                return this.jobMasterLeaderRetrievalService;
            }
            throw new FlinkRuntimeException(String.format("Unknown job id %s", this.jobId));
        }).setRmLeaderElection(leaderElection).build();
        this.resourceManagerService.start();
        this.resourceManagerService.isLeader(UUID.randomUUID()).join();
        this.resourceManagerGateway = this.resourceManagerService.getResourceManagerGateway().orElseThrow(() -> new AssertionError((Object)"RM not available after confirming leadership."));
    }

    @After
    public void teardown() throws Exception {
        if (this.resourceManagerService != null) {
            this.resourceManagerService.rethrowFatalErrorIfAny();
            this.resourceManagerService.cleanUp();
        }
        if (this.rpcService != null) {
            RpcUtils.terminateRpcService((RpcService[])new RpcService[]{this.rpcService});
        }
    }

    @Test
    public void testRegisterJobMaster() throws Exception {
        CompletableFuture successfulFuture = this.resourceManagerGateway.registerJobMaster(this.jobMasterGateway.getFencingToken(), this.jobMasterResourceId, this.jobMasterGateway.getAddress(), this.jobId, TIMEOUT);
        RegistrationResponse response = (RegistrationResponse)successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertTrue((boolean)(response instanceof JobMasterRegistrationSuccess));
    }

    @Test
    public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
        ResourceManagerGateway wronglyFencedGateway = this.rpcService.connect(this.resourceManagerGateway.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        CompletableFuture unMatchedLeaderFuture = wronglyFencedGateway.registerJobMaster(this.jobMasterGateway.getFencingToken(), this.jobMasterResourceId, this.jobMasterGateway.getAddress(), this.jobId, TIMEOUT);
        try {
            unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
            Assert.fail((String)"Should fail because we are using the wrong fencing token.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
        }
    }

    @Test
    public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception {
        JobMasterId differentJobMasterId = JobMasterId.generate();
        CompletableFuture unMatchedLeaderFuture = this.resourceManagerGateway.registerJobMaster(differentJobMasterId, this.jobMasterResourceId, this.jobMasterGateway.getAddress(), this.jobId, TIMEOUT);
        Assert.assertTrue((boolean)(unMatchedLeaderFuture.get() instanceof RegistrationResponse.Failure));
    }

    @Test
    public void testRegisterJobMasterFromInvalidAddress() throws Exception {
        String invalidAddress = "/jobMasterAddress2";
        CompletableFuture invalidAddressFuture = this.resourceManagerGateway.registerJobMaster(new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID), this.jobMasterResourceId, invalidAddress, this.jobId, TIMEOUT);
        Assert.assertTrue((boolean)(invalidAddressFuture.get(5L, TimeUnit.SECONDS) instanceof RegistrationResponse.Failure));
    }

    @Test
    public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
        JobID unknownJobIDToHAServices = new JobID();
        CompletableFuture registrationFuture = this.resourceManagerGateway.registerJobMaster(this.jobMasterGateway.getFencingToken(), this.jobMasterResourceId, this.jobMasterGateway.getAddress(), unknownJobIDToHAServices, TIMEOUT);
        try {
            registrationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail((String)"Expected to fail with a ResourceManagerException.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof ResourceManagerException));
        }
        this.resourceManagerService.ignoreFatalErrors();
    }
}

