/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class DefaultJobLeaderServiceTest {
    private final TestingRpcServiceExtension rpcServiceExtension = new TestingRpcServiceExtension();
    @RegisterExtension
    private final EachCallbackWrapper<TestingRpcServiceExtension> eachWrapper = new EachCallbackWrapper((CustomExtension)this.rpcServiceExtension);

    DefaultJobLeaderServiceTest() {
    }

    @Test
    void handlesConcurrentJobAdditionsAndLeaderChanges() throws Exception {
        DefaultJobLeaderService jobLeaderService = new DefaultJobLeaderService((UnresolvedTaskManagerLocation)new LocalUnresolvedTaskManagerLocation(), RetryingRegistrationConfiguration.defaultConfiguration());
        TestingJobLeaderListener jobLeaderListener = new TestingJobLeaderListener();
        int numberOperations = 20;
        ArrayBlockingQueue instantiatedLeaderRetrievalServices = new ArrayBlockingQueue(20);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder().setJobMasterLeaderRetrieverFunction(leaderForJobId -> {
            SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
            instantiatedLeaderRetrievalServices.offer(leaderRetrievalService);
            return leaderRetrievalService;
        }).build();
        jobLeaderService.start("foobar", (RpcService)this.rpcServiceExtension.getTestingRpcService(), (HighAvailabilityServices)haServices, (JobLeaderListener)jobLeaderListener);
        CheckedThread addJobAction = new CheckedThread((JobLeaderService)jobLeaderService){
            final /* synthetic */ JobLeaderService val$jobLeaderService;
            {
                this.val$jobLeaderService = jobLeaderService;
            }

            public void go() throws Exception {
                for (int i = 0; i < 20; ++i) {
                    JobID jobId = JobID.generate();
                    this.val$jobLeaderService.addJob(jobId, "foobar");
                    Thread.yield();
                    this.val$jobLeaderService.removeJob(jobId);
                }
            }
        };
        addJobAction.start();
        for (int i = 0; i < 20; ++i) {
            SettableLeaderRetrievalService leaderRetrievalService = (SettableLeaderRetrievalService)instantiatedLeaderRetrievalServices.take();
            leaderRetrievalService.notifyListener("foobar", UUID.randomUUID());
        }
        addJobAction.sync();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void doesNotReconnectAfterTargetLostLeadership() throws Exception {
        JobID jobId = new JobID();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder().setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService).build();
        TestingJobMasterGateway jobMasterGateway = this.registerJobMaster();
        OneShotLatch jobManagerGainedLeadership = new OneShotLatch();
        TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(ignored -> jobManagerGainedLeadership.trigger());
        JobLeaderService jobLeaderService = this.createAndStartJobLeaderService(haServices, testingJobLeaderListener);
        try {
            jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), UUID.randomUUID());
            jobManagerGainedLeadership.await();
            leaderRetrievalService.notifyListener(null, null);
            testingJobLeaderListener.waitUntilJobManagerLostLeadership();
            jobLeaderService.reconnect(jobId);
        }
        finally {
            jobLeaderService.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void canReconnectToOldLeaderWithSameLeaderAddress() throws Exception {
        JobID jobId = new JobID();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder().setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService).build();
        TestingJobMasterGateway jobMasterGateway = this.registerJobMaster();
        ArrayBlockingQueue leadershipQueue = new ArrayBlockingQueue(1);
        TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(leadershipQueue::offer);
        JobLeaderService jobLeaderService = this.createAndStartJobLeaderService(haServices, testingJobLeaderListener);
        try {
            jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());
            UUID leaderSessionId = UUID.randomUUID();
            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
            Assertions.assertThat((Comparable)((Comparable)leadershipQueue.take())).isEqualTo((Object)jobId);
            leaderRetrievalService.notifyListener(null, null);
            testingJobLeaderListener.waitUntilJobManagerLostLeadership();
            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), leaderSessionId);
            Assertions.assertThat((Comparable)((Comparable)leadershipQueue.take())).isEqualTo((Object)jobId);
        }
        finally {
            jobLeaderService.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void removeJobWithFailingLeaderRetrievalServiceStopWillStopListeningToLeaderNotifications() throws Exception {
        FailingSettableLeaderRetrievalService leaderRetrievalService = new FailingSettableLeaderRetrievalService();
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder().setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService).build();
        JobID jobId = new JobID();
        CompletableFuture newLeaderFuture = new CompletableFuture();
        TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(newLeaderFuture::complete);
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
        this.rpcServiceExtension.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        JobLeaderService jobLeaderService = this.createAndStartJobLeaderService(haServices, testingJobLeaderListener);
        try {
            jobLeaderService.addJob(jobId, "foobar");
            jobLeaderService.removeJob(jobId);
            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
            FlinkAssertions.assertThatFuture(newLeaderFuture).willNotCompleteWithin(Duration.ofMillis(10L));
        }
        finally {
            jobLeaderService.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void rejectedJobManagerRegistrationCallsJobLeaderListener() throws Exception {
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder().setJobMasterLeaderRetrieverFunction(ignored -> leaderRetrievalService).build();
        JobID jobId = new JobID();
        CompletableFuture rejectedRegistrationFuture = new CompletableFuture();
        TestingJobLeaderListener testingJobLeaderListener = new TestingJobLeaderListener(ignored -> {}, rejectedRegistrationFuture::complete);
        JobLeaderService jobLeaderService = this.createAndStartJobLeaderService(haServices, testingJobLeaderListener);
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setRegisterTaskManagerFunction((jobID, taskManagerRegistrationInformation) -> CompletableFuture.completedFuture(new JMTMRegistrationRejection("foobar"))).build();
        this.rpcServiceExtension.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        try {
            jobLeaderService.addJob(jobId, "foobar");
            leaderRetrievalService.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
            FlinkAssertions.assertThatFuture(rejectedRegistrationFuture).eventuallySucceeds().isEqualTo((Object)jobId);
        }
        finally {
            jobLeaderService.stop();
        }
    }

    private JobLeaderService createAndStartJobLeaderService(HighAvailabilityServices haServices, JobLeaderListener testingJobLeaderListener) {
        DefaultJobLeaderService jobLeaderService = new DefaultJobLeaderService((UnresolvedTaskManagerLocation)new LocalUnresolvedTaskManagerLocation(), RetryingRegistrationConfiguration.defaultConfiguration());
        jobLeaderService.start("foobar", (RpcService)this.rpcServiceExtension.getTestingRpcService(), haServices, testingJobLeaderListener);
        return jobLeaderService;
    }

    private TestingJobMasterGateway registerJobMaster() {
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
        this.rpcServiceExtension.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        return jobMasterGateway;
    }

    private static final class TestingJobLeaderListener
    implements JobLeaderListener {
        private final CountDownLatch jobManagerLostLeadership = new CountDownLatch(1);
        private final Consumer<JobID> jobManagerGainedLeadership;
        private final Consumer<JobID> jobManagerRejectedRegistrationConsumer;

        private TestingJobLeaderListener() {
            this((JobID ignored) -> {});
        }

        private TestingJobLeaderListener(Consumer<JobID> jobManagerGainedLeadership) {
            this(jobManagerGainedLeadership, (JobID ignored) -> {});
        }

        private TestingJobLeaderListener(Consumer<JobID> jobManagerGainedLeadership, Consumer<JobID> jobManagerRejectedRegistrationConsumer) {
            this.jobManagerGainedLeadership = jobManagerGainedLeadership;
            this.jobManagerRejectedRegistrationConsumer = jobManagerRejectedRegistrationConsumer;
        }

        public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
            this.jobManagerGainedLeadership.accept(jobId);
        }

        public void jobManagerLostLeadership(JobID jobId, JobMasterId jobMasterId) {
            this.jobManagerLostLeadership.countDown();
        }

        public void handleError(Throwable throwable) {
        }

        public void jobManagerRejectedRegistration(JobID jobId, String targetAddress, JMTMRegistrationRejection rejection) {
            this.jobManagerRejectedRegistrationConsumer.accept(jobId);
        }

        private void waitUntilJobManagerLostLeadership() throws InterruptedException {
            this.jobManagerLostLeadership.await();
        }
    }

    private static final class FailingSettableLeaderRetrievalService
    extends SettableLeaderRetrievalService {
        private FailingSettableLeaderRetrievalService() {
        }

        @Override
        public void stop() throws FlinkException {
            throw new FlinkException("Test exception");
        }
    }
}

