/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManagerSlot;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlotId;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bookkeeping for registered task executors and for pending slots, i.e. slots that were requested
 * from the {@link ResourceAllocator} but have not been reported by a registering task executor yet.
 *
 * <p>On construction a periodic check is scheduled that (a) requests additional workers when fewer
 * free slots than {@code redundantTaskManagerNum * numSlotsPerWorker} are available and (b)
 * otherwise marks idle, timed-out task executors as unwanted. The resulting worker demand is
 * communicated to the {@link ResourceAllocator} as a {@link ResourceDeclaration}.
 *
 * <p>All state lives in plain {@link HashMap}/{@link HashSet} instances without synchronization in
 * this class; the periodic check and delayed declarations are handed to the main thread executor
 * passed to the constructor.
 */
class TaskExecutorManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorManager.class);

    /** Slot profile derived from the default worker spec and the number of slots per worker. */
    private final ResourceProfile defaultSlotResourceProfile;

    /** Worker spec used for every resource declaration. */
    private final WorkerResourceSpec defaultWorkerResourceSpec;

    private final int numSlotsPerWorker;

    /** Upper bound for the sum of registered and pending slots. */
    private final int maxSlotNum;

    /** If set, a task executor is asked whether it can be released before it is marked unwanted. */
    private final boolean waitResultConsumedBeforeRelease;

    /** Number of workers' worth of free slots that the periodic check tries to keep available. */
    private final int redundantTaskManagerNum;

    /** Idle time after which a task executor becomes a release candidate; also the check period. */
    private final Time taskManagerTimeout;

    private final ResourceAllocator resourceAllocator;

    private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations = new HashMap<>();

    private final Map<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots = new HashMap<>();

    private final Executor mainThreadExecutor;

    private final ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;

    /** Registered workers that were selected for release and are excluded from the demand. */
    private final Set<InstanceID> unWantedWorkers;

    private final ScheduledExecutor scheduledExecutor;

    /** Delay before a resource declaration is sent; a non-positive value means "immediately". */
    private final Duration declareNeededResourceDelay;

    /** Tracks the currently scheduled delayed declaration, if any. */
    private CompletableFuture<Void> declareNeededResourceFuture;

    /**
     * Creates the manager and immediately schedules the periodic timeout/redundancy check with a
     * fixed delay of {@code taskManagerTimeout}.
     *
     * @param defaultWorkerResourceSpec worker spec used for resource declarations
     * @param numSlotsPerWorker number of slots each worker is expected to provide
     * @param maxNumSlots upper bound for registered plus pending slots
     * @param waitResultConsumedBeforeRelease whether to ask a task executor before releasing it
     * @param redundantTaskManagerNum number of redundant workers to keep
     * @param taskManagerTimeout idle timeout of task executors and period of the check
     * @param declareNeededResourceDelay delay applied to resource declarations
     * @param scheduledExecutor executor used for scheduling the periodic check and delayed work
     * @param mainThreadExecutor executor on which the scheduled work is actually run
     * @param resourceAllocator allocator that receives the declarations; must not be null
     */
    TaskExecutorManager(
            WorkerResourceSpec defaultWorkerResourceSpec,
            int numSlotsPerWorker,
            int maxNumSlots,
            boolean waitResultConsumedBeforeRelease,
            int redundantTaskManagerNum,
            Time taskManagerTimeout,
            Duration declareNeededResourceDelay,
            ScheduledExecutor scheduledExecutor,
            Executor mainThreadExecutor,
            ResourceAllocator resourceAllocator) {
        this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.maxSlotNum = maxNumSlots;
        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
        this.redundantTaskManagerNum = redundantTaskManagerNum;
        this.taskManagerTimeout = taskManagerTimeout;
        this.defaultSlotResourceProfile =
                SlotManagerUtils.generateDefaultSlotResourceProfile(
                        defaultWorkerResourceSpec, numSlotsPerWorker);
        this.scheduledExecutor = scheduledExecutor;
        this.declareNeededResourceDelay = declareNeededResourceDelay;
        this.unWantedWorkers = new HashSet<>();
        this.resourceAllocator = Preconditions.checkNotNull(resourceAllocator);
        this.mainThreadExecutor = mainThreadExecutor;
        this.taskManagerTimeoutsAndRedundancyCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> mainThreadExecutor.execute(this::checkTaskManagerTimeoutsAndRedundancy),
                        0L,
                        taskManagerTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);
    }

    /** Cancels the periodic timeout/redundancy check without interrupting a running execution. */
    @Override
    public void close() {
        taskManagerTimeoutsAndRedundancyCheck.cancel(false);
    }

    // ---------------------------------------------------------------------------------------------
    // Registration
    // ---------------------------------------------------------------------------------------------

    /** Returns whether a task executor is registered under the given instance id. */
    public boolean isTaskManagerRegistered(InstanceID instanceId) {
        return taskManagerRegistrations.containsKey(instanceId);
    }

    /**
     * Registers a task executor together with the slots from its initial report.
     *
     * <p>For every reported slot that has no job assigned, one pending slot with an equal resource
     * profile (if any) is considered fulfilled and removed.
     *
     * @return {@code false} if registering would exceed the maximum slot number, {@code true}
     *     otherwise
     */
    public boolean registerTaskManager(
            TaskExecutorConnection taskExecutorConnection,
            SlotReport initialSlotReport,
            ResourceProfile totalResourceProfile,
            ResourceProfile defaultSlotResourceProfile) {
        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
            LOG.info(
                    "The total number of slots exceeds the max limitation {}, could not register the excess task executor {}.",
                    maxSlotNum,
                    taskExecutorConnection.getInstanceID());
            return false;
        }

        // Note: 'defaultSlotResourceProfile' below is the method parameter, not the field.
        TaskManagerRegistration taskManagerRegistration =
                new TaskManagerRegistration(
                        taskExecutorConnection,
                        StreamSupport.stream(initialSlotReport.spliterator(), false)
                                .map(SlotStatus::getSlotID)
                                .collect(Collectors.toList()),
                        totalResourceProfile,
                        defaultSlotResourceProfile);
        taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);

        for (SlotStatus slotStatus : initialSlotReport) {
            if (slotStatus.getJobID() == null) {
                findAndRemoveExactlyMatchingPendingTaskManagerSlot(slotStatus.getResourceProfile());
            }
        }
        return true;
    }

    /**
     * Checks whether the reported slots would push the slot count over the maximum, first with the
     * cheap upper bound (all reported slots are new) and only then with the exact count that
     * discounts slots fulfilling pending ones.
     */
    private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) {
        if (!isMaxSlotNumExceededAfterAdding(initialSlotReport.getNumSlotStatus())) {
            return false;
        }
        return isMaxSlotNumExceededAfterAdding(getNumNonPendingReportedNewSlots(initialSlotReport));
    }

    /**
     * Counts the reported slots that do not fulfil a pending slot. Only slots without an
     * allocation id are matched, and each pending slot is matched at most once.
     */
    private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
        final Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();

        for (SlotStatus slotStatus : slotReport) {
            if (slotStatus.getAllocationID() != null) {
                continue;
            }
            for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
                if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId())
                        && isPendingSlotExactlyMatchingResourceProfile(
                                pendingTaskManagerSlot, slotStatus.getResourceProfile())) {
                    matchingPendingSlots.add(pendingTaskManagerSlot.getTaskManagerSlotId());
                    break;
                }
            }
        }
        return slotReport.getNumSlotStatus() - matchingPendingSlots.size();
    }

    /** Removes at most one pending slot whose resource profile equals the given one. */
    private void findAndRemoveExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
        // Look the key up first and remove afterwards, so the map is never modified while one of
        // its views is being iterated.
        pendingSlots.values().stream()
                .filter(pending -> isPendingSlotExactlyMatchingResourceProfile(pending, resourceProfile))
                .findFirst()
                .map(PendingTaskManagerSlot::getTaskManagerSlotId)
                .ifPresent(pendingSlots::remove);
    }

    private boolean isPendingSlotExactlyMatchingResourceProfile(
            PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
        return pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
    }

    /** Drops the registration (and any unwanted-marker) of the given task executor. */
    public void unregisterTaskExecutor(InstanceID instanceId) {
        taskManagerRegistrations.remove(instanceId);
        unWantedWorkers.remove(instanceId);
    }

    /** Returns a snapshot copy of the ids of all registered task executors. */
    public Collection<InstanceID> getTaskExecutors() {
        return new ArrayList<>(taskManagerRegistrations.keySet());
    }

    // ---------------------------------------------------------------------------------------------
    // Allocation and resource declaration
    // ---------------------------------------------------------------------------------------------

    /**
     * Requests one additional worker able to host a slot of the given profile.
     *
     * @return the requirement covered by the new worker ({@code numSlotsPerWorker} default slots),
     *     or empty if the allocator is unsupported, the maximum slot number would be exceeded, or
     *     the default slot profile does not match the requested one
     */
    public Optional<ResourceRequirement> allocateWorker(ResourceProfile requestedSlotResourceProfile) {
        if (!resourceAllocator.isSupported()) {
            return Optional.empty();
        }

        if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
            LOG.warn(
                    "Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.",
                    numSlotsPerWorker,
                    getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots(),
                    maxSlotNum);
            return Optional.empty();
        }

        if (!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) {
            return Optional.empty();
        }

        for (int i = 0; i < numSlotsPerWorker; ++i) {
            PendingTaskManagerSlot pendingTaskManagerSlot =
                    new PendingTaskManagerSlot(defaultSlotResourceProfile);
            pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
        }
        declareNeededResourcesWithDelay();
        return Optional.of(ResourceRequirement.create(defaultSlotResourceProfile, numSlotsPerWorker));
    }

    private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
        return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot
                > maxSlotNum;
    }

    /**
     * Builds the declaration sent to the allocator: the number of workers needed for the pending
     * slots plus all registered workers that are not marked unwanted, along with a copy of the
     * unwanted set.
     */
    private Collection<ResourceDeclaration> getResourceDeclaration() {
        final int pendingWorkerNum =
                MathUtils.divideRoundUp(getNumberPendingTaskManagerSlots(), numSlotsPerWorker);
        final Set<InstanceID> neededRegisteredWorkers = new HashSet<>(taskManagerRegistrations.keySet());
        neededRegisteredWorkers.removeAll(unWantedWorkers);
        final int totalWorkerNum = pendingWorkerNum + neededRegisteredWorkers.size();

        return Collections.singleton(
                new ResourceDeclaration(
                        defaultWorkerResourceSpec, totalWorkerNum, new HashSet<>(unWantedWorkers)));
    }

    /**
     * Declares the needed resources, either immediately (non-positive delay) or after the
     * configured delay. While a delayed declaration is outstanding no further one is scheduled.
     */
    private void declareNeededResourcesWithDelay() {
        Preconditions.checkState(resourceAllocator.isSupported());

        if (declareNeededResourceDelay.toMillis() <= 0L) {
            declareNeededResources();
            return;
        }
        if (declareNeededResourceFuture != null && !declareNeededResourceFuture.isDone()) {
            // A declaration is already scheduled and will pick up the latest state when it runs.
            return;
        }

        final CompletableFuture<Void> scheduledDeclaration = new CompletableFuture<>();
        declareNeededResourceFuture = scheduledDeclaration;
        scheduledExecutor.schedule(
                () ->
                        mainThreadExecutor.execute(
                                () -> {
                                    try {
                                        declareNeededResources();
                                    } finally {
                                        // Always mark the round as finished; otherwise a failing
                                        // declaration would block all later ones via the isDone()
                                        // gate above.
                                        scheduledDeclaration.complete(null);
                                    }
                                }),
                declareNeededResourceDelay.toMillis(),
                TimeUnit.MILLISECONDS);
    }

    private void declareNeededResources() {
        resourceAllocator.declareResourceNeeded(getResourceDeclaration());
    }

    @VisibleForTesting
    int getNumberPendingTaskManagerSlots() {
        return pendingSlots.size();
    }

    // ---------------------------------------------------------------------------------------------
    // Timeout and redundancy handling
    // ---------------------------------------------------------------------------------------------

    /**
     * Periodic check: if fewer free slots than the redundancy target exist, requests more workers;
     * otherwise releases timed-out task executors, but only as many as can go without dropping
     * below the redundancy target. Does nothing while no task executor is registered.
     */
    private void checkTaskManagerTimeoutsAndRedundancy() {
        if (taskManagerRegistrations.isEmpty()) {
            return;
        }

        final long currentTime = System.currentTimeMillis();
        final ArrayList<TaskManagerRegistration> timedOutTaskManagers =
                new ArrayList<>(taskManagerRegistrations.size());
        for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
            if (currentTime - taskManagerRegistration.getIdleSince()
                    >= taskManagerTimeout.toMilliseconds()) {
                timedOutTaskManagers.add(taskManagerRegistration);
            }
        }

        final int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - getNumberFreeSlots();
        if (slotsDiff > 0) {
            allocateRedundantTaskManagers(MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker));
        } else {
            // -slotsDiff is the surplus of free slots; integer division rounds down so that the
            // redundancy target stays satisfied after the release.
            final int maxReleaseNum = -slotsDiff / numSlotsPerWorker;
            releaseIdleTaskExecutors(
                    timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
        }
    }

    private void allocateRedundantTaskManagers(int number) {
        LOG.debug("Allocating {} task executors for redundancy.", number);
        final int allocatedNumber = allocateWorkers(number);
        if (number != allocatedNumber) {
            LOG.warn(
                    "Expect to allocate {} taskManagers. Actually allocate {} taskManagers.",
                    number,
                    allocatedNumber);
        }
    }

    /**
     * Requests up to {@code workerNum} workers with the default slot profile, stopping at the
     * first request that is not granted.
     *
     * @return the number of workers actually requested
     */
    private int allocateWorkers(int workerNum) {
        int allocatedWorkerNum = 0;
        while (allocatedWorkerNum < workerNum
                && allocateWorker(defaultSlotResourceProfile).isPresent()) {
            ++allocatedWorkerNum;
        }
        return allocatedWorkerNum;
    }

    /** Releases the first {@code releaseNum} entries of the given timed-out task executors. */
    private void releaseIdleTaskExecutors(
            ArrayList<TaskManagerRegistration> timedOutTaskManagers, int releaseNum) {
        for (int index = 0; index < releaseNum; ++index) {
            final TaskManagerRegistration candidate = timedOutTaskManagers.get(index);
            if (waitResultConsumedBeforeRelease) {
                releaseIdleTaskExecutorIfPossible(candidate);
            } else {
                releaseIdleTaskExecutor(candidate.getInstanceId());
            }
        }
    }

    /**
     * Asks the task executor whether it can be released and, once the answer arrives on the main
     * thread executor, releases it only if it answered positively and its idle-since timestamp has
     * not changed in the meantime.
     */
    private void releaseIdleTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) {
        final long idleSince = taskManagerRegistration.getIdleSince();
        taskManagerRegistration
                .getTaskManagerConnection()
                .getTaskExecutorGateway()
                .canBeReleased()
                .thenAcceptAsync(
                        canBeReleased -> {
                            final boolean stillIdle =
                                    idleSince == taskManagerRegistration.getIdleSince();
                            if (stillIdle && canBeReleased) {
                                releaseIdleTaskExecutor(taskManagerRegistration.getInstanceId());
                            }
                        },
                        mainThreadExecutor);
    }

    /** Marks the task executor as unwanted and re-declares resources, if allocation is supported. */
    private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
        if (resourceAllocator.isSupported()) {
            LOG.debug(
                    "Release TaskExecutor {} because it exceeded the idle timeout.",
                    timedOutTaskManagerId);
            unWantedWorkers.add(timedOutTaskManagerId);
            declareNeededResourcesWithDelay();
        }
    }

    // ---------------------------------------------------------------------------------------------
    // Statistics
    // ---------------------------------------------------------------------------------------------

    /** Returns the merged total resources of all registered task executors. */
    public ResourceProfile getTotalRegisteredResources() {
        return taskManagerRegistrations.values().stream()
                .map(TaskManagerRegistration::getTotalResource)
                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
    }

    /** Returns the total resources of the given task executor, or ZERO if it is not registered. */
    public ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceID) {
        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
                .map(TaskManagerRegistration::getTotalResource)
                .orElse(ResourceProfile.ZERO);
    }

    /**
     * Returns the free resources over all task executors, computed per task executor as its
     * default slot profile multiplied by its number of free slots.
     */
    public ResourceProfile getTotalFreeResources() {
        return taskManagerRegistrations.values().stream()
                .map(
                        registration ->
                                registration
                                        .getDefaultSlotResourceProfile()
                                        .multiply(registration.getNumberFreeSlots()))
                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
    }

    /** Returns the free resources of the given task executor, or ZERO if it is not registered. */
    public ResourceProfile getTotalFreeResourcesOf(InstanceID instanceID) {
        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
                .map(
                        registration ->
                                registration
                                        .getDefaultSlotResourceProfile()
                                        .multiply(registration.getNumberFreeSlots()))
                .orElse(ResourceProfile.ZERO);
    }

    /**
     * Returns the slots of the given task executor.
     *
     * @throws NullPointerException if no task executor is registered under {@code instanceId}
     */
    public Iterable<SlotID> getSlotsOf(InstanceID instanceId) {
        return getRegistrationOrThrow(instanceId).getSlots();
    }

    /** Returns the number of slots over all registered task executors. */
    public int getNumberRegisteredSlots() {
        return taskManagerRegistrations.values().stream()
                .mapToInt(TaskManagerRegistration::getNumberRegisteredSlots)
                .sum();
    }

    /** Returns the number of slots of the given task executor, or 0 if it is not registered. */
    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
        final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
        return taskManagerRegistration == null
                ? 0
                : taskManagerRegistration.getNumberRegisteredSlots();
    }

    /** Returns the number of free slots over all registered task executors. */
    public int getNumberFreeSlots() {
        return taskManagerRegistrations.values().stream()
                .mapToInt(TaskManagerRegistration::getNumberFreeSlots)
                .sum();
    }

    /** Returns the number of free slots of the given task executor, or 0 if it is not registered. */
    public int getNumberFreeSlotsOf(InstanceID instanceId) {
        final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
        return taskManagerRegistration == null ? 0 : taskManagerRegistration.getNumberFreeSlots();
    }

    /**
     * Returns the pending slots. The returned collection is the live {@code values()} view of the
     * internal map, not a copy.
     */
    public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
        return pendingSlots.values();
    }

    /**
     * Removes as many pending slots as the counter reports unused for the default slot profile and
     * re-declares resources. No-op if the allocator is unsupported.
     *
     * @throws IllegalStateException if the counter does not contain exactly the default slot
     *     profile
     */
    public void removePendingTaskManagerSlots(ResourceCounter unusedResourceCounter) {
        if (!resourceAllocator.isSupported()) {
            return;
        }
        Preconditions.checkState(unusedResourceCounter.getResources().size() == 1);
        Preconditions.checkState(
                unusedResourceCounter.getResources().contains(defaultSlotResourceProfile));

        final int wantedPendingSlotsNumber =
                pendingSlots.size() - unusedResourceCounter.getResourceCount(defaultSlotResourceProfile);
        // The predicate re-reads the map size on every element, so entries are dropped only until
        // the wanted number is reached; which entries are dropped is arbitrary.
        pendingSlots.entrySet().removeIf(ignore -> pendingSlots.size() > wantedPendingSlotsNumber);
        declareNeededResourcesWithDelay();
    }

    /** Drops all pending slots and re-declares resources if there were any. */
    public void clearPendingTaskManagerSlots() {
        if (!resourceAllocator.isSupported()) {
            return;
        }
        if (!pendingSlots.isEmpty()) {
            pendingSlots.clear();
            declareNeededResourcesWithDelay();
        }
    }

    // ---------------------------------------------------------------------------------------------
    // Slot state notifications
    // ---------------------------------------------------------------------------------------------

    /**
     * Records that a slot of the given task executor became occupied.
     *
     * @throws NullPointerException if no task executor is registered under {@code instanceId}
     */
    public void occupySlot(InstanceID instanceId) {
        getRegistrationOrThrow(instanceId).occupySlot();
    }

    /**
     * Records that a slot of the given task executor became free.
     *
     * @throws NullPointerException if no task executor is registered under {@code instanceId}
     */
    public void freeSlot(InstanceID instanceId) {
        getRegistrationOrThrow(instanceId).freeSlot();
    }

    /**
     * Marks the given task executor as used.
     *
     * @throws NullPointerException if no task executor is registered under {@code instanceID}
     */
    public void markUsed(InstanceID instanceID) {
        getRegistrationOrThrow(instanceID).markUsed();
    }

    /**
     * Looks up a registration, failing with a descriptive {@link NullPointerException} (the same
     * exception type a bare dereference would produce) when the instance id is unknown.
     */
    private TaskManagerRegistration getRegistrationOrThrow(InstanceID instanceId) {
        return Preconditions.checkNotNull(
                taskManagerRegistrations.get(instanceId),
                "No task executor is registered under instance id %s.",
                instanceId);
    }
}

