/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.annotations.VisibleForTesting;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGKillRequestEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecoveryParser {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(RecoveryParser.class);
    // Application master whose configuration is read and into which recovered DAG state is pushed.
    private final DAGAppMaster dagAppMaster;
    // File system on which all recovery files are looked up and opened.
    private final FileSystem recoveryFS;
    // Root recovery directory; per-attempt directories are resolved relative to it.
    private final Path recoveryDataDir;
    // Recovery directory of the current attempt; created by the constructor.
    private final Path currentAttemptRecoveryDataDir;
    // Buffer size passed to recoveryFS.open(); read from "tez.dag.recovery.io.buffer.size" (default 8192).
    private final int recoveryBufferSize;
    // Id of the current attempt; only attempts 1 .. currentAttemptId - 1 are scanned for recovery data.
    private final int currentAttemptId;

    public RecoveryParser(DAGAppMaster dagAppMaster, FileSystem recoveryFS, Path recoveryDataDir, int currentAttemptId) throws IOException {
        this.dagAppMaster = dagAppMaster;
        this.recoveryFS = recoveryFS;
        this.recoveryDataDir = recoveryDataDir;
        this.currentAttemptId = currentAttemptId;
        this.currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath((Path)recoveryDataDir, (int)currentAttemptId);
        this.recoveryBufferSize = dagAppMaster.getConfig().getInt("tez.dag.recovery.io.buffer.size", 8192);
        this.recoveryFS.mkdirs(this.currentAttemptRecoveryDataDir);
    }

    /**
     * Reads delimited summary event records from the stream until none remain and logs the DAG
     * id, timestamp and event type of each record.
     *
     * @param inputStream stream over a summary recovery file; it is not closed here
     * @throws IOException if a record cannot be read
     */
    private static void parseSummaryFile(FSDataInputStream inputStream) throws IOException {
        for (RecoveryProtos.SummaryEventProto summaryEvent = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
                summaryEvent != null;
                summaryEvent = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream)) {
            LOG.info("[SUMMARY] dagId=" + summaryEvent.getDagId()
                    + ", timestamp=" + summaryEvent.getTimestamp()
                    + ", event=" + HistoryEventType.values()[summaryEvent.getEventType()]);
        }
        LOG.info("Reached end of summary stream");
    }

    /**
     * Reads the next history event from a DAG recovery stream.
     *
     * <p>A record starts with an int holding the ordinal of its {@link HistoryEventType}; the
     * rest of the record is consumed by the matching event class via {@code fromProtoStream}.
     *
     * @param inputStream stream positioned at the start of a record
     * @return the parsed event, or {@code null} when the stream ends, either before the type
     *         ordinal or while the event body is being read
     * @throws IOException if the ordinal is outside the range of {@link HistoryEventType}, or
     *         names a type for which no event class is instantiated here
     */
    private static HistoryEvent getNextEvent(FSDataInputStream inputStream) throws IOException {
        int ordinal;
        try {
            ordinal = inputStream.readInt();
        } catch (EOFException endOfStream) {
            // Nothing left to read: signal a clean end of stream to the caller.
            return null;
        }

        HistoryEventType[] allTypes = HistoryEventType.values();
        if (ordinal < 0 || ordinal >= allTypes.length) {
            throw new IOException("Corrupt data found when trying to read next event type, eventTypeOrdinal=" + ordinal);
        }
        HistoryEventType eventType = allTypes[ordinal];
        HistoryEvent event = newEventForType(eventType);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parsing event from input stream, eventType=" + eventType);
        }
        try {
            event.fromProtoStream(inputStream);
        } catch (EOFException truncated) {
            // The stream ended in the middle of an event; treat it like end of stream.
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parsed event from input stream, eventType=" + eventType + ", event=" + event.toString());
        }
        return event;
    }

    /**
     * Instantiates the empty event object that corresponds to the given type.
     *
     * @throws IOException if the type has no corresponding event class in this parser
     */
    private static HistoryEvent newEventForType(HistoryEventType eventType) throws IOException {
        switch (eventType) {
            case AM_LAUNCHED:
                return new AMLaunchedEvent();
            case AM_STARTED:
                return new AMStartedEvent();
            case DAG_SUBMITTED:
                return new DAGSubmittedEvent();
            case DAG_INITIALIZED:
                return new DAGInitializedEvent();
            case DAG_STARTED:
                return new DAGStartedEvent();
            case DAG_COMMIT_STARTED:
                return new DAGCommitStartedEvent();
            case DAG_FINISHED:
                return new DAGFinishedEvent();
            case DAG_KILL_REQUEST:
                return new DAGKillRequestEvent();
            case CONTAINER_LAUNCHED:
                return new ContainerLaunchedEvent();
            case CONTAINER_STOPPED:
                return new ContainerStoppedEvent();
            case VERTEX_INITIALIZED:
                return new VertexInitializedEvent();
            case VERTEX_STARTED:
                return new VertexStartedEvent();
            case VERTEX_PARALLELISM_UPDATED:
                return new VertexParallelismUpdatedEvent();
            case VERTEX_COMMIT_STARTED:
                return new VertexCommitStartedEvent();
            case VERTEX_GROUP_COMMIT_STARTED:
                return new VertexGroupCommitStartedEvent();
            case VERTEX_GROUP_COMMIT_FINISHED:
                return new VertexGroupCommitFinishedEvent();
            case VERTEX_FINISHED:
                return new VertexFinishedEvent();
            case TASK_STARTED:
                return new TaskStartedEvent();
            case TASK_FINISHED:
                return new TaskFinishedEvent();
            case TASK_ATTEMPT_STARTED:
                return new TaskAttemptStartedEvent();
            case TASK_ATTEMPT_FINISHED:
                return new TaskAttemptFinishedEvent();
            case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
                return new VertexRecoverableEventsGeneratedEvent();
            default:
                throw new IOException("Invalid data found, unknown event type " + eventType);
        }
    }

    /**
     * Reads every history event from a DAG recovery stream.
     *
     * @param inputStream stream over a DAG recovery file; it is not closed here
     * @return the events in the order they were read; empty if the stream holds none
     * @throws IOException if an event cannot be read
     */
    public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream) throws IOException {
        List<HistoryEvent> parsedEvents = new ArrayList<HistoryEvent>();
        HistoryEvent next = RecoveryParser.getNextEvent(inputStream);
        while (next != null) {
            parsedEvents.add(next);
            next = RecoveryParser.getNextEvent(inputStream);
        }
        LOG.info("Reached end of stream");
        return parsedEvents;
    }

    /**
     * Command-line entry point that dumps the contents of recovery files to the log.
     *
     * <p>Usage: {@code RecoveryParser <summaryFile> [<dagRecoveryFile> ...]}. The summary file is
     * parsed first, then each DAG recovery file in the order given.
     *
     * @param argv the summary file path followed by zero or more DAG recovery file paths
     * @throws IllegalArgumentException if no summary file path is given
     * @throws IOException if a file cannot be opened or parsed
     */
    public static void main(String[] argv) throws IOException {
        if (argv == null || argv.length == 0) {
            // Fail with a usable message instead of an ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException("Usage: RecoveryParser <summaryFile> [<dagRecoveryFile> ...]");
        }
        Configuration conf = new Configuration();
        String summaryPath = argv[0];
        FileSystem fs = FileSystem.get(conf);

        LOG.info("Parsing Summary file " + summaryPath);
        FSDataInputStream summaryStream = fs.open(new Path(summaryPath));
        try {
            RecoveryParser.parseSummaryFile(summaryStream);
        } finally {
            // Release the stream even when parsing fails.
            summaryStream.close();
        }

        for (int i = 1; i < argv.length; ++i) {
            String dagPath = argv[i];
            LOG.info("Parsing DAG recovery file " + dagPath);
            List<HistoryEvent> historyEvents;
            FSDataInputStream dagStream = fs.open(new Path(dagPath));
            try {
                historyEvents = RecoveryParser.parseDAGRecoveryFile(dagStream);
            } finally {
                dagStream.close();
            }
            for (HistoryEvent historyEvent : historyEvents) {
                LOG.info("Parsed event from recovery stream, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent);
            }
        }
    }

    /**
     * Resolves the summary recovery file path for the given attempt recovery directory by
     * delegating to {@code TezCommonUtils.getSummaryRecoveryPath}.
     */
    private Path getSummaryPath(Path attemptRrecoveryDataDir) {
        Path summaryPath = TezCommonUtils.getSummaryRecoveryPath(attemptRrecoveryDataDir);
        return summaryPath;
    }

    /**
     * Opens the given summary file with the configured recovery buffer size.
     *
     * @return an open stream, or {@code null} if the file does not exist on the recovery file system
     * @throws IOException if the existence check or the open call fails
     */
    private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException {
        boolean present = this.recoveryFS.exists(summaryPath);
        return present ? this.recoveryFS.open(summaryPath, this.recoveryBufferSize) : null;
    }

    /**
     * Builds the path of a DAG's recovery file: the DAG id's string form with a
     * {@code .recovery} suffix, located directly under the given directory.
     */
    private Path getDAGRecoveryFilePath(Path recoveryDataDir, TezDAGID dagID) {
        String fileName = dagID.toString() + ".recovery";
        return new Path(recoveryDataDir, fileName);
    }

    /**
     * Picks the DAG that recovery should look at.
     *
     * <p>A DAG that is not marked completed wins; otherwise the completed DAG with the highest
     * numeric id is chosen.
     *
     * @param dagSummaryDataMap summary data per DAG id
     * @return the in-progress DAG if there is one, else the completed DAG with the largest id,
     *         else {@code null} when the map is empty
     * @throws RuntimeException if more than one DAG is not completed
     */
    @VisibleForTesting
    DAGSummaryData getLastCompletedOrInProgressDAG(Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
        DAGSummaryData pending = null;
        DAGSummaryData newestCompleted = null;
        for (Map.Entry<TezDAGID, DAGSummaryData> entry : dagSummaryDataMap.entrySet()) {
            DAGSummaryData candidate = entry.getValue();
            if (candidate.completed) {
                // Track the completed DAG with the strictly largest id.
                if (newestCompleted == null || newestCompleted.dagId.getId() < candidate.dagId.getId()) {
                    newestCompleted = candidate;
                }
            } else {
                if (pending != null) {
                    throw new RuntimeException("Multiple in progress DAGs seen, dagId=" + pending.dagId + ", dagId=" + entry.getKey());
                }
                pending = candidate;
            }
        }
        return pending != null ? pending : newestCompleted;
    }

    /**
     * Checks whether any commit was still in flight for the given DAG.
     *
     * <p>Despite the {@code is...} name this does not return a boolean: the result is the reason
     * the DAG cannot be recovered, or {@code null} when nothing prevents recovery. The DAG-level
     * commit is checked first, then vertex commits, then vertex group commits.
     *
     * @param data summary data collected for the DAG
     * @return a description of the first unfinished commit found, or {@code null} if there is none
     */
    private String isDAGRecoverable(DAGSummaryData data) {
        if (!data.dagCommitCompleted) {
            return "DAG Commit was in progress, not recoverable, dagId=" + data.dagId;
        }
        for (Map.Entry<TezVertexID, Boolean> entry : data.vertexCommitStatus.entrySet()) {
            if (!entry.getValue().booleanValue()) {
                return "Vertex Commit was in progress, not recoverable, dagId=" + data.dagId + ", vertexId=" + entry.getKey();
            }
        }
        // The entry type must match the map's declared String key; generics are invariant, so
        // Map.Entry<Object, Boolean> cannot iterate a Map<String, Boolean> entry set.
        for (Map.Entry<String, Boolean> entry : data.vertexGroupCommitStatus.entrySet()) {
            if (!entry.getValue().booleanValue()) {
                return "Vertex Group Commit was in progress, not recoverable, dagId=" + data.dagId + ", vertexGroup=" + entry.getKey();
            }
        }
        return null;
    }

    /**
     * Collects the summary files of all attempts before the current one, in attempt order.
     *
     * @return paths of the summary files that exist; attempts without one are left out
     * @throws IOException if any earlier attempt directory contains the
     *         {@code RecoveryFatalErrorOccurred} marker, or if a file system check fails
     */
    private List<Path> getSummaryFiles() throws IOException {
        List<Path> found = new ArrayList<Path>();
        int attempt = 1;
        while (attempt < this.currentAttemptId) {
            Path attemptDir = TezCommonUtils.getAttemptRecoveryPath(this.recoveryDataDir, attempt);
            Path fatalMarker = new Path(attemptDir, "RecoveryFatalErrorOccurred");
            if (this.recoveryFS.exists(fatalMarker)) {
                throw new IOException("Found that a fatal error occurred in recovery during previous attempt, foundFile=" + fatalMarker.toString());
            }
            Path summary = this.getSummaryPath(attemptDir);
            if (this.recoveryFS.exists(summary)) {
                found.add(summary);
            }
            ++attempt;
        }
        return found;
    }

    /**
     * Collects the recovery files written for the given DAG by all attempts before the current
     * one, in attempt order.
     *
     * @param dagId DAG whose recovery files are wanted
     * @return paths of the recovery files that exist; attempts without one are left out
     * @throws IOException if a file system existence check fails
     */
    private List<Path> getDAGRecoveryFiles(TezDAGID dagId) throws IOException {
        List<Path> found = new ArrayList<Path>();
        int attempt = 1;
        while (attempt < this.currentAttemptId) {
            Path attemptDir = TezCommonUtils.getAttemptRecoveryPath(this.recoveryDataDir, attempt);
            Path candidate = this.getDAGRecoveryFilePath(attemptDir, dagId);
            if (this.recoveryFS.exists(candidate)) {
                found.add(candidate);
            }
            ++attempt;
        }
        return found;
    }

    /**
     * Rebuilds the state of the most relevant DAG from the recovery data of earlier attempts.
     *
     * <p>The work happens in three phases:
     * <ol>
     *   <li>Every summary file of the earlier attempts is read and folded into one
     *       {@code DAGSummaryData} per DAG id. The highest DAG id seen and all DAG ids are handed
     *       to the application master.</li>
     *   <li>The in-progress DAG (or, failing that, the last completed one) is selected and its
     *       recovery files are replayed in attempt order, restoring DAG, vertex and task state.
     *       Replay stops after a {@code DAG_FINISHED} event, or right after {@code DAG_SUBMITTED}
     *       when the summary data already shows the DAG is not recoverable.</li>
     *   <li>Unless the DAG is completed or non-recoverable, the events buffered from the summary
     *       files are applied on top.</li>
     * </ol>
     *
     * @return the recovered DAG data, or {@code null} if the summary files mention no DAG
     * @throws IOException if a summary file cannot be read, contains a record without a DAG id,
     *         contains a summary event that cannot be handled, or if a fatal-error marker from
     *         an earlier attempt is found
     * @throws RuntimeException if a recovery file or the buffered summary events contain an
     *         event type that is not expected there
     */
    public RecoveredDAGData parseRecoveryData() throws IOException {
        int dagCounter = 0;
        Map<TezDAGID, DAGSummaryData> dagSummaryDataMap = new HashMap<TezDAGID, DAGSummaryData>();

        // Phase 1: fold all summary files into per-DAG summary data.
        List<Path> summaryFiles = this.getSummaryFiles();
        for (Path summaryFile : summaryFiles) {
            FileStatus summaryFileStatus = this.recoveryFS.getFileStatus(summaryFile);
            LOG.info("Parsing summary file, path=" + summaryFile.toString() + ", len=" + summaryFileStatus.getLen() + ", lastModTime=" + summaryFileStatus.getModificationTime());
            FSDataInputStream summaryStream = this.getSummaryStream(summaryFile);
            if (summaryStream == null) {
                // getSummaryStream reports a missing file as null; do not silently ignore recovery data.
                throw new IOException("Summary file no longer exists, path=" + summaryFile);
            }
            try {
                while (true) {
                    RecoveryProtos.SummaryEventProto proto;
                    try {
                        proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
                    } catch (EOFException eof) {
                        LOG.info("Reached end of summary stream");
                        break;
                    }
                    if (proto == null) {
                        // A null record marks the end of the stream; stop instead of dereferencing it.
                        LOG.info("Reached end of summary stream");
                        break;
                    }
                    HistoryEventType eventType = HistoryEventType.values()[proto.getEventType()];
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("[RECOVERY SUMMARY] dagId=" + proto.getDagId() + ", timestamp=" + proto.getTimestamp() + ", event=" + eventType);
                    }
                    TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
                    if (dagId == null) {
                        throw new IOException("null dagId, summary records may be corrupted");
                    }
                    if (dagCounter < dagId.getId()) {
                        dagCounter = dagId.getId();
                    }
                    if (!dagSummaryDataMap.containsKey(dagId)) {
                        dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
                    }
                    try {
                        dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
                    } catch (Exception e) {
                        throw new IOException("Error when parsing summary event proto", e);
                    }
                }
            } finally {
                // Close the stream on every exit path, including failures while parsing.
                summaryStream.close();
            }
        }
        this.dagAppMaster.setDAGCounter(dagCounter);
        for (DAGSummaryData dagSummaryData : dagSummaryDataMap.values()) {
            this.dagAppMaster.dagIDs.add(dagSummaryData.dagId.toString());
        }

        // Phase 2: select the DAG to recover and replay its recovery files.
        DAGSummaryData lastInProgressDAGData = this.getLastCompletedOrInProgressDAG(dagSummaryDataMap);
        if (lastInProgressDAGData == null || lastInProgressDAGData.dagId == null) {
            LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
            return null;
        }
        TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
        LOG.info("Checking if DAG is in recoverable state, dagId=" + lastInProgressDAGData.dagId);
        RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
        if (lastInProgressDAGData.completed) {
            recoveredDAGData.isCompleted = true;
            recoveredDAGData.dagState = lastInProgressDAGData.dagState;
        }
        String nonRecoverableReason = this.isDAGRecoverable(lastInProgressDAGData);
        if (nonRecoverableReason != null) {
            LOG.warn("Found last inProgress DAG but not recoverable: " + lastInProgressDAGData);
            recoveredDAGData.nonRecoverable = true;
            recoveredDAGData.reason = nonRecoverableReason;
        }
        List<Path> dagRecoveryFiles = this.getDAGRecoveryFiles(lastInProgressDAG);
        boolean skipAllOtherEvents = false;
        Path lastRecoveryFile = null;
        for (Path dagRecoveryFile : dagRecoveryFiles) {
            if (skipAllOtherEvents) {
                LOG.warn("Other recovery files will be skipped due to error in the previous recovery file" + lastRecoveryFile);
                break;
            }
            lastRecoveryFile = dagRecoveryFile;
            LOG.info("Trying to recover dag from recovery file, dagId=" + lastInProgressDAG.toString() + ", dagRecoveryFile=" + dagRecoveryFile);
            FSDataInputStream dagRecoveryStream = this.recoveryFS.open(dagRecoveryFile, this.recoveryBufferSize);
            try {
                while (true) {
                    HistoryEvent event;
                    try {
                        event = RecoveryParser.getNextEvent(dagRecoveryStream);
                    } catch (EOFException eof) {
                        LOG.info("Reached end of dag recovery stream");
                        break;
                    } catch (IOException ioe) {
                        // Keep what was recovered so far; the rest of this file is unreadable.
                        LOG.warn("Corrupt data found when trying to read next event", ioe);
                        break;
                    }
                    if (event == null) {
                        // getNextEvent returns null at end of stream; stop instead of dereferencing it.
                        LOG.info("Reached end of dag recovery stream");
                        break;
                    }
                    if (skipAllOtherEvents) {
                        break;
                    }
                    HistoryEventType eventType = event.getEventType();
                    switch (eventType) {
                        case DAG_SUBMITTED: {
                            DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            recoveredDAGData.recoveredDAG = this.dagAppMaster.createDAG(submittedEvent.getDAGPlan(), lastInProgressDAG);
                            recoveredDAGData.cumulativeAdditionalResources = submittedEvent.getCumulativeAdditionalLocalResources();
                            recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
                            this.dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
                            if (recoveredDAGData.nonRecoverable) {
                                // The DAG object is all that is needed for a non-recoverable DAG.
                                skipAllOtherEvents = true;
                            }
                            break;
                        }
                        case DAG_INITIALIZED:
                        case DAG_STARTED:
                        case DAG_COMMIT_STARTED:
                        case VERTEX_GROUP_COMMIT_STARTED:
                        case VERTEX_GROUP_COMMIT_FINISHED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
                            break;
                        }
                        case DAG_KILL_REQUEST: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            break;
                        }
                        case DAG_FINISHED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            recoveredDAGData.recoveredDAG.restoreFromEvent(event);
                            recoveredDAGData.isCompleted = true;
                            recoveredDAGData.dagState = ((DAGFinishedEvent) event).getState();
                            skipAllOtherEvents = true;
                            break;
                        }
                        case CONTAINER_LAUNCHED: {
                            // Nothing to restore for container launches.
                            break;
                        }
                        case VERTEX_INITIALIZED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            Vertex v = recoveredDAGData.recoveredDAG.getVertex(((VertexInitializedEvent) event).getVertexID());
                            v.restoreFromEvent(event);
                            break;
                        }
                        case VERTEX_STARTED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            Vertex v = recoveredDAGData.recoveredDAG.getVertex(((VertexStartedEvent) event).getVertexID());
                            v.restoreFromEvent(event);
                            break;
                        }
                        case VERTEX_PARALLELISM_UPDATED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            Vertex v = recoveredDAGData.recoveredDAG.getVertex(((VertexParallelismUpdatedEvent) event).getVertexID());
                            v.restoreFromEvent(event);
                            break;
                        }
                        case VERTEX_COMMIT_STARTED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            Vertex v = recoveredDAGData.recoveredDAG.getVertex(((VertexCommitStartedEvent) event).getVertexID());
                            v.restoreFromEvent(event);
                            break;
                        }
                        case VERTEX_FINISHED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            Vertex v = recoveredDAGData.recoveredDAG.getVertex(((VertexFinishedEvent) event).getVertexID());
                            v.restoreFromEvent(event);
                            break;
                        }
                        case TASK_STARTED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            TaskStartedEvent tEvent = (TaskStartedEvent) event;
                            Task task = recoveredDAGData.recoveredDAG.getVertex(tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
                            task.restoreFromEvent(event);
                            break;
                        }
                        case TASK_FINISHED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
                            Task task = recoveredDAGData.recoveredDAG.getVertex(tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
                            task.restoreFromEvent(event);
                            break;
                        }
                        case TASK_ATTEMPT_STARTED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
                            Task task = recoveredDAGData.recoveredDAG.getVertex(tEvent.getTaskAttemptID().getTaskID().getVertexID()).getTask(tEvent.getTaskAttemptID().getTaskID());
                            task.restoreFromEvent(event);
                            break;
                        }
                        case TASK_ATTEMPT_FINISHED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
                            Task task = recoveredDAGData.recoveredDAG.getVertex(tEvent.getTaskAttemptID().getTaskID().getVertexID()).getTask(tEvent.getTaskAttemptID().getTaskID());
                            task.restoreFromEvent(event);
                            break;
                        }
                        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: {
                            LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                            assert (recoveredDAGData.recoveredDAG != null);
                            Vertex v = recoveredDAGData.recoveredDAG.getVertex(((VertexRecoverableEventsGeneratedEvent) event).getVertexID());
                            v.restoreFromEvent(event);
                            break;
                        }
                        default: {
                            throw new RuntimeException("Invalid data found, unknown event type " + eventType);
                        }
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("[DAG RECOVERY] dagId=" + lastInProgressDAG + ", eventType=" + eventType + ", event=" + event.toString());
                    }
                }
            } finally {
                // Close the stream on every exit path, including failures while replaying.
                dagRecoveryStream.close();
            }
        }

        // Phase 3: apply events that were only captured in the summary files.
        boolean applyBufferedEvents = !recoveredDAGData.isCompleted
                && !recoveredDAGData.nonRecoverable
                && lastInProgressDAGData.bufferedSummaryEvents != null
                && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty();
        if (applyBufferedEvents) {
            for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
                assert (recoveredDAGData.recoveredDAG != null);
                switch (bufferedEvent.getEventType()) {
                    case VERTEX_GROUP_COMMIT_STARTED:
                    case VERTEX_GROUP_COMMIT_FINISHED: {
                        recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
                        break;
                    }
                    case VERTEX_FINISHED: {
                        VertexFinishedEvent vertexFinishedEvent = (VertexFinishedEvent) bufferedEvent;
                        Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(vertexFinishedEvent.getVertexID());
                        if (vertex == null) {
                            // The summary knows about a finished vertex that the recovered DAG does not have.
                            recoveredDAGData.nonRecoverable = true;
                            recoveredDAGData.reason = "All state could not be recovered, vertex completed but events not flushed, vertexId=" + vertexFinishedEvent.getVertexID();
                        } else {
                            vertex.restoreFromEvent(vertexFinishedEvent);
                        }
                        break;
                    }
                    case DAG_KILL_REQUEST: {
                        DAGKillRequestEvent killRequestEvent = (DAGKillRequestEvent) bufferedEvent;
                        recoveredDAGData.isSessionStopped = killRequestEvent.isSessionStopped();
                        recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
                        break;
                    }
                    default: {
                        throw new RuntimeException("Invalid data found in buffered summary events, unknown event type " + bufferedEvent.getEventType());
                    }
                }
            }
        }
        return recoveredDAGData;
    }

    /**
     * Accumulates the state of a single DAG as its summary events are read one
     * at a time via {@link #handleSummaryEvent}.
     *
     * <p>Tracks whether the DAG finished, whether a DAG-level commit is still
     * outstanding, the commit status of individual vertices and vertex groups,
     * and a list of summary events that are buffered for later processing.
     */
    @VisibleForTesting
    static class DAGSummaryData {
        /** Identifier of the DAG this summary belongs to. */
        final TezDAGID dagId;
        /** Set to {@code true} by DAG_FINISHED and reset to {@code false} by DAG_SUBMITTED. */
        boolean completed = false;
        /** Cleared by DAG_COMMIT_STARTED and set again by DAG_FINISHED. */
        boolean dagCommitCompleted = true;
        /** State carried by the DAG_FINISHED event; stays {@code null} until one is seen. */
        DAGState dagState;
        /** Per-vertex commit status: {@code false} once a commit started, {@code true} once the vertex finished. */
        Map<TezVertexID, Boolean> vertexCommitStatus = new HashMap<TezVertexID, Boolean>();
        /** Per-vertex-group commit status keyed by group name: {@code false} on start, {@code true} on finish. */
        Map<String, Boolean> vertexGroupCommitStatus = new HashMap<String, Boolean>();
        /** Summary events kept, in arrival order, for later processing. */
        List<HistoryEvent> bufferedSummaryEvents = new ArrayList<HistoryEvent>();

        DAGSummaryData(TezDAGID dagId) {
            this.dagId = dagId;
        }

        /**
         * Applies one summary event to this DAG's accumulated state.
         *
         * @param proto the serialized summary event; its event type is the
         *              ordinal of a {@link HistoryEventType}
         * @throws IOException if the event type ordinal is out of range, if the
         *                     event type is not one handled here, or if
         *                     deserializing the event fails
         */
        void handleSummaryEvent(RecoveryProtos.SummaryEventProto proto) throws IOException {
            // Validate the ordinal before indexing so corrupt or unknown records surface
            // as the declared IOException rather than an ArrayIndexOutOfBoundsException.
            int eventTypeOrdinal = proto.getEventType();
            HistoryEventType[] knownEventTypes = HistoryEventType.values();
            if (eventTypeOrdinal < 0 || eventTypeOrdinal >= knownEventTypes.length) {
                throw new IOException("Found invalid summary event with unknown event type ordinal=" + eventTypeOrdinal);
            }
            HistoryEventType eventType = knownEventTypes[eventTypeOrdinal];
            switch (eventType) {
                case DAG_SUBMITTED: {
                    this.completed = false;
                    // The event is still deserialized even though its contents are not
                    // used here, so a malformed payload continues to be reported.
                    DAGSubmittedEvent dagSubmittedEvent = new DAGSubmittedEvent();
                    dagSubmittedEvent.fromSummaryProtoStream(proto);
                    break;
                }
                case DAG_FINISHED: {
                    this.completed = true;
                    this.dagCommitCompleted = true;
                    DAGFinishedEvent dagFinishedEvent = new DAGFinishedEvent();
                    dagFinishedEvent.fromSummaryProtoStream(proto);
                    this.dagState = dagFinishedEvent.getState();
                    break;
                }
                case DAG_KILL_REQUEST: {
                    DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent();
                    killRequestEvent.fromSummaryProtoStream(proto);
                    this.bufferedSummaryEvents.add(killRequestEvent);
                    break;
                }
                case DAG_COMMIT_STARTED: {
                    this.dagCommitCompleted = false;
                    break;
                }
                case VERTEX_COMMIT_STARTED: {
                    VertexCommitStartedEvent vertexCommitStartedEvent = new VertexCommitStartedEvent();
                    vertexCommitStartedEvent.fromSummaryProtoStream(proto);
                    this.vertexCommitStatus.put(vertexCommitStartedEvent.getVertexID(), false);
                    break;
                }
                case VERTEX_FINISHED: {
                    VertexFinishedEvent vertexFinishedEvent = new VertexFinishedEvent();
                    vertexFinishedEvent.fromSummaryProtoStream(proto);
                    // Only vertices with a recorded commit start are tracked and buffered.
                    if (this.vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) {
                        this.vertexCommitStatus.put(vertexFinishedEvent.getVertexID(), true);
                        this.bufferedSummaryEvents.add(vertexFinishedEvent);
                    }
                    break;
                }
                case VERTEX_GROUP_COMMIT_STARTED: {
                    VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = new VertexGroupCommitStartedEvent();
                    vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
                    this.bufferedSummaryEvents.add(vertexGroupCommitStartedEvent);
                    this.vertexGroupCommitStatus.put(vertexGroupCommitStartedEvent.getVertexGroupName(), false);
                    break;
                }
                case VERTEX_GROUP_COMMIT_FINISHED: {
                    VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = new VertexGroupCommitFinishedEvent();
                    vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
                    this.bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent);
                    this.vertexGroupCommitStatus.put(vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
                    break;
                }
                default: {
                    String message = "Found invalid summary event that was not handled, eventType=" + eventType.name();
                    throw new IOException(message);
                }
            }
        }

        /**
         * Renders the DAG id, the completed flag, and any recorded vertex and
         * vertex-group commit statuses. Each status list is omitted when empty.
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("dagId=").append(this.dagId);
            sb.append(", dagCompleted=").append(this.completed);
            if (!this.vertexCommitStatus.isEmpty()) {
                sb.append(", vertexCommitStatuses=[");
                for (Map.Entry<TezVertexID, Boolean> entry : this.vertexCommitStatus.entrySet()) {
                    sb.append("{ vertexId=").append(entry.getKey()).append(", committed=").append(entry.getValue()).append("}, ");
                }
                sb.append("]");
            }
            if (!this.vertexGroupCommitStatus.isEmpty()) {
                sb.append(", vertexGroupCommitStatuses=[");
                // The entry type must match the map's declared String key type to compile.
                for (Map.Entry<String, Boolean> entry : this.vertexGroupCommitStatus.entrySet()) {
                    sb.append("{ vertexGroup=").append(entry.getKey()).append(", committed=").append(entry.getValue()).append("}, ");
                }
                sb.append("]");
            }
            return sb.toString();
        }
    }

    /**
     * Mutable result holder describing what was recovered for a DAG.
     *
     * <p>All fields start at their Java defaults ({@code null} / {@code false})
     * and are filled in by the code that builds the instance.
     */
    public static class RecoveredDAGData {
        /** Identifier of the recovered DAG, or {@code null} if not set. */
        public TezDAGID recoveredDagID;

        /** The recovered DAG instance, or {@code null} if not set. */
        public DAGImpl recoveredDAG;

        /** State of the recovered DAG, or {@code null} if not set. */
        public DAGState dagState;

        /** NOTE(review): presumably true when the recovered DAG had already finished; confirm against the populating code. */
        public boolean isCompleted;

        /** Set when recovery cannot proceed; {@link #reason} then carries the explanation. */
        public boolean nonRecoverable;

        /** Copied from a recovered DAG kill request's session-stopped flag. */
        public boolean isSessionStopped;

        /** Human-readable explanation accompanying {@link #nonRecoverable}, or {@code null}. */
        public String reason;

        /** NOTE(review): presumably additional local resources keyed by name; confirm against the populating code. */
        public Map<String, LocalResource> cumulativeAdditionalResources;
    }
}

