/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer;
import org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedTaskManagerTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerTracker;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.QuadConsumer;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/** Tests for {@link DefaultSlotStatusSyncer}. */
class DefaultSlotStatusSyncerTest {
    /** Timeout handed to every {@link DefaultSlotStatusSyncer} created by these tests. */
    private static final Time TASK_MANAGER_REQUEST_TIMEOUT = Time.seconds(10L);

    /** Shared connection backed by a default testing gateway; used by {@link #testFreeSlot()}. */
    private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
            new TaskExecutorConnection(
                    ResourceID.generate(),
                    new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /** Runs the shared allocation scenario without interfering before the request completes. */
    @Test
    void testSlotAllocationSucceeds() throws Exception {
        testSlotAllocation((ignored0, ignored1, ignored2, ignored3) -> {});
    }

    /**
     * Frees the pending slot before the slot request is acknowledged and checks that the tracker
     * no longer knows the allocation; the allocation future must still succeed afterwards.
     */
    @Test
    void testAllocationUpdatesIgnoredIfSlotFreed() throws Exception {
        testSlotAllocation(
                (slotStatusSyncer, taskManagerTracker, ignored, allocationId) -> {
                    slotStatusSyncer.freeSlot(allocationId);
                    Assertions.assertThat(
                                    taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
                            .isEmpty();
                });
    }

    /**
     * Uses a gateway whose slot request fails with a {@link TimeoutException} and checks that the
     * allocation future fails with that cause and that neither the resource tracker nor the task
     * manager keeps a record of the slot.
     */
    @Test
    void testAllocateSlotFailsWithException() {
        final FineGrainedTaskManagerTracker taskManagerTracker =
                new FineGrainedTaskManagerTracker();
        final TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setRequestSlotFunction(
                                ignored ->
                                        FutureUtils.completedExceptionally(
                                                new TimeoutException("timeout")))
                        .createTestingTaskExecutorGateway();
        final TaskExecutorConnection taskExecutorConnection =
                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        taskManagerTracker.addTaskManager(
                taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
        final DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        final JobID jobId = new JobID();
        final DefaultSlotStatusSyncer slotStatusSyncer =
                createSlotStatusSyncer(taskManagerTracker, resourceTracker);

        // The result value is never inspected, only the way the future completes.
        final CompletableFuture<?> allocatedFuture =
                slotStatusSyncer.allocateSlot(
                        taskExecutorConnection.getInstanceID(),
                        jobId,
                        "address",
                        ResourceProfile.ANY);

        Assertions.assertThatThrownBy(allocatedFuture::get)
                .hasCauseInstanceOf(TimeoutException.class);
        Assertions.assertThat(resourceTracker.getAcquiredResources(jobId)).isEmpty();
        Assertions.assertThat(
                        taskManagerTracker.getRegisteredTaskManager(
                                taskExecutorConnection.getInstanceID()))
                .hasValueSatisfying(
                        taskManagerInfo ->
                                Assertions.assertThat(taskManagerInfo.getAllocatedSlots())
                                        .isEmpty());
    }

    /**
     * Removes the task manager before the slot request is acknowledged and checks that the tracker
     * no longer knows the allocation; the allocation future must still succeed afterwards.
     */
    @Test
    void testAllocationUpdatesIgnoredIfSlotRemoved() throws Exception {
        testSlotAllocation(
                (slotStatusSyncer, taskManagerTracker, instanceID, allocationId) -> {
                    taskManagerTracker.removeTaskManager(instanceID);
                    Assertions.assertThat(
                                    taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
                            .isEmpty();
                });
    }

    /**
     * Checks that freeing an unknown allocation leaves the bookkeeping untouched, and that freeing
     * the known allocation clears it from both the resource tracker and the task manager.
     */
    @Test
    void testFreeSlot() {
        final FineGrainedTaskManagerTracker taskManagerTracker =
                new FineGrainedTaskManagerTracker();
        final DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final DefaultSlotStatusSyncer slotStatusSyncer =
                createSlotStatusSyncer(taskManagerTracker, resourceTracker);

        // Register one allocated slot directly with both trackers.
        taskManagerTracker.addTaskManager(
                TASK_EXECUTOR_CONNECTION, ResourceProfile.ANY, ResourceProfile.ANY);
        taskManagerTracker.notifySlotStatus(
                allocationId,
                jobId,
                TASK_EXECUTOR_CONNECTION.getInstanceID(),
                ResourceProfile.ANY,
                SlotState.ALLOCATED);
        resourceTracker.notifyAcquiredResource(jobId, ResourceProfile.ANY);

        // Freeing an allocation id that was never registered must not change anything.
        slotStatusSyncer.freeSlot(new AllocationID());
        Assertions.assertThat(resourceTracker.getAcquiredResources(jobId))
                .containsExactly(ResourceRequirement.create(ResourceProfile.ANY, 1));
        Assertions.assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
                .isPresent();

        // Freeing the registered allocation releases it everywhere.
        slotStatusSyncer.freeSlot(allocationId);
        Assertions.assertThat(resourceTracker.getAcquiredResources(jobId)).isEmpty();
        Assertions.assertThat(
                        taskManagerTracker.getRegisteredTaskManager(
                                TASK_EXECUTOR_CONNECTION.getInstanceID()))
                .hasValueSatisfying(
                        taskManagerInfo ->
                                Assertions.assertThat(taskManagerInfo.getAllocatedSlots())
                                        .isEmpty());
    }

    /**
     * Feeds two slot reports into the syncer, with a never-acknowledged slot request in between,
     * and checks the acquired resources, the available resource of the task manager and the state
     * of each allocation after every step.
     */
    @Test
    void testSlotStatusProcessing() {
        final FineGrainedTaskManagerTracker taskManagerTracker =
                new FineGrainedTaskManagerTracker();
        final DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        final DefaultSlotStatusSyncer slotStatusSyncer =
                createSlotStatusSyncer(taskManagerTracker, resourceTracker);
        // The slot request future is never completed, so the requested slot is not acknowledged.
        final TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setRequestSlotFunction(ignored -> new CompletableFuture<>())
                        .createTestingTaskExecutorGateway();
        final TaskExecutorConnection taskExecutorConnection =
                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        final InstanceID instanceId = taskExecutorConnection.getInstanceID();
        final JobID jobId = new JobID();
        final AllocationID allocationId1 = new AllocationID();
        final AllocationID allocationId2 = new AllocationID();
        final SlotID slotId1 = new SlotID(taskExecutorConnection.getResourceID(), 0);
        final SlotID slotId2 = new SlotID(taskExecutorConnection.getResourceID(), 1);
        final SlotID slotId3 = new SlotID(taskExecutorConnection.getResourceID(), 2);
        final ResourceProfile totalResource = ResourceProfile.fromResources(5.0, 20);
        final ResourceProfile resource = ResourceProfile.fromResources(1.0, 4);
        // First report: slot 1 without allocation, slots 2 and 3 allocated for the job.
        final SlotReport slotReport1 =
                new SlotReport(
                        Arrays.asList(
                                new SlotStatus(slotId1, totalResource),
                                new SlotStatus(slotId2, resource, jobId, allocationId1),
                                new SlotStatus(slotId3, resource, jobId, allocationId2)));
        // Second report: slot 3 no longer carries an allocation, slot 2 is unchanged.
        final SlotReport slotReport2 =
                new SlotReport(
                        Arrays.asList(
                                new SlotStatus(slotId3, resource),
                                new SlotStatus(slotId2, resource, jobId, allocationId1)));
        taskManagerTracker.addTaskManager(taskExecutorConnection, totalResource, totalResource);

        // Step 1: the first report registers both allocations.
        slotStatusSyncer.reportSlotStatus(instanceId, slotReport1);
        Assertions.assertThat(resourceTracker.getAcquiredResources(jobId))
                .contains(ResourceRequirement.create(resource, 2));
        Assertions.assertThat(taskManagerTracker.getRegisteredTaskManager(instanceId))
                .hasValueSatisfying(
                        taskManagerInfo ->
                                Assertions.assertThat(taskManagerInfo.getAvailableResource())
                                        .isEqualTo(ResourceProfile.fromResources(3.0, 12)));
        Assertions.assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1))
                .isPresent();
        Assertions.assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2))
                .isPresent();

        // Step 2: requesting another slot adds a third allocation.
        slotStatusSyncer.allocateSlot(instanceId, jobId, "address", resource);
        Assertions.assertThat(resourceTracker.getAcquiredResources(jobId))
                .contains(ResourceRequirement.create(resource, 3));
        Assertions.assertThat(taskManagerTracker.getRegisteredTaskManager(instanceId))
                .hasValueSatisfying(
                        taskManagerInfo ->
                                Assertions.assertThat(taskManagerInfo.getAvailableResource())
                                        .isEqualTo(ResourceProfile.fromResources(2.0, 8)));
        // The id of the new allocation is generated internally; pick the one we did not create.
        final AllocationID allocationId3 =
                taskManagerTracker
                        .getRegisteredTaskManager(instanceId)
                        .orElseThrow(
                                () -> new AssertionError("Task manager is not registered."))
                        .getAllocatedSlots()
                        .keySet()
                        .stream()
                        .filter(
                                allocationId ->
                                        !allocationId.equals(allocationId1)
                                                && !allocationId.equals(allocationId2))
                        .findAny()
                        .orElseThrow(
                                () ->
                                        new AssertionError(
                                                "No slot was tracked for the requested allocation."));

        // Step 3: the second report drops allocation 2, keeps allocation 1 allocated and leaves
        // the requested slot pending.
        slotStatusSyncer.reportSlotStatus(instanceId, slotReport2);
        Assertions.assertThat(resourceTracker.getAcquiredResources(jobId))
                .contains(ResourceRequirement.create(resource, 2));
        Assertions.assertThat(taskManagerTracker.getRegisteredTaskManager(instanceId))
                .hasValueSatisfying(
                        taskManagerInfo ->
                                Assertions.assertThat(taskManagerInfo.getAvailableResource())
                                        .isEqualTo(ResourceProfile.fromResources(3.0, 12)));
        Assertions.assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2))
                .isNotPresent();
        Assertions.assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1))
                .hasValueSatisfying(
                        slot ->
                                Assertions.assertThat(slot.getState())
                                        .isEqualTo(SlotState.ALLOCATED));
        Assertions.assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId3))
                .hasValueSatisfying(
                        slot ->
                                Assertions.assertThat(slot.getState())
                                        .isEqualTo(SlotState.PENDING));
    }

    /**
     * Shared allocation scenario: requests a slot, verifies that it is tracked as pending, runs
     * the given callback, then acknowledges the slot request and expects the allocation future to
     * succeed.
     *
     * @param beforeCompletingSlotRequestCallback invoked with the syncer, the task manager
     *     tracker, the task manager's instance id and the requested allocation id right before the
     *     slot request is acknowledged
     * @throws ExecutionException if waiting for the slot request to reach the gateway fails
     * @throws InterruptedException if interrupted while waiting for the slot request
     */
    private static void testSlotAllocation(
            QuadConsumer<SlotStatusSyncer, TaskManagerTracker, InstanceID, AllocationID>
                    beforeCompletingSlotRequestCallback)
            throws ExecutionException, InterruptedException {
        final FineGrainedTaskManagerTracker taskManagerTracker =
                new FineGrainedTaskManagerTracker();
        // Completed with the allocation id once the request reaches the gateway.
        final CompletableFuture<AllocationID> requestFuture = new CompletableFuture<>();
        // Completed by the test to acknowledge the request.
        final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>();
        final TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setRequestSlotFunction(
                                tuple6 -> {
                                    requestFuture.complete(tuple6.f2);
                                    return responseFuture;
                                })
                        .createTestingTaskExecutorGateway();
        final TaskExecutorConnection taskExecutorConnection =
                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        taskManagerTracker.addTaskManager(
                taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
        final DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        final JobID jobId = new JobID();
        final DefaultSlotStatusSyncer slotStatusSyncer =
                createSlotStatusSyncer(taskManagerTracker, resourceTracker);

        // The result value is never inspected, only the way the future completes.
        final CompletableFuture<?> allocatedFuture =
                slotStatusSyncer.allocateSlot(
                        taskExecutorConnection.getInstanceID(),
                        jobId,
                        "address",
                        ResourceProfile.ANY);
        final AllocationID allocationId = requestFuture.get();

        // While the request is unacknowledged the slot is tracked as pending for the job.
        Assertions.assertThat(resourceTracker.getAcquiredResources(jobId))
                .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
        Assertions.assertThat(taskManagerTracker.getAllocatedOrPendingSlot(allocationId))
                .hasValueSatisfying(
                        slot -> {
                            Assertions.assertThat(slot.getJobId()).isEqualTo(jobId);
                            Assertions.assertThat(slot.getState()).isEqualTo(SlotState.PENDING);
                        });

        beforeCompletingSlotRequestCallback.accept(
                slotStatusSyncer,
                taskManagerTracker,
                taskExecutorConnection.getInstanceID(),
                allocationId);

        responseFuture.complete(Acknowledge.get());
        FlinkAssertions.assertThatFuture(allocatedFuture).eventuallySucceeds();
    }

    /**
     * Creates a {@link DefaultSlotStatusSyncer} with the test timeout and initializes it with the
     * given trackers, a freshly generated {@link ResourceManagerId} and the shared test executor.
     */
    private static DefaultSlotStatusSyncer createSlotStatusSyncer(
            TaskManagerTracker taskManagerTracker, ResourceTracker resourceTracker) {
        final DefaultSlotStatusSyncer slotStatusSyncer =
                new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        slotStatusSyncer.initialize(
                taskManagerTracker,
                resourceTracker,
                ResourceManagerId.generate(),
                EXECUTOR_RESOURCE.getExecutor());
        return slotStatusSyncer;
    }
}

