/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActionsBuilder;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests how a {@link SlotManager} handles slot requests depending on the value passed to
 * {@code setFailUnfulfillableRequest}, both when the flag is switched on after requests were
 * registered and when requests arrive without the flag having been switched off first.
 */
public class SlotManagerFailUnfulfillableTest
extends TestLogger {
    /**
     * Worker spec handed to the slot manager builder as the default worker resource spec. The
     * tests that configure the allocate-resource function to return {@code true} derive their
     * requested slot profile from it.
     */
    private static final WorkerResourceSpec WORKER_RESOURCE_SPEC =
        new WorkerResourceSpec.Builder()
            .setCpuCores(100.0)
            .setTaskHeapMemoryMB(10000)
            .setTaskOffHeapMemoryMB(10000)
            .setNetworkMemoryMB(10000)
            .setManagedMemoryMB(10000)
            .build();

    /**
     * With the flag off, registers one slot and two requests for that slot's profile; after
     * turning the flag on, exactly one request is expected to still be pending.
     */
    @Test
    public void testTurnOnKeepsPendingFulfillableRequests() throws Exception {
        // setup
        ResourceProfile profile = ResourceProfile.fromResources(2.0, 100);
        SlotManager manager = createSlotManagerNotStartingNewTMs();
        manager.setFailUnfulfillableRequest(false);
        registerFreeSlot(manager, profile);
        manager.registerSlotRequest(slotRequest(profile));
        manager.registerSlotRequest(slotRequest(profile));

        // test
        manager.setFailUnfulfillableRequest(true);

        // verify
        Assert.assertEquals(1L, manager.getNumberPendingSlotRequests());
    }

    /**
     * With the flag off, registers a request whose profile differs from the only registered
     * slot; after turning the flag on, one allocation failure carrying an
     * {@link UnfulfillableSlotRequestException} is expected for that request and no request
     * may remain pending.
     */
    @Test
    public void testTurnOnCancelsPendingUnFulfillableRequests() throws Exception {
        // setup
        ResourceProfile registeredProfile = ResourceProfile.fromResources(2.0, 100);
        ResourceProfile requestedProfile = ResourceProfile.fromResources(1.0, 200);
        List<Tuple3<JobID, AllocationID, Exception>> failures = new ArrayList<>();
        SlotManager manager = createSlotManagerNotStartingNewTMs(failures);
        manager.setFailUnfulfillableRequest(false);
        registerFreeSlot(manager, registeredProfile);
        SlotRequest pendingRequest = slotRequest(requestedProfile);
        manager.registerSlotRequest(pendingRequest);

        // test
        manager.setFailUnfulfillableRequest(true);

        // verify
        Assert.assertEquals(1L, failures.size());
        Tuple3<JobID, AllocationID, Exception> failure = failures.get(0);
        Assert.assertEquals(pendingRequest.getAllocationId(), failure.f1);
        Assert.assertTrue(
            ExceptionUtils.findThrowable(failure.f2, UnfulfillableSlotRequestException.class)
                .isPresent());
        Assert.assertEquals(0L, manager.getNumberPendingSlotRequests());
    }

    /**
     * With the flag off and the allocate-resource function returning {@code true}, registers a
     * request for the profile derived from {@link #WORKER_RESOURCE_SPEC}; after turning the
     * flag on, the request is expected to still be pending.
     */
    @Test
    public void testTurnOnKeepsRequestsWithStartingTMs() throws Exception {
        // setup
        ResourceProfile registeredProfile = ResourceProfile.fromResources(2.0, 100);
        ResourceProfile workerSlotProfile =
            SlotManagerUtils.generateDefaultSlotResourceProfile(WORKER_RESOURCE_SPEC, 1);
        SlotManager manager = createSlotManagerStartingNewTMs();
        manager.setFailUnfulfillableRequest(false);
        registerFreeSlot(manager, registeredProfile);
        manager.registerSlotRequest(slotRequest(workerSlotProfile));

        // test
        manager.setFailUnfulfillableRequest(true);

        // verify
        Assert.assertEquals(1L, manager.getNumberPendingSlotRequests());
    }

    /**
     * Without touching the flag, registers one slot and two requests for that slot's profile
     * and expects exactly one request to be pending afterwards.
     */
    @Test
    public void testFulfillableRequestsKeepPendingWhenOn() throws Exception {
        ResourceProfile profile = ResourceProfile.fromResources(2.0, 100);
        SlotManager manager = createSlotManagerNotStartingNewTMs();
        registerFreeSlot(manager, profile);

        manager.registerSlotRequest(slotRequest(profile));
        manager.registerSlotRequest(slotRequest(profile));

        Assert.assertEquals(1L, manager.getNumberPendingSlotRequests());
    }

    /**
     * Without touching the flag, registers a request whose profile differs from the only
     * registered slot and expects the registration call itself to throw a
     * {@link ResourceManagerException} containing an {@link UnfulfillableSlotRequestException},
     * with no allocation failure notified and no request left pending.
     */
    @Test
    public void testUnfulfillableRequestsFailWhenOn() {
        ResourceProfile registeredProfile = ResourceProfile.fromResources(2.0, 100);
        ResourceProfile requestedProfile = ResourceProfile.fromResources(2.0, 200);
        List<Tuple3<JobID, AllocationID, Exception>> failures = new ArrayList<>();
        SlotManager manager = createSlotManagerNotStartingNewTMs(failures);
        registerFreeSlot(manager, registeredProfile);

        try {
            manager.registerSlotRequest(slotRequest(requestedProfile));
            Assert.fail("this should cause an exception");
        }
        catch (ResourceManagerException expected) {
            Assert.assertTrue(
                ExceptionUtils.findThrowable(expected, UnfulfillableSlotRequestException.class)
                    .isPresent());
        }

        // the failure surfaces through the exception only, not through the failure callback
        Assert.assertEquals(0L, failures.size());
        Assert.assertEquals(0L, manager.getNumberPendingSlotRequests());
    }

    /**
     * Without touching the flag and with the allocate-resource function returning
     * {@code true}, registers a request for the profile derived from
     * {@link #WORKER_RESOURCE_SPEC} and expects it to be pending afterwards.
     */
    @Test
    public void testStartingTmKeepsSlotPendingWhenOn() throws Exception {
        ResourceProfile registeredProfile = ResourceProfile.fromResources(2.0, 100);
        ResourceProfile workerSlotProfile =
            SlotManagerUtils.generateDefaultSlotResourceProfile(WORKER_RESOURCE_SPEC, 1);
        SlotManager manager = createSlotManagerStartingNewTMs();
        registerFreeSlot(manager, registeredProfile);

        manager.registerSlotRequest(slotRequest(workerSlotProfile));

        Assert.assertEquals(1L, manager.getNumberPendingSlotRequests());
    }

    /**
     * Creates a started slot manager whose allocate-resource function returns {@code false};
     * allocation failures go to a list the caller never sees.
     */
    private static SlotManager createSlotManagerNotStartingNewTMs() {
        return createSlotManagerNotStartingNewTMs(new ArrayList<>());
    }

    /**
     * Creates a started slot manager whose allocate-resource function returns {@code false}.
     *
     * @param notifiedAllocationFailures list that receives every notified allocation failure
     */
    private static SlotManager createSlotManagerNotStartingNewTMs(List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures) {
        return createSlotManager(notifiedAllocationFailures, false);
    }

    /**
     * Creates a started slot manager whose allocate-resource function returns {@code true};
     * allocation failures go to a list the caller never sees.
     */
    private static SlotManager createSlotManagerStartingNewTMs() {
        return createSlotManager(new ArrayList<>(), true);
    }

    /**
     * Builds a slot manager with {@link #WORKER_RESOURCE_SPEC} as its default worker resource
     * spec and starts it with a generated resource manager id and a direct executor.
     *
     * @param notifiedAllocationFailures list that receives every notified allocation failure
     * @param startNewTMs value returned by the allocate-resource function for any argument
     * @return the started slot manager
     */
    private static SlotManager createSlotManager(List<Tuple3<JobID, AllocationID, Exception>> notifiedAllocationFailures, boolean startNewTMs) {
        TestingResourceActions actions = new TestingResourceActionsBuilder()
            .setAllocateResourceFunction(workerSpec -> startNewTMs)
            .setNotifyAllocationFailureConsumer(failure -> notifiedAllocationFailures.add(failure))
            .build();

        SlotManagerImpl manager = SlotManagerBuilder.newBuilder()
            .setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC)
            .build();
        manager.start(ResourceManagerId.generate(), Executors.directExecutor(), actions);
        return manager;
    }

    /**
     * Registers a task manager under a newly generated resource id whose slot report contains
     * a single slot (index 0) with the given profile. {@link ResourceProfile#ANY} is passed
     * for both remaining profile arguments of {@code registerTaskManager}.
     *
     * @param slotManager slot manager to register the task manager with
     * @param slotProfile resource profile reported for the single slot
     */
    private static void registerFreeSlot(SlotManager slotManager, ResourceProfile slotProfile) {
        ResourceID taskManagerId = ResourceID.generate();
        TestingTaskExecutorGateway gateway =
            new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        SlotStatus onlySlot = new SlotStatus(new SlotID(taskManagerId, 0), slotProfile);
        SlotReport report = new SlotReport(Collections.singleton(onlySlot));

        slotManager.registerTaskManager(connection, report, ResourceProfile.ANY, ResourceProfile.ANY);
    }

    /**
     * Creates a slot request for the given profile with a new job id, a new allocation id and
     * the fixed target address {@code "foobar"}.
     */
    private static SlotRequest slotRequest(ResourceProfile profile) {
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        return new SlotRequest(jobId, allocationId, profile, "foobar");
    }
}

