/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.records.TezTaskID;

/**
 * {@link DAGScheduler} that tracks at most two active vertices at a time: a
 * "partitioner" (the first vertex seen) and a "shuffler" (the next vertex seen
 * at a strictly greater distance from the root).
 *
 * <p>Partitioner task attempts are forwarded for scheduling immediately.
 * Shuffler task attempts are queued in {@link #pendingShuffleTasks} and released
 * in batches whose size is derived from the memory figures reported by the
 * {@link TaskSchedulerEventHandler}, so that enough memory is kept for the
 * partitioner tasks that have not yet succeeded. When the partitioner vertex
 * completes, the shuffler (if any) is promoted to partitioner and every queued
 * shuffle attempt is released.
 *
 * <p>NOTE(review): this class has no synchronization of its own; it presumably
 * relies on being driven from a single dispatcher thread - confirm with callers.
 */
public class DAGSchedulerMRR
implements DAGScheduler {
    private static final Log LOG = LogFactory.getLog(DAGSchedulerMRR.class);
    private final DAG dag;
    private final TaskSchedulerEventHandler taskScheduler;
    private final EventHandler handler;
    /** Upper bound on the fraction of total memory guaranteed to shufflers; see {@link #getNumShufflesToSchedule()}. */
    private final float minReservedShuffleResource;
    private Vertex currentPartitioner = null;
    private Vertex currentShuffler = null;
    /** Distance from root of the most recently selected partitioner or shuffler. */
    private int currentShufflerDepth = 0;
    /** Number of non-rescheduled shuffle attempts released for the current shuffler. */
    int numShuffleTasksScheduled = 0;
    /** Shuffle attempts received but not yet forwarded for scheduling (FIFO). */
    List<TaskAttempt> pendingShuffleTasks = new LinkedList<TaskAttempt>();
    /** Shuffler tasks requested via {@link #scheduleTask} and not yet reported by {@link #taskScheduled}. */
    Set<TezTaskID> unassignedShuffleTasks = new HashSet<TezTaskID>();
    /** Shuffler task resource; its memory is raised to the largest container memory observed. */
    Resource realShufflerResource = null;
    /** Partitioner tasks requested via {@link #scheduleTask} and not yet reported by {@link #taskScheduled}. */
    Set<TezTaskID> unassignedPartitionTasks = new HashSet<TezTaskID>();
    /** Partitioner task resource; its memory is raised to the largest container memory observed. */
    Resource realPartitionerResource = null;

    /**
     * @param dag                        DAG used to resolve the vertex of each task attempt
     * @param dispatcher                 handler that receives the generated {@link TaskAttemptEventSchedule} events
     * @param taskScheduler              source of total and available cluster resources
     * @param minReservedShuffleResource cap on the fraction of total memory guaranteed to shufflers
     */
    public DAGSchedulerMRR(DAG dag, EventHandler dispatcher, TaskSchedulerEventHandler taskScheduler, float minReservedShuffleResource) {
        this.dag = dag;
        this.handler = dispatcher;
        this.taskScheduler = taskScheduler;
        this.minReservedShuffleResource = minReservedShuffleResource;
    }

    /**
     * Handles completion of a vertex. Does nothing when no partitioner is being
     * tracked. Otherwise the completed vertex must be the current partitioner;
     * the current shuffler (possibly {@code null}) is promoted to partitioner
     * together with its unassigned-task set and resource, and all pending
     * shuffle attempts are released.
     *
     * @param vertex the vertex that completed
     * @throws TezUncheckedException if a partitioner is tracked and {@code vertex} is not it
     */
    @Override
    public void vertexCompleted(Vertex vertex) {
        if (currentPartitioner == null) {
            return;
        }
        if (vertex != currentPartitioner) {
            String message = vertex.getVertexId() + " finished. Expecting"
                + " current partitioner " + currentPartitioner.getVertexId() + " to finish.";
            LOG.fatal(message);
            throw new TezUncheckedException(message);
        }
        LOG.info("Current partitioner " + currentPartitioner.getVertexId() + " is completed. "
            + (currentShuffler != null
                ? currentShuffler.getVertexId() + " is new partitioner"
                : "No current shuffler to replace the partitioner"));

        // Promote the shuffler (and its bookkeeping) to the partitioner role.
        currentPartitioner = currentShuffler;
        assert unassignedPartitionTasks.isEmpty();
        unassignedPartitionTasks.addAll(unassignedShuffleTasks);
        unassignedShuffleTasks.clear();
        realPartitionerResource = realShufflerResource;
        realShufflerResource = null;
        currentShuffler = null;

        // The former shuffler is now the partitioner, so nothing needs to be held back.
        schedulePendingShuffles(pendingShuffleTasks.size());
        assert pendingShuffleTasks.isEmpty();
        numShuffleTasksScheduled = 0;
    }

    /**
     * Handles a request to schedule a task attempt.
     *
     * <p>The attempt's vertex becomes the partitioner if none is set, or the
     * shuffler if none is set and the vertex is strictly deeper than
     * {@link #currentShufflerDepth}. Shuffler attempts are queued and released
     * subject to {@link #getNumShufflesToSchedule()}; all other accepted attempts
     * are forwarded immediately.
     *
     * @param event event carrying the attempt to schedule
     * @throws TezUncheckedException if the vertex is neither the partitioner nor the
     *         shuffler and its distance from root is not less than the partitioner's
     */
    @Override
    public void scheduleTask(DAGEventSchedulerUpdate event) {
        TaskAttempt attempt = event.getAttempt();
        Vertex vertex = dag.getVertex(attempt.getVertexID());
        int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
        LOG.info("Schedule task: " + attempt.getID());

        if (currentPartitioner == null) {
            currentPartitioner = vertex;
            currentShufflerDepth = vertexDistanceFromRoot;
            assert realPartitionerResource == null;
            Resource partitionerResource = currentPartitioner.getTaskResource();
            // Private copy: taskScheduled() mutates the memory value.
            realPartitionerResource = Resource.newInstance(partitionerResource.getMemory(), partitionerResource.getVirtualCores());
            LOG.info(vertex.getVertexId() + " is new partitioner at depth " + vertexDistanceFromRoot);
        } else if (currentShuffler == null && vertexDistanceFromRoot > currentShufflerDepth) {
            currentShuffler = vertex;
            currentShufflerDepth = vertexDistanceFromRoot;
            assert realShufflerResource == null;
            Resource shufflerResource = currentShuffler.getTaskResource();
            // Private copy: taskScheduled() mutates the memory value.
            realShufflerResource = Resource.newInstance(shufflerResource.getMemory(), shufflerResource.getVirtualCores());
            LOG.info(vertex.getVertexId() + " is new shuffler at depth " + currentShufflerDepth);
        }

        if (currentShuffler == vertex) {
            pendingShuffleTasks.add(attempt);
            unassignedShuffleTasks.add(attempt.getTaskID());
            schedulePendingShuffles(getNumShufflesToSchedule());
            return;
        }

        if (currentPartitioner == vertex) {
            unassignedPartitionTasks.add(attempt.getTaskID());
        } else if (vertexDistanceFromRoot >= currentPartitioner.getDistanceFromRoot()) {
            // The vertex cannot be the shuffler here (handled above). The shuffler may
            // not have been chosen yet, so it must not be dereferenced blindly when
            // building the message; otherwise an NPE would mask the real error.
            String shufflerId = currentShuffler == null
                ? "null"
                : String.valueOf(currentShuffler.getVertexId());
            String message = vertex.getVertexId() + " is neither the "
                + " current partitioner: " + currentPartitioner.getVertexId()
                + " nor the current shuffler: " + shufflerId;
            LOG.fatal(message);
            throw new TezUncheckedException(message);
        }
        scheduleTaskAttempt(attempt);
    }

    /**
     * Handles notification that an attempt was assigned a container. Removes the
     * task from the matching unassigned set, raises the tracked per-task memory
     * for that role if the container is larger, and then tries to release more
     * pending shuffle attempts.
     *
     * @param event event carrying the attempt and its assigned container
     */
    @Override
    public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
        TaskAttempt attempt = event.getAttempt();
        Vertex vertex = dag.getVertex(attempt.getVertexID());
        LOG.info("Task assigned: " + attempt.getID() + " Vertex: Total:" + vertex.getTotalTasks()
            + " succeeded: " + vertex.getSucceededTasks()
            + " Resource: " + event.getContainer().getResource().getMemory());

        if (currentPartitioner == vertex) {
            unassignedPartitionTasks.remove(attempt.getTaskID());
            Resource resource = event.getContainer().getResource();
            if (resource.getMemory() > realPartitionerResource.getMemory()) {
                realPartitionerResource.setMemory(resource.getMemory());
            }
        } else if (currentShuffler == vertex) {
            unassignedShuffleTasks.remove(attempt.getTaskID());
            Resource resource = event.getContainer().getResource();
            if (resource.getMemory() > realShufflerResource.getMemory()) {
                realShufflerResource.setMemory(resource.getMemory());
            }
        }
        schedulePendingShuffles(getNumShufflesToSchedule());
    }

    /**
     * Handles notification that an attempt succeeded by logging it and trying to
     * release more pending shuffle attempts.
     *
     * @param event event carrying the succeeded attempt
     */
    @Override
    public void taskSucceeded(DAGEventSchedulerUpdate event) {
        TaskAttempt attempt = event.getAttempt();
        Vertex vertex = dag.getVertex(attempt.getVertexID());
        LOG.info("Task succeeded: " + attempt.getID() + " Vertex: Total:" + vertex.getTotalTasks()
            + " succeeded: " + vertex.getSucceededTasks());
        schedulePendingShuffles(getNumShufflesToSchedule());
    }

    /**
     * Computes how many pending shuffle attempts may be released now.
     *
     * <ul>
     *   <li>0 when nothing is pending.</li>
     *   <li>All pending attempts when every partitioner task has been assigned.</li>
     *   <li>Otherwise, the shuffle memory budget divided by the per-task shuffle
     *       memory. The budget is total memory minus the memory needed by
     *       not-yet-succeeded partitioner tasks, but at least
     *       {@code totalMem * min(minReservedShuffleResource, succeeded / total)},
     *       minus the memory of shuffle tasks already released. The result is 0
     *       if releasing that many would leave less than one partitioner task's
     *       memory out of the total.</li>
     * </ul>
     *
     * @return number of pending shuffle attempts to release (never negative)
     */
    int getNumShufflesToSchedule() {
        assert currentPartitioner != null;
        if (pendingShuffleTasks.isEmpty()) {
            return 0;
        }
        if (unassignedPartitionTasks.isEmpty()) {
            LOG.info("All partitioners assigned. Scheduling all shufflers.");
            return pendingShuffleTasks.size();
        }
        assert currentShuffler != null;

        Resource totalResources = taskScheduler.getTotalResources();
        Resource freeResources = taskScheduler.getAvailableResources();
        int totalMem = totalResources.getMemory();
        int freeMem = freeResources.getMemory();
        int partitionerTaskMem = realPartitionerResource.getMemory();
        int shufflerTaskMem = realShufflerResource.getMemory();
        int shufflerMemAssigned = shufflerTaskMem * numShuffleTasksScheduled;

        int numPartitioners = currentPartitioner.getTotalTasks();
        int numPartitionersSucceeded = currentPartitioner.getSucceededTasks();
        int numPartitionersLeft = numPartitioners - numPartitionersSucceeded;
        int partitionerMemNeeded = numPartitionersLeft * partitionerTaskMem;

        // Memory left for shufflers after reserving for outstanding partitioners,
        // floored at a guaranteed share that grows with partitioner progress.
        int shufflerMemLeft = totalMem - partitionerMemNeeded;
        float partitionerProgress = (float) numPartitionersSucceeded / (float) numPartitioners;
        int maxShufflerMem = (int) ((float) totalMem * Math.min(minReservedShuffleResource, partitionerProgress));
        if (shufflerMemLeft < maxShufflerMem) {
            shufflerMemLeft = maxShufflerMem;
        }
        shufflerMemLeft -= shufflerMemAssigned;

        LOG.info("TotalMem: " + totalMem + " Headroom: " + freeMem
            + " PartitionerTaskMem: " + partitionerTaskMem
            + " ShufflerTaskMem: " + shufflerTaskMem
            + " MaxShuffleMem: " + maxShufflerMem
            + " PartitionerMemNeeded:" + partitionerMemNeeded
            + " ShufflerMemAssigned: " + shufflerMemAssigned
            + " ShufflerMemLeft: " + shufflerMemLeft
            + " Pending shufflers: " + pendingShuffleTasks.size());

        if (shufflerMemLeft < 0) {
            return 0;
        }
        if (shufflerTaskMem == 0) {
            // Shuffle tasks report no memory cost; also avoids dividing by zero below.
            return pendingShuffleTasks.size();
        }
        int shufflersToSchedule = shufflerMemLeft / shufflerTaskMem;
        shufflerMemAssigned += shufflerTaskMem * shufflersToSchedule;
        if (totalMem - shufflerMemAssigned < partitionerTaskMem) {
            LOG.info("Not scheduling more shufflers as it starves partitioners");
            return 0;
        }
        return shufflersToSchedule;
    }

    /**
     * Releases up to {@code scheduleCount} attempts from the head of
     * {@link #pendingShuffleTasks}. Attempts that are not rescheduled increment
     * {@link #numShuffleTasksScheduled}.
     *
     * @param scheduleCount maximum number of attempts to release; values &lt;= 0 release none
     */
    void schedulePendingShuffles(int scheduleCount) {
        for (int remaining = scheduleCount; remaining > 0 && !pendingShuffleTasks.isEmpty(); --remaining) {
            TaskAttempt shuffleAttempt = pendingShuffleTasks.remove(0);
            scheduleTaskAttempt(shuffleAttempt);
            if (!shuffleAttempt.getIsRescheduled()) {
                ++numShuffleTasksScheduled;
            }
        }
    }

    /**
     * Sends a {@link TaskAttemptEventSchedule} for the attempt with a computed
     * priority value. The base value is {@code (distanceFromRoot + 1) * 3}. It is
     * reduced by 4 for a shuffler attempt while partitioner tasks are still
     * unassigned, otherwise by 2 when the attempt is rescheduled.
     *
     * <p>NOTE(review): presumably a smaller number means a more urgent request to
     * the resource manager - confirm against the YARN {@code Priority} contract.
     *
     * @param attempt the attempt to schedule
     */
    void scheduleTaskAttempt(TaskAttempt attempt) {
        Vertex vertex = dag.getVertex(attempt.getVertexID());
        int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
        int priority = (vertexDistanceFromRoot + 1) * 3;

        boolean reOrderPriority = false;
        if (currentShuffler == vertex) {
            assert currentPartitioner != null;
            reOrderPriority = !unassignedPartitionTasks.isEmpty();
        }

        if (reOrderPriority) {
            priority -= 4;
        } else if (attempt.getIsRescheduled()) {
            priority -= 2;
        }

        LOG.info("Scheduling " + attempt.getID() + " with depth " + vertexDistanceFromRoot + " at priority " + priority);
        TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule(attempt.getID(), Priority.newInstance(priority));
        sendEvent(attemptEvent);
    }

    /**
     * Dispatches the event through the handler supplied at construction.
     *
     * @param event the schedule event to dispatch
     */
    void sendEvent(TaskAttemptEventSchedule event) {
        handler.handle(event);
    }
}

