/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.allocator.FreeSlotFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.IsSlotAvailableAndFreeFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReserveSlotFunction;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestSlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelismWithSlotSharing;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link SlotSharingSlotAllocator}.
 *
 * <p>The shared fixture consists of three vertices spread over two slot sharing groups:
 * {@code vertex1} (parallelism 4) and {@code vertex2} (parallelism 2) share
 * {@code slotSharingGroup1}, while {@code vertex3} (parallelism 3) is alone in
 * {@code slotSharingGroup2}.
 */
public class SlotSharingSlotAllocatorTest extends TestLogger {

    /** Free-slot callback that does nothing. */
    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (a, c, t) -> {};

    /** Reserve-slot callback that hands out a testing slot with the requested id and profile. */
    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION =
            (allocationId, resourceProfile) ->
                    TestingPhysicalSlot.builder()
                            .withAllocationID(allocationId)
                            .withResourceProfile(resourceProfile)
                            .build();

    /** Availability check that reports every slot as available and free. */
    private static final IsSlotAvailableAndFreeFunction TEST_IS_SLOT_FREE_FUNCTION =
            ignored -> true;

    // NOTE: the groups must be initialized before the vertices, because each
    // TestVertexInformation registers itself with its group on construction.
    private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
    private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
    private static final JobInformation.VertexInformation vertex1 =
            new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
    private static final JobInformation.VertexInformation vertex2 =
            new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
    private static final JobInformation.VertexInformation vertex3 =
            new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);

    /**
     * The required slots must all use {@link ResourceProfile#UNKNOWN}, and their count must equal
     * the larger parallelism of the two vertices sharing a group plus the parallelism of the
     * vertex in the other group.
     */
    @Test
    public void testCalculateRequiredSlots() {
        final SlotSharingSlotAllocator slotAllocator = createAllocator();

        final ResourceCounter resourceCounter =
                slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));

        Assert.assertThat(
                resourceCounter.getResources(), Matchers.contains(ResourceProfile.UNKNOWN));
        Assert.assertThat(
                resourceCounter.getResourceCount(ResourceProfile.UNKNOWN),
                CoreMatchers.is(
                        Math.max(vertex1.getParallelism(), vertex2.getParallelism())
                                + vertex3.getParallelism()));
    }

    /** With two slots for two slot sharing groups, every vertex is expected to run with 1. */
    @Test
    public void testDetermineParallelismWithMinimumSlots() {
        final Map<JobVertexID, Integer> maxParallelismForVertices =
                determineParallelismOrFail(createAllocator(), defaultJobInformation(), 2)
                        .getMaxParallelismForVertices();

        Assert.assertThat(
                maxParallelismForVertices.get(vertex1.getJobVertexID()), CoreMatchers.is(1));
        Assert.assertThat(
                maxParallelismForVertices.get(vertex2.getJobVertexID()), CoreMatchers.is(1));
        Assert.assertThat(
                maxParallelismForVertices.get(vertex3.getJobVertexID()), CoreMatchers.is(1));
    }

    /** With 50 slots on offer, every vertex is expected to get its configured parallelism. */
    @Test
    public void testDetermineParallelismWithManySlots() {
        final Map<JobVertexID, Integer> maxParallelismForVertices =
                determineParallelismOrFail(createAllocator(), defaultJobInformation(), 50)
                        .getMaxParallelismForVertices();

        Assert.assertThat(
                maxParallelismForVertices.get(vertex1.getJobVertexID()),
                CoreMatchers.is(vertex1.getParallelism()));
        Assert.assertThat(
                maxParallelismForVertices.get(vertex2.getJobVertexID()),
                CoreMatchers.is(vertex2.getParallelism()));
        Assert.assertThat(
                maxParallelismForVertices.get(vertex3.getJobVertexID()),
                CoreMatchers.is(vertex3.getParallelism()));
    }

    /**
     * Uses a dedicated fixture: two vertices with parallelism 4 and 1 share one group, and a
     * third vertex with parallelism 2 has a group of its own. Offering 4 + 2 slots is expected to
     * give every vertex exactly its configured parallelism.
     */
    @Test
    public void testDetermineParallelismWithVariedParallelism() {
        // Local fixture with distinct names so that the static fixtures are not shadowed.
        final SlotSharingGroup sharedGroup = new SlotSharingGroup();
        final JobInformation.VertexInformation wideSharedVertex =
                new TestVertexInformation(new JobVertexID(), 4, sharedGroup);
        final JobInformation.VertexInformation narrowSharedVertex =
                new TestVertexInformation(new JobVertexID(), 1, sharedGroup);
        final JobInformation.VertexInformation isolatedVertex =
                new TestVertexInformation(new JobVertexID(), 2, new SlotSharingGroup());
        final JobInformation jobInformation =
                new TestJobInformation(
                        Arrays.asList(wideSharedVertex, narrowSharedVertex, isolatedVertex));

        final int slotCount = wideSharedVertex.getParallelism() + isolatedVertex.getParallelism();
        final Map<JobVertexID, Integer> maxParallelismForVertices =
                determineParallelismOrFail(createAllocator(), jobInformation, slotCount)
                        .getMaxParallelismForVertices();

        Assertions.assertThat(maxParallelismForVertices.get(wideSharedVertex.getJobVertexID()))
                .isEqualTo(wideSharedVertex.getParallelism());
        Assertions.assertThat(maxParallelismForVertices.get(narrowSharedVertex.getJobVertexID()))
                .isEqualTo(narrowSharedVertex.getParallelism());
        Assertions.assertThat(maxParallelismForVertices.get(isolatedVertex.getJobVertexID()))
                .isEqualTo(isolatedVertex.getParallelism());
    }

    /** A single slot for two slot sharing groups must not yield any parallelism. */
    @Test
    public void testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
        final SlotSharingSlotAllocator slotAllocator = createAllocator();

        Assert.assertFalse(
                slotAllocator.determineParallelism(defaultJobInformation(), getSlots(1)).isPresent());
    }

    /**
     * After a successful reservation, the logical slot of every execution vertex must carry the
     * allocation id of the slot that the determined assignment paired it with.
     */
    @Test
    public void testReserveAvailableResources() {
        final SlotSharingSlotAllocator slotAllocator = createAllocator();
        final VertexParallelismWithSlotSharing slotAssignments =
                determineParallelismOrFail(slotAllocator, defaultJobInformation(), 50);

        final ReservedSlots reservedSlots =
                slotAllocator
                        .tryReserveResources(slotAssignments)
                        .orElseThrow(
                                () -> new RuntimeException("Expected that reservation succeeds."));

        // Flatten the assignment into "execution vertex -> backing slot" before verifying.
        final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new HashMap<>();
        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot assignment :
                slotAssignments.getAssignments()) {
            for (ExecutionVertexID executionVertexId :
                    assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
                expectedAssignments.put(executionVertexId, assignment.getSlotInfo());
            }
        }

        for (Map.Entry<ExecutionVertexID, SlotInfo> expected : expectedAssignments.entrySet()) {
            final LogicalSlot assignedSlot = reservedSlots.getSlotFor(expected.getKey());
            final SlotInfo backingSlot = expected.getValue();

            Assert.assertThat(
                    assignedSlot.getAllocationId(), CoreMatchers.is(backingSlot.getAllocationId()));
        }
    }

    /** If the availability check rejects every slot, the reservation must come back empty. */
    @Test
    public void testReserveUnavailableResources() {
        final SlotSharingSlotAllocator slotAllocator = createAllocator(ignored -> false);
        final VertexParallelismWithSlotSharing slotAssignments =
                determineParallelismOrFail(slotAllocator, defaultJobInformation(), 50);

        Assert.assertFalse(slotAllocator.tryReserveResources(slotAssignments).isPresent());
    }

    /** Creates an allocator wired with the default test callbacks. */
    private static SlotSharingSlotAllocator createAllocator() {
        return createAllocator(TEST_IS_SLOT_FREE_FUNCTION);
    }

    /** Creates an allocator with the default reserve/free callbacks and the given slot check. */
    private static SlotSharingSlotAllocator createAllocator(
            IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction) {
        return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
                TEST_RESERVE_SLOT_FUNCTION,
                TEST_FREE_SLOT_FUNCTION,
                isSlotAvailableAndFreeFunction);
    }

    /** Creates job information for the shared fixture vertices 1 to 3. */
    private static JobInformation defaultJobInformation() {
        return new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3));
    }

    /**
     * Determines the parallelism for {@code slotCount} fresh slots and fails the test with a
     * descriptive error, rather than a bare {@code NoSuchElementException}, if none is found.
     */
    private static VertexParallelismWithSlotSharing determineParallelismOrFail(
            SlotSharingSlotAllocator slotAllocator, JobInformation jobInformation, int slotCount) {
        return slotAllocator
                .determineParallelism(jobInformation, getSlots(slotCount))
                .orElseThrow(
                        () ->
                                new AssertionError(
                                        "Expected a parallelism to be determined with "
                                                + slotCount
                                                + " slots."));
    }

    /** Returns {@code count} newly created {@link TestSlotInfo} instances. */
    private static Collection<SlotInfo> getSlots(int count) {
        final Collection<SlotInfo> slotInfos = new ArrayList<>(count);
        for (int i = 0; i < count; ++i) {
            slotInfos.add(new TestSlotInfo());
        }
        return slotInfos;
    }

    /**
     * Immutable {@link JobInformation.VertexInformation} test double. Constructing an instance
     * also registers the vertex id with the given slot sharing group.
     */
    private static class TestVertexInformation implements JobInformation.VertexInformation {
        private final JobVertexID jobVertexId;
        private final int parallelism;
        private final SlotSharingGroup slotSharingGroup;

        private TestVertexInformation(
                JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) {
            this.jobVertexId = jobVertexId;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            // Side effect: the group must know the vertex for the allocator to see it.
            slotSharingGroup.addVertexToGroup(jobVertexId);
        }

        @Override
        public JobVertexID getJobVertexID() {
            return this.jobVertexId;
        }

        @Override
        public int getParallelism() {
            return this.parallelism;
        }

        @Override
        public SlotSharingGroup getSlotSharingGroup() {
            return this.slotSharingGroup;
        }
    }

    /**
     * {@link JobInformation} test double backed by a fixed collection of vertices; the slot
     * sharing groups are the distinct groups referenced by those vertices.
     */
    private static class TestJobInformation implements JobInformation {
        private final Map<JobVertexID, JobInformation.VertexInformation> vertexIdToInformation;
        private final Collection<SlotSharingGroup> slotSharingGroups;

        private TestJobInformation(Collection<JobInformation.VertexInformation> vertices) {
            this.vertexIdToInformation =
                    vertices.stream()
                            .collect(
                                    Collectors.toMap(
                                            JobInformation.VertexInformation::getJobVertexID,
                                            Function.identity()));
            // Collected into a set so a group shared by several vertices appears only once.
            this.slotSharingGroups =
                    vertices.stream()
                            .map(JobInformation.VertexInformation::getSlotSharingGroup)
                            .collect(Collectors.toSet());
        }

        @Override
        public Collection<SlotSharingGroup> getSlotSharingGroups() {
            return this.slotSharingGroups;
        }

        @Override
        public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) {
            return this.vertexIdToInformation.get(jobVertexId);
        }

        @Override
        public Iterable<JobInformation.VertexInformation> getVertices() {
            return this.vertexIdToInformation.values();
        }
    }
}

