/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdActions;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class DefaultJobLeaderIdServiceTest {
    DefaultJobLeaderIdServiceTest() {
    }

    @Test
    @Timeout(value=10000L, unit=TimeUnit.MILLISECONDS)
    void testAddingJob() throws Exception {
        JobID jobId = new JobID();
        String address = "foobar";
        JobMasterId leaderId = JobMasterId.generate();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        leaderRetrievalService.notifyListener("foobar", leaderId.toUUID());
        Assertions.assertThat((CompletableFuture)leaderIdFuture).isCompletedWithValue((Object)leaderId);
        Assertions.assertThat((boolean)jobLeaderIdService.containsJob(jobId)).isTrue();
    }

    @Test
    @Timeout(value=10000L, unit=TimeUnit.MILLISECONDS)
    void testRemovingJob() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        jobLeaderIdService.removeJob(jobId);
        Assertions.assertThat((boolean)jobLeaderIdService.containsJob(jobId)).isFalse();
        ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture((CompletableFuture)leaderIdFuture).withFailMessage("The leader id future should be completed exceptionally.", new Object[0])).eventuallyFails().withThrowableOfType(ExecutionException.class);
    }

    @Test
    void testInitialJobTimeout() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        Assertions.assertThat((boolean)jobLeaderIdService.containsJob(jobId)).isTrue();
        ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
        ((ScheduledExecutor)Mockito.verify((Object)scheduledExecutor)).schedule((Runnable)runnableArgumentCaptor.capture(), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)));
        Runnable timeoutRunnable = (Runnable)runnableArgumentCaptor.getValue();
        timeoutRunnable.run();
        ArgumentCaptor timeoutIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
        ((JobLeaderIdActions)Mockito.verify((Object)jobLeaderIdActions, (VerificationMode)Mockito.times((int)1))).notifyJobTimeout((JobID)ArgumentMatchers.eq((Object)jobId), (UUID)timeoutIdArgumentCaptor.capture());
        Assertions.assertThat((boolean)jobLeaderIdService.isValidTimeout(jobId, (UUID)timeoutIdArgumentCaptor.getValue())).isTrue();
    }

    @Test
    @Timeout(value=10000L, unit=TimeUnit.MILLISECONDS)
    void jobTimeoutAfterLostLeadership() throws Exception {
        JobID jobId = new JobID();
        String address = "foobar";
        JobMasterId leaderId = JobMasterId.generate();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledFuture timeout1 = (ScheduledFuture)Mockito.mock(ScheduledFuture.class);
        ScheduledFuture timeout2 = (ScheduledFuture)Mockito.mock(ScheduledFuture.class);
        ArrayDeque<ScheduledFuture> timeoutQueue = new ArrayDeque<ScheduledFuture>(Arrays.asList(timeout1, timeout2));
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        AtomicReference lastRunnable = new AtomicReference();
        ((ScheduledExecutor)Mockito.doAnswer(invocation -> {
            lastRunnable.set((Runnable)invocation.getArguments()[0]);
            return timeoutQueue.poll();
        }).when((Object)scheduledExecutor)).schedule((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)));
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        AtomicReference lastTimeoutId = new AtomicReference();
        ((JobLeaderIdActions)Mockito.doAnswer(invocation -> {
            lastTimeoutId.set((UUID)invocation.getArguments()[1]);
            return null;
        }).when((Object)jobLeaderIdActions)).notifyJobTimeout((JobID)ArgumentMatchers.eq((Object)jobId), (UUID)ArgumentMatchers.any(UUID.class));
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        jobLeaderIdService.start(jobLeaderIdActions);
        jobLeaderIdService.addJob(jobId);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        leaderRetrievalService.notifyListener("foobar", leaderId.toUUID());
        Assertions.assertThat((CompletableFuture)leaderIdFuture).isCompletedWithValue((Object)leaderId);
        Assertions.assertThat((boolean)jobLeaderIdService.containsJob(jobId)).isTrue();
        ((ScheduledFuture)Mockito.verify((Object)timeout1, (VerificationMode)Mockito.times((int)1))).cancel(ArgumentMatchers.anyBoolean());
        ((ScheduledExecutor)Mockito.verify((Object)scheduledExecutor, (VerificationMode)Mockito.times((int)1))).schedule((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)));
        Runnable runnable = (Runnable)lastRunnable.get();
        Assertions.assertThat((Object)runnable).isNotNull();
        runnable.run();
        ((JobLeaderIdActions)Mockito.verify((Object)jobLeaderIdActions, (VerificationMode)Mockito.times((int)1))).notifyJobTimeout((JobID)ArgumentMatchers.eq((Object)jobId), (UUID)ArgumentMatchers.any(UUID.class));
        Assertions.assertThat((boolean)jobLeaderIdService.isValidTimeout(jobId, (UUID)lastTimeoutId.get())).isFalse();
        leaderRetrievalService.notifyListener("", null);
        ((ScheduledExecutor)Mockito.verify((Object)scheduledExecutor, (VerificationMode)Mockito.times((int)2))).schedule((Runnable)ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)));
        runnable = (Runnable)lastRunnable.get();
        Assertions.assertThat((Object)runnable).isNotNull();
        runnable.run();
        ((JobLeaderIdActions)Mockito.verify((Object)jobLeaderIdActions, (VerificationMode)Mockito.times((int)2))).notifyJobTimeout((JobID)ArgumentMatchers.eq((Object)jobId), (UUID)ArgumentMatchers.any(UUID.class));
        Assertions.assertThat((boolean)jobLeaderIdService.isValidTimeout(jobId, (UUID)lastTimeoutId.get())).isTrue();
    }

    @Test
    @Timeout(value=10000L, unit=TimeUnit.MILLISECONDS)
    void testLeaderFutureWaitsForValidLeader() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, (ScheduledExecutor)new ManuallyTriggeredScheduledExecutor(), Time.milliseconds((long)5000L));
        jobLeaderIdService.start((JobLeaderIdActions)new NoOpJobLeaderIdActions());
        jobLeaderIdService.addJob(jobId);
        leaderRetrievalService.notifyListener("foo", UUID.randomUUID());
        leaderRetrievalService.notifyListener(null, null);
        CompletableFuture leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
        Assertions.assertThat((CompletableFuture)leaderIdFuture).isNotDone();
        UUID newLeaderId = UUID.randomUUID();
        leaderRetrievalService.notifyListener("foo", newLeaderId);
        FlinkAssertions.assertThatFuture((CompletableFuture)leaderIdFuture).eventuallySucceeds().isEqualTo((Object)JobMasterId.fromUuidOrNull((UUID)newLeaderId));
    }

    @Test
    void testIsStarted() throws Exception {
        JobID jobId = new JobID();
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor)Mockito.mock(ScheduledExecutor.class);
        Time timeout = Time.milliseconds((long)5000L);
        JobLeaderIdActions jobLeaderIdActions = (JobLeaderIdActions)Mockito.mock(JobLeaderIdActions.class);
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, scheduledExecutor, timeout);
        Assertions.assertThat((boolean)jobLeaderIdService.isStarted()).isFalse();
        jobLeaderIdService.start(jobLeaderIdActions);
        Assertions.assertThat((boolean)jobLeaderIdService.isStarted()).isTrue();
        jobLeaderIdService.stop();
        Assertions.assertThat((boolean)jobLeaderIdService.isStarted()).isFalse();
    }

    private static class NoOpJobLeaderIdActions
    implements JobLeaderIdActions {
        private NoOpJobLeaderIdActions() {
        }

        public void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) {
        }

        public void notifyJobTimeout(JobID jobId, UUID timeoutId) {
        }

        public void handleError(Throwable error) {
        }
    }
}

