/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.analyzer.plugins;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.analyzer.plugins.TezAnalyzerBase;
import org.apache.tez.analyzer.utils.SVGUtils;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.history.parser.datamodel.Container;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

public class CriticalPathAnalyzer
extends TezAnalyzerBase
implements Analyzer {
    // Interned name of TaskAttemptState.SUCCEEDED; analyze() compares attempt status strings against it.
    String succeededState = StringInterner.weakIntern((String)TaskAttemptState.SUCCEEDED.name());
    // Interned name of TaskAttemptState.FAILED; analyze() compares attempt status strings against it.
    String failedState = StringInterner.weakIntern((String)TaskAttemptState.FAILED.name());
    // Boolean configuration key read by analyze() (default true there) to decide whether the SVG is written.
    public static final String DRAW_SVG = "tez.critical-path-analyzer.draw-svg";
    // Configuration key consulted by saveCriticalPathAsSVG() when getOutputDir() returns null.
    public static final String OUTPUT_DIR = "output-dir";
    // Steps of the critical path; createCriticalPath() fills it so that the VERTEX_INIT step
    // comes first and the DAG_COMMIT step comes last.
    List<CriticalPathStep> criticalPath = Lists.newLinkedList();
    // Every task attempt visited by analyze(), keyed by its task attempt id.
    Map<String, TaskAttemptInfo> attempts = Maps.newHashMap();
    // Highest running attempt count observed by determineConcurrency().
    int maxConcurrency = 0;
    // One TimeInfo per distinct timestamp, in ascending order, each holding the running attempt
    // count recorded by determineConcurrency(); read by getIntervalMaxConcurrency().
    ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();

    /**
     * Creates an analyzer without setting a configuration; this constructor is the one
     * used by {@code main}, which hands the instance to {@code ToolRunner.run}.
     */
    public CriticalPathAnalyzer() {
    }

    /**
     * Creates an analyzer and passes {@code conf} to {@code setConf}.
     *
     * @param conf configuration handed to {@code setConf}
     */
    public CriticalPathAnalyzer(Configuration conf) {
        this.setConf(conf);
    }

    /**
     * Indexes every task attempt of the DAG, picks the succeeded-or-failed attempt with
     * the greatest finish time, builds the critical path backwards from it, annotates the
     * path and, unless {@link #DRAW_SVG} is configured to {@code false}, writes it as SVG.
     *
     * <p>If no succeeded/failed attempt with a positive finish time exists, a message is
     * printed and nothing else happens.
     *
     * @param dagInfo the DAG to analyze
     * @throws TezException declared by the overridden method; not thrown directly here
     */
    @Override
    public void analyze(DagInfo dagInfo) throws TezException {
        TaskAttemptInfo latest = null;
        long latestFinish = 0L;
        for (VertexInfo vertexInfo : dagInfo.getVertices()) {
            for (TaskInfo taskInfo : vertexInfo.getTasks()) {
                for (TaskAttemptInfo candidate : taskInfo.getTaskAttempts()) {
                    this.attempts.put(candidate.getTaskAttemptId(), candidate);
                    boolean terminal = candidate.getStatus().equals(this.succeededState)
                            || candidate.getStatus().equals(this.failedState);
                    // Only a strictly later finish replaces the current pick.
                    if (terminal && latestFinish < candidate.getFinishTime()) {
                        latest = candidate;
                        latestFinish = candidate.getFinishTime();
                    }
                }
            }
        }

        if (latest == null) {
            System.out.println("Cannot find last attempt to finish in DAG " + dagInfo.getDagId());
            return;
        }

        this.createCriticalPath(dagInfo, latest, latestFinish, this.attempts);
        this.analyzeCriticalPath(dagInfo);

        boolean drawSvg = this.getConf().getBoolean(DRAW_SVG, true);
        if (drawSvg) {
            this.saveCriticalPathAsSVG(dagInfo);
        }
    }

    /**
     * Returns the critical path list held by this analyzer. The internal list itself is
     * returned (no copy), so it is empty until {@code analyze} has populated it.
     *
     * @return the live list of critical path steps
     */
    public List<CriticalPathStep> getCriticalPath() {
        return this.criticalPath;
    }

    /**
     * Writes the critical path to {@code <dir>/<dagId>.svg}, where {@code <dir>} is
     * {@code getOutputDir()} or, when that is null, the {@link #OUTPUT_DIR} configuration value.
     *
     * @param dagInfo the DAG whose id names the output file
     */
    private void saveCriticalPathAsSVG(DagInfo dagInfo) {
        SVGUtils writer = new SVGUtils();
        String preferredDir = this.getOutputDir();
        String dir = preferredDir != null ? preferredDir : this.getConf().get(OUTPUT_DIR);
        String svgPath = dir + File.separator + dagInfo.getDagId() + ".svg";
        System.out.println("Writing output to: " + svgPath);
        writer.saveCriticalPathAsSVG(dagInfo, svgPath, this.criticalPath);
    }

    /**
     * Builds the concurrency timeline of the DAG.
     *
     * <p>Every attempt with a positive start time contributes a "start" event at its start
     * time and an "end" event at its finish time. The events are sorted by timestamp and
     * swept in order, keeping a running count of attempts in flight. The peak of that count
     * is stored in {@code maxConcurrency}; {@code concurrencyByTime} receives one entry per
     * distinct timestamp whose {@code count} is the running count after all events at that
     * timestamp have been applied.
     *
     * @param dag the DAG whose attempts are swept
     */
    private void determineConcurrency(DagInfo dag) {
        List<TimeInfo> events = Lists.newArrayList();
        for (VertexInfo v : dag.getVertices()) {
            for (TaskInfo t : v.getTasks()) {
                for (TaskAttemptInfo a : t.getTaskAttempts()) {
                    // Attempts without a positive start time contribute no events.
                    if (a.getStartTime() <= 0L) {
                        continue;
                    }
                    events.add(new TimeInfo(a.getStartTime(), true));
                    events.add(new TimeInfo(a.getFinishTime(), false));
                }
            }
        }
        Collections.sort(events);

        int concurrency = 0;
        TimeInfo lastTimeInfo = null;
        for (TimeInfo event : events) {
            concurrency += event.start ? 1 : -1;
            this.maxConcurrency = Math.max(this.maxConcurrency, concurrency);
            if (lastTimeInfo == null || lastTimeInfo.timestamp < event.timestamp) {
                // First event at a new timestamp becomes the timeline entry for it.
                lastTimeInfo = event;
                this.concurrencyByTime.add(event);
            }
            // Same-timestamp events keep overwriting the entry, so it ends up with the
            // count after the last event at that timestamp.
            lastTimeInfo.count = concurrency;
        }
    }

    /**
     * Returns the largest {@code count} among the {@code concurrencyByTime} entries whose
     * timestamp lies in {@code [begin, end]}, or 0 when no entry falls in that interval.
     * Scanning stops at the first entry at or after {@code begin} that is later than {@code end}.
     *
     * @param begin inclusive lower bound of the interval
     * @param end inclusive upper bound of the interval
     * @return the peak recorded count inside the interval
     */
    private int getIntervalMaxConcurrency(long begin, long end) {
        int peak = 0;
        for (TimeInfo sample : this.concurrencyByTime) {
            if (sample.timestamp >= begin) {
                if (sample.timestamp > end) {
                    break;
                }
                peak = Math.max(peak, sample.count);
            }
        }
        return peak;
    }

    /**
     * Annotates critical-path ATTEMPT steps with notes about why the attempt may have
     * waited for its container.
     *
     * <p>A step is examined only if the attempt was allocated at or after the time the step
     * became critical and the attempt has a container. For such a step:
     * <ul>
     *   <li>the other attempts mapped to the same container are walked in allocation order
     *       up to (not including) this attempt, summing the time they held the container
     *       after this attempt was created and counting how many belonged to the same vertex;</li>
     *   <li>notes are added describing container reuse, possible contention, and how much of
     *       the wait the container was busy;</li>
     *   <li>every attempt of another vertex that was terminated with INTERNAL_PREEMPTION and
     *       finished strictly between this attempt's creation and allocation adds a
     *       "potentially waited for preemption" note.</li>
     * </ul>
     *
     * @param dag the DAG providing vertices, attempts and the container-to-attempts mapping
     */
    private void analyzeAllocationOverhead(DagInfo dag) {
        String preemptionCause = TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name();
        List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList();
        for (VertexInfo v : dag.getVertices()) {
            for (TaskInfo t : v.getTasks()) {
                for (TaskAttemptInfo a : t.getTaskAttempts()) {
                    // Constant-first comparison: an attempt with no termination cause is
                    // simply not preempted instead of raising a NullPointerException.
                    if (!preemptionCause.equals(a.getTerminationCause())) {
                        continue;
                    }
                    System.out.println("Found preempted attempt " + a.getTaskAttemptId());
                    preemptedAttempts.add(a);
                }
            }
        }

        // criticalPath is a linked list, so iterate rather than index into it.
        for (CriticalPathStep step : this.criticalPath) {
            if (step.getType() != CriticalPathStep.EntityType.ATTEMPT) {
                continue;
            }
            TaskAttemptInfo attempt = step.attempt;
            long creationTime = attempt.getCreationTime();
            long allocationTime = attempt.getAllocationTime();
            long finishTime = attempt.getFinishTime();
            if (allocationTime < step.startCriticalPathTime) {
                // Allocated before the step became critical: allocation is not on the path.
                continue;
            }
            Container container = attempt.getContainer();
            if (container == null) {
                continue;
            }

            Collection<TaskAttemptInfo> containerAttempts = dag.getContainerMapping().get(container);
            if (containerAttempts != null && !containerAttempts.isEmpty()) {
                List<TaskAttemptInfo> attemptsList = Lists.newArrayList(containerAttempts);
                Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime());

                long containerPreviousAllocatedTime = 0L;
                int reUsesForVertex = 1;
                for (TaskAttemptInfo containerAttempt : attemptsList) {
                    if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
                        // Only attempts that ran in the container before this one matter.
                        break;
                    }
                    if (containerAttempt.getTaskInfo().getVertexInfo().getVertexId().equals(attempt.getTaskInfo().getVertexInfo().getVertexId())) {
                        ++reUsesForVertex;
                    }
                    long cAllocTime = containerAttempt.getAllocationTime();
                    long cFinishTime = containerAttempt.getFinishTime();
                    if (cFinishTime > creationTime) {
                        // Count only the part of the earlier attempt's container time that
                        // overlaps this attempt's wait (i.e. after its creation).
                        containerPreviousAllocatedTime += cFinishTime - Math.max(cAllocTime, creationTime);
                    }
                }

                int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks();
                int intervalMaxConcurrency = this.getIntervalMaxConcurrency(creationTime, finishTime);
                double numWaves = this.getWaves(numVertexTasks, intervalMaxConcurrency);
                if (reUsesForVertex > 1) {
                    step.notes.add("Container ran multiple tasks for this vertex. ");
                    if (numWaves < 1.0) {
                        step.notes.add("Vertex potentially seeing contention from other branches in the DAG. ");
                    }
                }
                if (containerPreviousAllocatedTime == 0L) {
                    step.notes.add("Container newly allocated.");
                } else if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) {
                    step.notes.add("Container was fully allocated");
                } else {
                    step.notes.add("Container in use for " + SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " + SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + " of allocation wait time");
                }
            }

            for (TaskAttemptInfo preempted : preemptedAttempts) {
                boolean sameVertex = preempted.getTaskInfo().getVertexInfo().getVertexId().equals(attempt.getTaskInfo().getVertexInfo().getVertexId());
                if (sameVertex || preempted.getFinishTime() <= creationTime || preempted.getFinishTime() >= allocationTime) {
                    continue;
                }
                step.notes.add("Potentially waited for preemption of " + preempted.getShortName());
            }
        }
    }

    /**
     * Computes {@code numTasks / concurrency} as a floating-point value rounded to one
     * decimal place (via {@code Math.round} on ten times the quotient).
     *
     * @param numTasks number of tasks
     * @param concurrency available concurrency; not validated, so zero yields whatever the
     *        floating-point division and rounding produce
     * @return the quotient rounded to one decimal
     */
    private double getWaves(int numTasks, int concurrency) {
        double exactWaves = (double)numTasks / (double)concurrency;
        long tenths = Math.round(exactWaves * 10.0);
        return (double)tenths / 10.0;
    }

    /**
     * For each ATTEMPT step whose vertex has more than one task, adds a note stating how
     * many waves the vertex needed given the peak concurrency recorded between the
     * attempt's creation and finish, and a second note when there was more than one wave
     * and the fractional part of the wave count is below one half.
     *
     * @param dag unused; kept for signature parity with the other analysis passes
     */
    private void analyzeWaves(DagInfo dag) {
        for (CriticalPathStep step : this.criticalPath) {
            if (step.getType() != CriticalPathStep.EntityType.ATTEMPT) {
                continue;
            }
            TaskAttemptInfo stepAttempt = step.attempt;
            long created = stepAttempt.getCreationTime();
            long finished = stepAttempt.getFinishTime();
            int taskCount = stepAttempt.getTaskInfo().getVertexInfo().getNumTasks();
            if (taskCount <= 1) {
                continue;
            }

            int peakConcurrency = this.getIntervalMaxConcurrency(created, finished);
            double waves = this.getWaves(taskCount, peakConcurrency);
            step.notes.add("Vertex ran " + taskCount + " tasks in " + waves + " waves with available concurrency of " + peakConcurrency);

            boolean shortLastWave = waves > 1.0 && waves % 1.0 < 0.5;
            if (shortLastWave) {
                step.notes.add("Last partial wave did not use full concurrency. ");
            }
        }
    }

    /**
     * Flags potential stragglers on the critical path and prints a DAG timing summary.
     *
     * <p>The critical duration of every step is summed. An ATTEMPT step gets a
     * "Potential straggler" note when the attempt has at most one last-data event, its
     * vertex reports a positive average post-data execution interval, and the attempt's
     * own interval exceeds 1.25 times that average.
     *
     * @param dag the DAG supplying start and finish times for the summary line
     */
    private void analyzeStragglers(DagInfo dag) {
        long dagStartTime = dag.getStartTime();
        long dagTime = dag.getFinishTime() - dagStartTime;
        long criticalTimeSum = 0L;

        for (CriticalPathStep step : this.criticalPath) {
            criticalTimeSum += step.stopCriticalPathTime - step.startCriticalPathTime;

            if (step.getType() != CriticalPathStep.EntityType.ATTEMPT) {
                continue;
            }
            TaskAttemptInfo stepAttempt = step.attempt;
            // NOTE(review): more than one last-data event presumably means input was
            // re-fetched, which would skew the comparison — confirm.
            if (stepAttempt.getLastDataEvents().size() > 1) {
                continue;
            }
            long vertexAverage = stepAttempt.getTaskInfo().getVertexInfo().getAvgPostDataExecutionTimeInterval();
            if (vertexAverage <= 0L) {
                continue;
            }
            long attemptTime = stepAttempt.getPostDataExecutionTimeInterval();
            if ((double)vertexAverage * 1.25 < (double)attemptTime) {
                step.notes.add("Potential straggler. Post Data Execution time " + SVGUtils.getTimeStr(attemptTime) + " compared to vertex average of " + SVGUtils.getTimeStr(vertexAverage));
            }
        }

        System.out.println("DAG time taken: " + dagTime + " TotalAttemptTime: " + criticalTimeSum + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime);
    }

    /**
     * Runs the annotation passes over a non-empty critical path, in this order:
     * concurrency timeline, stragglers, waves, allocation overhead. Does nothing when
     * the critical path is empty.
     *
     * @param dag the DAG handed to each pass
     */
    private void analyzeCriticalPath(DagInfo dag) {
        if (this.criticalPath.isEmpty()) {
            return;
        }
        // determineConcurrency must run first: the wave and allocation passes read the
        // timeline it fills through getIntervalMaxConcurrency.
        this.determineConcurrency(dag);
        this.analyzeStragglers(dag);
        this.analyzeWaves(dag);
        this.analyzeAllocationOverhead(dag);
    }

    /**
     * Builds the critical path by walking backwards from the last attempt to finish.
     *
     * <p>The walk starts with a DAG_COMMIT step spanning from the last attempt's finish
     * to the DAG finish time (when positive). Then, for the current attempt, it decides
     * which predecessor made it critical:
     * <ul>
     *   <li>if the last data event is later than the attempt's creation, the producer of
     *       that event (DATA_DEPENDENCY), or the vertex initialization when the event has
     *       no producer attempt (INIT_DEPENDENCY);</li>
     *   <li>otherwise the attempt named by {@code getCreationCausalTA()} (RETRY_DEPENDENCY,
     *       or OUTPUT_RECREATE_DEPENDENCY when that attempt belongs to one of the current
     *       vertex's output vertices), else the producer of the last data event, else
     *       vertex initialization.</li>
     * </ul>
     * The walk ends at an INIT_DEPENDENCY, where a VERTEX_INIT step from the DAG start
     * time is appended and the collected steps are copied into {@code criticalPath} in
     * chronological (reversed) order.
     *
     * @param dagInfo the DAG being analyzed
     * @param lastAttempt the attempt that finished last; nothing is done when null
     * @param lastAttemptFinishTime finish time of {@code lastAttempt}
     * @param attempts all attempts of the DAG keyed by task attempt id
     * @throws IllegalStateException when one of the consistency preconditions fails,
     *         including a predecessor attempt id that is not present in {@code attempts}
     */
    private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt, long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) {
        if (lastAttempt == null) {
            return;
        }
        // Steps are collected newest-first and reversed at the end.
        LinkedList<CriticalPathStep> tempCP = Lists.newLinkedList();
        TaskAttemptInfo currentAttempt = lastAttempt;
        CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, CriticalPathStep.EntityType.DAG_COMMIT);
        long currentAttemptStopCriticalPathTime = lastAttemptFinishTime;
        currentStep.stopCriticalPathTime = dagInfo.getFinishTime() > 0L ? dagInfo.getFinishTime() : currentAttemptStopCriticalPathTime;
        currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime;
        currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY;
        tempCP.add(currentStep);

        while (true) {
            Preconditions.checkState(currentAttempt != null);
            Preconditions.checkState(currentAttemptStopCriticalPathTime > 0L);
            System.out.println("Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId());
            currentStep = new CriticalPathStep(currentAttempt, CriticalPathStep.EntityType.ATTEMPT);
            currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime;

            // Last data event relevant to the point where this attempt stops being critical.
            long currentStepLastDataEventTime = 0L;
            String currentStepLastDataTA = null;
            TaskAttemptInfo.DataDependencyEvent item = currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime);
            if (item != null) {
                currentStepLastDataEventTime = item.getTimestamp();
                currentStepLastDataTA = item.getTaskAttemptId();
            }

            // An attempt may legitimately re-appear on the path only if it has several
            // data events and this visit uses an earlier one than its latest.
            for (CriticalPathStep previousStep : tempCP) {
                if (previousStep.type != CriticalPathStep.EntityType.ATTEMPT || !previousStep.attempt.getTaskAttemptId().equals(currentAttempt.getTaskAttemptId())) {
                    continue;
                }
                List<TaskAttemptInfo.DataDependencyEvent> dataEvents = currentAttempt.getLastDataEvents();
                Preconditions.checkState(dataEvents.size() > 1);
                Preconditions.checkState(currentStepLastDataEventTime < dataEvents.get(dataEvents.size() - 1).getTimestamp());
            }
            tempCP.add(currentStep);

            // Data arrived after the attempt was created => it was waiting on data.
            boolean dataDependency = currentStepLastDataEventTime > currentAttempt.getCreationTime();

            long startCriticalPathTime = 0L;
            String nextAttemptId = null;
            CriticalPathDependency reason = null;
            if (dataDependency) {
                System.out.println("Has data dependency");
                if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
                    nextAttemptId = currentStepLastDataTA;
                    reason = CriticalPathDependency.DATA_DEPENDENCY;
                    startCriticalPathTime = currentStepLastDataEventTime;
                    System.out.println("Using data dependency " + nextAttemptId);
                } else {
                    VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo();
                    Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(), (Object)("Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event " + "TA is null for " + currentAttempt.getTaskAttemptId()));
                    nextAttemptId = null;
                    reason = CriticalPathDependency.INIT_DEPENDENCY;
                    System.out.println("Using init dependency");
                }
            } else {
                System.out.println("Has scheduling dependency");
                if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) {
                    nextAttemptId = currentAttempt.getCreationCausalTA();
                    reason = CriticalPathDependency.RETRY_DEPENDENCY;
                    TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId);
                    // Guard on the looked-up attempt (the id is already known non-empty):
                    // an unknown causal attempt must reach the precondition below instead
                    // of failing with a NullPointerException here.
                    if (nextAttempt != null) {
                        VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo();
                        VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo();
                        if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())) {
                            for (VertexInfo outVertex : currentVertex.getOutputVertices()) {
                                if (!nextVertex.getVertexName().equals(outVertex.getVertexName())) {
                                    continue;
                                }
                                reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY;
                                break;
                            }
                        }
                    }
                    if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) {
                        startCriticalPathTime = currentAttempt.getCreationTime();
                    } else {
                        Preconditions.checkState(nextAttempt != null, (Object)("Creation causal attempt " + nextAttemptId + " of " + currentAttempt.getTaskAttemptId() + " not found"));
                        // A plain retry must be another attempt of the same task.
                        Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals(currentAttempt.getTaskInfo().getTaskId()));
                        startCriticalPathTime = nextAttempt.getFinishTime();
                    }
                    System.out.println("Using scheduling dependency " + nextAttemptId);
                } else if (!Strings.isNullOrEmpty(currentStepLastDataTA)) {
                    nextAttemptId = currentStepLastDataTA;
                    reason = CriticalPathDependency.DATA_DEPENDENCY;
                    startCriticalPathTime = currentStepLastDataEventTime;
                    // Data was available before creation; the gap is attributed to scheduling.
                    long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime;
                    currentStep.notes.add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead));
                    System.out.println("Using data dependency " + nextAttemptId);
                } else {
                    nextAttemptId = null;
                    reason = CriticalPathDependency.INIT_DEPENDENCY;
                    System.out.println("Using init dependency");
                }
            }

            currentStep.startCriticalPathTime = startCriticalPathTime;
            currentStep.reason = reason;
            Preconditions.checkState(currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime);

            if (Strings.isNullOrEmpty(nextAttemptId)) {
                // No predecessor attempt: the path bottoms out at vertex initialization.
                Preconditions.checkState(reason.equals((Object)CriticalPathDependency.INIT_DEPENDENCY));
                Preconditions.checkState(startCriticalPathTime == 0L);
                long initStepStopCriticalTime = currentStep.attempt.getCreationTime();
                currentStep.startCriticalPathTime = initStepStopCriticalTime;

                currentStep = new CriticalPathStep(currentAttempt, CriticalPathStep.EntityType.VERTEX_INIT);
                currentStep.stopCriticalPathTime = initStepStopCriticalTime;
                currentStep.startCriticalPathTime = dagInfo.getStartTime();
                currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY;
                tempCP.add(currentStep);

                // Publish the steps oldest-first without indexing into the linked list.
                Iterator<CriticalPathStep> oldestFirst = tempCP.descendingIterator();
                while (oldestFirst.hasNext()) {
                    this.criticalPath.add(oldestFirst.next());
                }
                return;
            }

            currentAttempt = attempts.get(nextAttemptId);
            currentAttemptStopCriticalPathTime = startCriticalPathTime;
        }
    }

    /**
     * Renders the critical path as a CSV result with the columns Entity, PathReason,
     * Status, CriticalStartTime, CriticalStopTime and Notes; one record per step.
     *
     * <p>The entity is the task attempt id for ATTEMPT steps, the vertex name for
     * VERTEX_INIT steps and the literal {@code "DAG COMMIT"} otherwise. Notes are joined
     * with {@code ;}.
     *
     * @return the populated CSV result
     * @throws TezException declared by the overridden method; not thrown directly here
     */
    @Override
    public CSVResult getResult() throws TezException {
        CSVResult table = new CSVResult(new String[]{"Entity", "PathReason", "Status", "CriticalStartTime", "CriticalStopTime", "Notes"});
        for (CriticalPathStep step : this.criticalPath) {
            String entity;
            if (step.getType() == CriticalPathStep.EntityType.ATTEMPT) {
                entity = step.getAttempt().getTaskAttemptId();
            } else if (step.getType() == CriticalPathStep.EntityType.VERTEX_INIT) {
                entity = step.attempt.getTaskInfo().getVertexInfo().getVertexName();
            } else {
                entity = "DAG COMMIT";
            }

            String pathReason = step.getReason().name();
            String status = step.getAttempt().getDetailedStatus();
            String criticalStart = String.valueOf(step.getStartCriticalTime());
            String criticalStop = String.valueOf(step.getStopCriticalTime());
            String notes = Joiner.on(";").join(step.getNotes());

            table.addRecord(new String[]{entity, pathReason, status, criticalStart, criticalStop, notes});
        }
        return table;
    }

    /**
     * Returns the fixed display name of this analyzer.
     *
     * @return the string {@code "CriticalPathAnalyzer"}
     */
    @Override
    public String getName() {
        return "CriticalPathAnalyzer";
    }

    /**
     * Returns a fixed one-line description of this analyzer.
     *
     * @return the string {@code "Analyze critical path of the DAG"}
     */
    @Override
    public String getDescription() {
        return "Analyze critical path of the DAG";
    }

    /**
     * Returns the configuration of this analyzer by delegating to {@code getConf()}.
     *
     * @return whatever {@code getConf()} returns
     */
    @Override
    public Configuration getConfiguration() {
        return this.getConf();
    }

    /**
     * Command-line entry point: runs a new analyzer through {@code ToolRunner} with a
     * fresh {@code Configuration} and exits the JVM with the returned status.
     *
     * @param args command-line arguments forwarded to {@code ToolRunner.run}
     * @throws Exception whatever {@code ToolRunner.run} throws
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CriticalPathAnalyzer analyzer = new CriticalPathAnalyzer();
        int exitCode = ToolRunner.run(conf, analyzer, args);
        System.exit(exitCode);
    }

    /**
     * A point on the concurrency timeline: a timestamp, whether it marks an attempt
     * start ({@code true}) or finish ({@code false}), and a mutable running count.
     * Ordering, equality and hash code depend on the timestamp only.
     */
    static class TimeInfo
    implements Comparable<TimeInfo> {
        long timestamp;
        int count;
        boolean start;

        /**
         * @param timestamp time of the event
         * @param start {@code true} for a start event, {@code false} for a finish event
         */
        TimeInfo(long timestamp, boolean start) {
            this.start = start;
            this.timestamp = timestamp;
        }

        /** Orders instances by ascending timestamp. */
        @Override
        public int compareTo(TimeInfo o) {
            return Long.compare(timestamp, o.timestamp);
        }

        /** Folds the upper half of the timestamp into the lower half. */
        @Override
        public int hashCode() {
            long folded = timestamp ^ (timestamp >> 32);
            return (int)folded;
        }

        /** Two instances of the same class are equal when their timestamps are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            return timestamp == ((TimeInfo)o).timestamp;
        }
    }

    /**
     * One segment of the critical path: the entity it refers to, the attempt it is
     * attached to, why it is on the path, the time window during which it was critical,
     * and free-form notes added by the analysis passes.
     */
    public static class CriticalPathStep {
        /** Kind of entity a step represents. */
        public enum EntityType {
            ATTEMPT,
            VERTEX_INIT,
            DAG_COMMIT
        }

        EntityType type;
        TaskAttemptInfo attempt;
        CriticalPathDependency reason;
        long startCriticalPathTime;
        long stopCriticalPathTime;
        List<String> notes = Lists.newLinkedList();

        /**
         * @param attempt the attempt this step is attached to
         * @param type the kind of entity the step represents
         */
        public CriticalPathStep(TaskAttemptInfo attempt, EntityType type) {
            this.attempt = attempt;
            this.type = type;
        }

        /** @return the kind of entity this step represents */
        public EntityType getType() {
            return type;
        }

        /** @return the attempt this step is attached to */
        public TaskAttemptInfo getAttempt() {
            return attempt;
        }

        /** @return the dependency that put this step on the path */
        public CriticalPathDependency getReason() {
            return reason;
        }

        /** @return the time at which this step became critical */
        public long getStartCriticalTime() {
            return startCriticalPathTime;
        }

        /** @return the time at which this step stopped being critical */
        public long getStopCriticalTime() {
            return stopCriticalPathTime;
        }

        /** @return the live, mutable list of notes attached to this step */
        public List<String> getNotes() {
            return notes;
        }
    }

    /**
     * Reason a step is on the critical path, as assigned by createCriticalPath().
     */
    public static enum CriticalPathDependency {
        // The step waited on the attempt that produced its last data event.
        DATA_DEPENDENCY,
        // No predecessor attempt was found; also used for the VERTEX_INIT step.
        INIT_DEPENDENCY,
        // Used for the DAG_COMMIT step that follows the last attempt to finish.
        COMMIT_DEPENDENCY,
        // The step's attempt names a creation-causal attempt (default for that case).
        RETRY_DEPENDENCY,
        // The creation-causal attempt belongs to one of the current vertex's output vertices.
        OUTPUT_RECREATE_DEPENDENCY;

    }
}

