/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.CodedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGKillRequestEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecoveryParser {
    /** Logger shared by the static and instance parsing routines of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(RecoveryParser.class);
    /** Application master that supplies the configuration and receives the recovered DAG state. */
    private final DAGAppMaster dagAppMaster;
    /** File system on which the recovery directories and files are checked, opened and created. */
    private final FileSystem recoveryFS;
    /** Base recovery directory; per-attempt directories are resolved relative to it. */
    private final Path recoveryDataDir;
    /** Recovery directory of the current attempt; created (mkdirs) by the constructor. */
    private final Path currentAttemptRecoveryDataDir;
    /** Buffer size passed to {@code FileSystem.open}; read from "tez.dag.recovery.io.buffer.size" (fallback 8192). */
    private final int recoveryBufferSize;
    /** Id of the current attempt; recovery files of attempts 1 .. currentAttemptId - 1 are read. */
    private final int currentAttemptId;

    public RecoveryParser(DAGAppMaster dagAppMaster, FileSystem recoveryFS, Path recoveryDataDir, int currentAttemptId) throws IOException {
        this.dagAppMaster = dagAppMaster;
        this.recoveryFS = recoveryFS;
        this.recoveryDataDir = recoveryDataDir;
        this.currentAttemptId = currentAttemptId;
        this.currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath((Path)recoveryDataDir, (int)currentAttemptId);
        this.recoveryBufferSize = dagAppMaster.getConfig().getInt("tez.dag.recovery.io.buffer.size", 8192);
        this.recoveryFS.mkdirs(this.currentAttemptRecoveryDataDir);
    }

    /**
     * Reads every delimited summary event from the given stream and logs it.
     *
     * <p>The stream is not closed here; that is left to the caller.
     *
     * @param inputStream stream positioned at the start of a summary recovery file
     * @throws IOException if a record cannot be parsed or carries an event type ordinal that
     *         does not map to a {@link HistoryEventType}
     */
    private static void parseSummaryFile(FSDataInputStream inputStream) throws IOException {
        HistoryEventType[] eventTypes = HistoryEventType.values();
        RecoveryProtos.SummaryEventProto proto;
        // parseDelimitedFrom returns null once the stream is exhausted.
        while ((proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom((InputStream)inputStream)) != null) {
            int eventTypeOrdinal = proto.getEventType();
            // Validate the ordinal the same way getNextEvent does, instead of letting a corrupt
            // record surface as an ArrayIndexOutOfBoundsException.
            if (eventTypeOrdinal < 0 || eventTypeOrdinal >= eventTypes.length) {
                throw new IOException("Corrupt data found when trying to read summary event type, eventTypeOrdinal=" + eventTypeOrdinal);
            }
            LOG.info("[SUMMARY] dagId=" + proto.getDagId() + ", timestamp=" + proto.getTimestamp() + ", event=" + eventTypes[eventTypeOrdinal]);
        }
        LOG.info("Reached end of summary stream");
    }

    /**
     * Reads one history event from a DAG recovery stream.
     *
     * <p>Each record starts with a fixed 32-bit event type ordinal followed by the event payload.
     *
     * @param inputStream coded stream positioned at the start of a record
     * @return the parsed event, or {@code null} when the stream is at its end or ends in the
     *         middle of a record
     * @throws IOException if the event type ordinal is out of range or names a type that is not
     *         expected in a recovery file, or if reading fails for a reason other than EOF
     */
    private static HistoryEvent getNextEvent(CodedInputStream inputStream) throws IOException {
        if (inputStream.isAtEnd()) {
            return null;
        }

        final int ordinal;
        try {
            ordinal = inputStream.readFixed32();
        } catch (EOFException eof) {
            // Stream ended inside the type header: treat as end of data.
            return null;
        }

        HistoryEventType[] knownTypes = HistoryEventType.values();
        if (ordinal < 0 || ordinal >= knownTypes.length) {
            throw new IOException("Corrupt data found when trying to read next event type, eventTypeOrdinal=" + ordinal);
        }
        HistoryEventType eventType = knownTypes[ordinal];

        // Instantiate an empty event of the matching type; it is populated from the stream below.
        HistoryEvent event;
        switch (eventType) {
            case AM_LAUNCHED:
                event = new AMLaunchedEvent();
                break;
            case AM_STARTED:
                event = new AMStartedEvent();
                break;
            case DAG_SUBMITTED:
                event = new DAGSubmittedEvent();
                break;
            case DAG_INITIALIZED:
                event = new DAGInitializedEvent();
                break;
            case DAG_STARTED:
                event = new DAGStartedEvent();
                break;
            case DAG_COMMIT_STARTED:
                event = new DAGCommitStartedEvent();
                break;
            case DAG_FINISHED:
                event = new DAGFinishedEvent();
                break;
            case DAG_KILL_REQUEST:
                event = new DAGKillRequestEvent();
                break;
            case CONTAINER_LAUNCHED:
                event = new ContainerLaunchedEvent();
                break;
            case CONTAINER_STOPPED:
                event = new ContainerStoppedEvent();
                break;
            case VERTEX_INITIALIZED:
                event = new VertexInitializedEvent();
                break;
            case VERTEX_CONFIGURE_DONE:
                event = new VertexConfigurationDoneEvent();
                break;
            case VERTEX_STARTED:
                event = new VertexStartedEvent();
                break;
            case VERTEX_COMMIT_STARTED:
                event = new VertexCommitStartedEvent();
                break;
            case VERTEX_GROUP_COMMIT_STARTED:
                event = new VertexGroupCommitStartedEvent();
                break;
            case VERTEX_GROUP_COMMIT_FINISHED:
                event = new VertexGroupCommitFinishedEvent();
                break;
            case VERTEX_FINISHED:
                event = new VertexFinishedEvent();
                break;
            case TASK_STARTED:
                event = new TaskStartedEvent();
                break;
            case TASK_FINISHED:
                event = new TaskFinishedEvent();
                break;
            case TASK_ATTEMPT_STARTED:
                event = new TaskAttemptStartedEvent();
                break;
            case TASK_ATTEMPT_FINISHED:
                event = new TaskAttemptFinishedEvent();
                break;
            default:
                throw new IOException("Invalid data found, unknown event type " + eventType);
        }

        try {
            event.fromProtoStream(inputStream);
        } catch (EOFException eof) {
            // Payload was cut short: report end of data rather than a partial event.
            return null;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parsed event from input stream, eventType=" + eventType + ", event=" + event.toString());
        }
        return event;
    }

    /**
     * Parses all history events contained in a DAG recovery file.
     *
     * <p>The stream is not closed here; that is left to the caller.
     *
     * @param inputStream stream positioned at the start of a DAG recovery file
     * @return the events in the order they appear in the stream (possibly empty)
     * @throws IOException if a record is corrupt or the stream cannot be read
     */
    public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream) throws IOException {
        ArrayList<HistoryEvent> parsedEvents = new ArrayList<HistoryEvent>();
        CodedInputStream codedStream = CodedInputStream.newInstance((InputStream)inputStream);
        // Lift the default size limit so large recovery files can be read in full.
        codedStream.setSizeLimit(Integer.MAX_VALUE);

        // getNextEvent signals the end of the stream with null.
        for (HistoryEvent next = RecoveryParser.getNextEvent(codedStream);
                next != null;
                next = RecoveryParser.getNextEvent(codedStream)) {
            LOG.debug("Read HistoryEvent, eventType={}, event={}", (Object)next.getEventType(), (Object)next);
            parsedEvents.add(next);
        }

        LOG.info("Reached end of stream");
        return parsedEvents;
    }

    /**
     * Reads the history events of the application's first DAG from the recovery files of
     * attempts {@code 1 .. attempt} (inclusive). Attempts without a recovery file are skipped.
     *
     * @param tezConf configuration used to resolve the staging and recovery directories
     * @param appId application whose recovery data is read
     * @param attempt highest attempt number to read
     * @return all events found, in attempt order and then in file order
     * @throws IOException if a recovery file cannot be opened, read or parsed
     */
    public static List<HistoryEvent> readRecoveryEvents(TezConfiguration tezConf, ApplicationId appId, int attempt) throws IOException {
        Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath((Configuration)tezConf, (String)appId.toString());
        Path recoveryDataDir = TezCommonUtils.getRecoveryPath((Path)tezSystemStagingDir, (Configuration)tezConf);
        FileSystem fs = tezSystemStagingDir.getFileSystem((Configuration)tezConf);
        // The file name does not depend on the attempt, so build it once.
        String recoveryFileName = appId.toString().replace("application", "dag") + "_1" + ".recovery";

        List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
        for (int i = 1; i <= attempt; ++i) {
            Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath((Path)recoveryDataDir, (int)i);
            Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir, recoveryFileName);
            if (!fs.exists(recoveryFilePath)) {
                continue;
            }
            LOG.info("Read recovery file:" + recoveryFilePath);
            // Open the stream in the resource specification so it is closed even when parsing fails.
            try (FSDataInputStream in = fs.open(recoveryFilePath)) {
                historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in));
            }
        }
        return historyEvents;
    }

    /**
     * Command-line entry point that dumps recovery files to the log.
     *
     * <p>Usage: {@code RecoveryParser <summaryFile> [<dagRecoveryFile> ...]}
     *
     * @param argv the summary file path followed by zero or more DAG recovery file paths
     * @throws IllegalArgumentException if no summary file path is given
     * @throws IOException if a file cannot be opened, read or parsed
     */
    public static void main(String[] argv) throws IOException {
        if (argv == null || argv.length == 0) {
            // Fail with a usable message instead of an ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException("Usage: RecoveryParser <summaryFile> [<dagRecoveryFile> ...]");
        }
        Configuration conf = new Configuration();
        String summaryPath = argv[0];
        FileSystem fs = FileSystem.get((Configuration)conf);

        LOG.info("Parsing Summary file " + summaryPath);
        // Close each stream as soon as it has been consumed, even if parsing fails.
        try (FSDataInputStream summaryStream = fs.open(new Path(summaryPath))) {
            RecoveryParser.parseSummaryFile(summaryStream);
        }

        for (int i = 1; i < argv.length; ++i) {
            String dagPath = argv[i];
            LOG.info("Parsing DAG recovery file " + dagPath);
            List<HistoryEvent> historyEvents;
            try (FSDataInputStream dagStream = fs.open(new Path(dagPath))) {
                historyEvents = RecoveryParser.parseDAGRecoveryFile(dagStream);
            }
            for (HistoryEvent historyEvent : historyEvents) {
                LOG.info("Parsed event from recovery stream, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent);
            }
        }
    }

    /**
     * Resolves the summary recovery file path for an attempt's recovery directory.
     *
     * @param attemptRrecoveryDataDir recovery directory of one attempt
     * @return the path returned by {@code TezCommonUtils.getSummaryRecoveryPath} for that directory
     */
    private Path getSummaryPath(Path attemptRrecoveryDataDir) {
        Path summaryPath = TezCommonUtils.getSummaryRecoveryPath(attemptRrecoveryDataDir);
        return summaryPath;
    }

    /**
     * Opens a summary file for reading using the configured recovery buffer size.
     *
     * @param summaryPath path of the summary file
     * @return an open stream, or {@code null} if the file does not exist
     * @throws IOException if the existence check or the open fails
     */
    private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException {
        if (this.recoveryFS.exists(summaryPath)) {
            return this.recoveryFS.open(summaryPath, this.recoveryBufferSize);
        }
        return null;
    }

    /**
     * Builds the path of a DAG's recovery file inside a recovery directory.
     *
     * @param recoveryDataDir directory that holds the file
     * @param dagID DAG whose string form names the file
     * @return {@code <recoveryDataDir>/<dagID>.recovery}
     */
    private Path getDAGRecoveryFilePath(Path recoveryDataDir, TezDAGID dagID) {
        String fileName = dagID.toString() + ".recovery";
        return new Path(recoveryDataDir, fileName);
    }

    /**
     * Picks the DAG that recovery should look at: the single DAG that is not marked completed
     * if there is one, otherwise the completed DAG with the highest id.
     *
     * @param dagSummaryDataMap summary data per DAG id
     * @return the selected summary data, or {@code null} if the map is empty
     * @throws RuntimeException if more than one DAG is not marked completed
     */
    @VisibleForTesting
    DAGSummaryData getLastCompletedOrInProgressDAG(Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
        DAGSummaryData running = null;
        DAGSummaryData newestCompleted = null;

        for (Map.Entry<TezDAGID, DAGSummaryData> entry : dagSummaryDataMap.entrySet()) {
            DAGSummaryData candidate = entry.getValue();
            if (candidate.completed) {
                // Keep the completed DAG with the strictly largest id.
                if (newestCompleted == null || newestCompleted.dagId.getId() < candidate.dagId.getId()) {
                    newestCompleted = candidate;
                }
            } else {
                if (running != null) {
                    throw new RuntimeException("Multiple in progress DAGs seen, dagId=" + running.dagId + ", dagId=" + entry.getKey());
                }
                running = candidate;
            }
        }

        return running != null ? running : newestCompleted;
    }

    /**
     * Collects the summary files written by all attempts before the current one.
     *
     * @return existing summary file paths in ascending attempt order
     * @throws IOException if a previous attempt left a "RecoveryFatalErrorOccurred" marker, or if
     *         a file system check fails
     */
    private List<Path> getSummaryFiles() throws IOException {
        ArrayList<Path> found = new ArrayList<Path>();
        int attempt = 1;
        while (attempt < this.currentAttemptId) {
            Path attemptDir = TezCommonUtils.getAttemptRecoveryPath(this.recoveryDataDir, attempt);

            // A marker file from an earlier attempt means recovery must not be tried again.
            Path fatalMarker = new Path(attemptDir, "RecoveryFatalErrorOccurred");
            if (this.recoveryFS.exists(fatalMarker)) {
                throw new IOException("Found that a fatal error occurred in recovery during previous attempt, foundFile=" + fatalMarker.toString());
            }

            Path summary = this.getSummaryPath(attemptDir);
            if (this.recoveryFS.exists(summary)) {
                found.add(summary);
            }
            ++attempt;
        }
        return found;
    }

    /**
     * Collects the recovery files of one DAG written by all attempts before the current one.
     *
     * @param dagId DAG whose recovery files are wanted
     * @return existing recovery file paths in ascending attempt order
     * @throws IOException if a file system existence check fails
     */
    private List<Path> getDAGRecoveryFiles(TezDAGID dagId) throws IOException {
        ArrayList<Path> found = new ArrayList<Path>();
        int attempt = 1;
        while (attempt < this.currentAttemptId) {
            Path attemptDir = TezCommonUtils.getAttemptRecoveryPath(this.recoveryDataDir, attempt);
            Path candidate = this.getDAGRecoveryFilePath(attemptDir, dagId);
            if (this.recoveryFS.exists(candidate)) {
                found.add(candidate);
            }
            ++attempt;
        }
        return found;
    }

    /**
     * Rebuilds the recovery state of the most relevant DAG from the files written by earlier
     * attempts.
     *
     * <p>Phase 1 reads every summary file, feeds each summary event into a per-DAG
     * {@code DAGSummaryData}, and reports the highest DAG id and all DAG ids seen to the
     * application master. Phase 2 selects the in-progress DAG (or else the last completed one)
     * and replays its recovery files event by event into a {@code DAGRecoveryData}.
     *
     * <p>Reading of a DAG recovery file stops at end of stream or at the first corrupt event
     * (logged as a warning). Once a DAG_FINISHED event is seen, or a DAG_SUBMITTED event is seen
     * for data already flagged non-recoverable, no further events or files are processed.
     *
     * @return the recovered data, or {@code null} if no DAG was found in the summary files
     * @throws IOException if a fatal-error marker from a previous attempt exists, a summary record
     *         is corrupt (bad event type, bad DAG id, or a failing summary event handler), or a
     *         file system operation fails
     */
    public DAGRecoveryData parseRecoveryData() throws IOException {
        int dagCounter = 0;
        Map<TezDAGID, DAGSummaryData> dagSummaryDataMap = new HashMap<TezDAGID, DAGSummaryData>();
        List<Path> summaryFiles = this.getSummaryFiles();
        LOG.debug("SummaryFile size:" + summaryFiles.size());
        HistoryEventType[] eventTypes = HistoryEventType.values();

        for (Path summaryFile : summaryFiles) {
            FileStatus summaryFileStatus = this.recoveryFS.getFileStatus(summaryFile);
            LOG.info("Parsing summary file, path=" + summaryFile.toString() + ", len=" + summaryFileStatus.getLen() + ", lastModTime=" + summaryFileStatus.getModificationTime());
            // try-with-resources closes the stream even when a corrupt record aborts parsing.
            try (FSDataInputStream summaryStream = this.getSummaryStream(summaryFile)) {
                while (true) {
                    RecoveryProtos.SummaryEventProto proto;
                    try {
                        proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom((InputStream)summaryStream);
                    } catch (EOFException eof) {
                        LOG.info("Reached end of summary stream");
                        break;
                    }
                    if (proto == null) {
                        // A clean end of stream is reported as null; stop instead of dereferencing it.
                        LOG.info("Reached end of summary stream");
                        break;
                    }

                    int eventTypeOrdinal = proto.getEventType();
                    if (eventTypeOrdinal < 0 || eventTypeOrdinal >= eventTypes.length) {
                        throw new IOException("Corrupt data found when trying to read summary event type, eventTypeOrdinal=" + eventTypeOrdinal);
                    }
                    HistoryEventType eventType = eventTypes[eventTypeOrdinal];
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("[RECOVERY SUMMARY] dagId=" + proto.getDagId() + ", timestamp=" + proto.getTimestamp() + ", event=" + eventType);
                    }

                    TezDAGID dagId;
                    try {
                        dagId = TezDAGID.fromString(proto.getDagId());
                    } catch (IllegalArgumentException e) {
                        throw new IOException("Invalid dagId, summary records may be corrupted", e);
                    }
                    if (dagCounter < dagId.getId()) {
                        dagCounter = dagId.getId();
                    }

                    DAGSummaryData dagSummaryData = dagSummaryDataMap.get(dagId);
                    if (dagSummaryData == null) {
                        dagSummaryData = new DAGSummaryData(dagId);
                        dagSummaryDataMap.put(dagId, dagSummaryData);
                    }
                    try {
                        dagSummaryData.handleSummaryEvent(proto);
                    } catch (Exception e) {
                        throw new IOException("Error when parsing summary event proto", e);
                    }
                }
            }
        }

        this.dagAppMaster.setDAGCounter(dagCounter);
        for (DAGSummaryData dagSummaryData : dagSummaryDataMap.values()) {
            this.dagAppMaster.dagIDs.add(dagSummaryData.dagId.toString());
        }

        DAGSummaryData lastInProgressDAGData = this.getLastCompletedOrInProgressDAG(dagSummaryDataMap);
        if (lastInProgressDAGData == null) {
            LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
            return null;
        }
        TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
        if (lastInProgressDAG == null) {
            LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
            return null;
        }

        LOG.info("Checking if DAG is in recoverable state, dagId=" + lastInProgressDAGData.dagId);
        DAGRecoveryData recoveredDAGData = new DAGRecoveryData(lastInProgressDAGData);
        List<Path> dagRecoveryFiles = this.getDAGRecoveryFiles(lastInProgressDAG);
        boolean skipAllOtherEvents = false;
        Path lastRecoveryFile = null;

        for (Path dagRecoveryFile : dagRecoveryFiles) {
            if (skipAllOtherEvents) {
                LOG.warn("Other recovery files will be skipped due to error in the previous recovery file" + lastRecoveryFile);
                break;
            }
            FileStatus fileStatus = this.recoveryFS.getFileStatus(dagRecoveryFile);
            lastRecoveryFile = dagRecoveryFile;
            LOG.info("Trying to recover dag from recovery file, dagId=" + lastInProgressDAG.toString() + ", dagRecoveryFile=" + dagRecoveryFile + ", len=" + fileStatus.getLen());

            // try-with-resources closes the stream even when event handling throws.
            try (FSDataInputStream dagRecoveryStream = this.recoveryFS.open(dagRecoveryFile, this.recoveryBufferSize)) {
                CodedInputStream codedInputStream = CodedInputStream.newInstance((InputStream)dagRecoveryStream);
                codedInputStream.setSizeLimit(Integer.MAX_VALUE);
                while (true) {
                    HistoryEvent event;
                    try {
                        event = RecoveryParser.getNextEvent(codedInputStream);
                    } catch (EOFException eof) {
                        LOG.info("Reached end of dag recovery stream");
                        break;
                    } catch (IOException ioe) {
                        // Keep what was recovered so far; the rest of this file is unreadable.
                        LOG.warn("Corrupt data found when trying to read next event", (Throwable)ioe);
                        break;
                    }
                    if (event == null) {
                        // getNextEvent reports end of stream as null; stop instead of dereferencing it.
                        LOG.info("Reached end of dag recovery stream");
                        break;
                    }
                    if (skipAllOtherEvents) {
                        break;
                    }

                    HistoryEventType eventType = event.getEventType();
                    LOG.info("Recovering from event, eventType=" + eventType + ", event=" + event.toString());
                    switch (eventType) {
                        case DAG_SUBMITTED: {
                            DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent)event;
                            recoveredDAGData.recoveredDAG = this.dagAppMaster.createDAG(submittedEvent.getDAGPlan(), lastInProgressDAG);
                            recoveredDAGData.cumulativeAdditionalResources = submittedEvent.getCumulativeAdditionalLocalResources();
                            recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
                            this.dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
                            if (recoveredDAGData.nonRecoverable) {
                                skipAllOtherEvents = true;
                            }
                            break;
                        }
                        case DAG_INITIALIZED: {
                            recoveredDAGData.dagInitedEvent = (DAGInitializedEvent)event;
                            break;
                        }
                        case DAG_STARTED: {
                            recoveredDAGData.dagStartedEvent = (DAGStartedEvent)event;
                            break;
                        }
                        case DAG_FINISHED: {
                            recoveredDAGData.dagFinishedEvent = (DAGFinishedEvent)event;
                            skipAllOtherEvents = true;
                            break;
                        }
                        // These event types are accepted but carry nothing that is replayed here.
                        case DAG_COMMIT_STARTED:
                        case CONTAINER_LAUNCHED:
                        case VERTEX_GROUP_COMMIT_STARTED:
                        case VERTEX_GROUP_COMMIT_FINISHED:
                        case DAG_KILL_REQUEST:
                        case VERTEX_COMMIT_STARTED: {
                            break;
                        }
                        case VERTEX_INITIALIZED: {
                            VertexInitializedEvent vertexInitEvent = (VertexInitializedEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.maybeCreateVertexRecoveryData(vertexInitEvent.getVertexID());
                            vertexRecoveryData.vertexInitedEvent = vertexInitEvent;
                            break;
                        }
                        case VERTEX_CONFIGURE_DONE: {
                            VertexConfigurationDoneEvent reconfigureDoneEvent = (VertexConfigurationDoneEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.maybeCreateVertexRecoveryData(reconfigureDoneEvent.getVertexID());
                            vertexRecoveryData.vertexConfigurationDoneEvent = reconfigureDoneEvent;
                            break;
                        }
                        case VERTEX_STARTED: {
                            VertexStartedEvent vertexStartedEvent = (VertexStartedEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(vertexStartedEvent.getVertexID());
                            Preconditions.checkArgument(vertexRecoveryData != null, (Object)"No VertexInitializedEvent before VertexStartedEvent");
                            vertexRecoveryData.vertexStartedEvent = vertexStartedEvent;
                            break;
                        }
                        case VERTEX_FINISHED: {
                            VertexFinishedEvent vertexFinishedEvent = (VertexFinishedEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.maybeCreateVertexRecoveryData(vertexFinishedEvent.getVertexID());
                            vertexRecoveryData.vertexFinishedEvent = vertexFinishedEvent;
                            break;
                        }
                        case TASK_STARTED: {
                            TaskStartedEvent taskStartedEvent = (TaskStartedEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID());
                            Preconditions.checkArgument(vertexRecoveryData != null, (Object)("Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID()));
                            TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
                            taskRecoveryData.taskStartedEvent = taskStartedEvent;
                            break;
                        }
                        case TASK_FINISHED: {
                            TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID());
                            Preconditions.checkArgument(vertexRecoveryData != null, (Object)("Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID()));
                            TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
                            taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
                            break;
                        }
                        case TASK_ATTEMPT_STARTED: {
                            TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taStartedEvent.getTaskAttemptID().getTaskID().getVertexID());
                            Preconditions.checkArgument(vertexRecoveryData != null, (Object)("Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID()));
                            TaskRecoveryData taskRecoveryData = (TaskRecoveryData)vertexRecoveryData.taskRecoveryDataMap.get(taStartedEvent.getTaskAttemptID().getTaskID());
                            Preconditions.checkArgument(taskRecoveryData != null, (Object)("Invalid TaskAttemptStartedEvent, its taskId does not exist, taId=" + taStartedEvent.getTaskAttemptID()));
                            TaskAttemptRecoveryData taRecoveryData = taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taStartedEvent.getTaskAttemptID());
                            taRecoveryData.taStartedEvent = taStartedEvent;
                            break;
                        }
                        case TASK_ATTEMPT_FINISHED: {
                            TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event;
                            VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID());
                            Preconditions.checkArgument(vertexRecoveryData != null, (Object)("Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID()));
                            TaskRecoveryData taskRecoveryData = (TaskRecoveryData)vertexRecoveryData.taskRecoveryDataMap.get(taFinishedEvent.getTaskAttemptID().getTaskID());
                            Preconditions.checkArgument(taskRecoveryData != null, (Object)("Invalid TaskAttemptFinishedEvent, its taskId does not exist, taId=" + taFinishedEvent.getTaskAttemptID()));
                            TaskAttemptRecoveryData taRecoveryData = taskRecoveryData.maybeCreateTaskAttemptRecoveryData(taFinishedEvent.getTaskAttemptID());
                            taRecoveryData.taFinishedEvent = taFinishedEvent;
                            break;
                        }
                        default: {
                            throw new RuntimeException("Invalid data found, unknown event type " + eventType);
                        }
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("[DAG RECOVERY] dagId=" + lastInProgressDAG + ", eventType=" + eventType + ", event=" + event.toString());
                    }
                }
            }
        }

        recoveredDAGData.checkRecoverableNonSummary();
        return recoveredDAGData;
    }

    /**
     * Holds the started and finished events recovered for a single task attempt.
     * Either event may be {@code null} if it was not found in the recovery data.
     */
    public static class TaskAttemptRecoveryData {
        private TaskAttemptStartedEvent taStartedEvent;
        private TaskAttemptFinishedEvent taFinishedEvent;

        /** Creates an instance with no events recorded yet. */
        public TaskAttemptRecoveryData() {
        }

        /**
         * Creates an instance with both events preset.
         *
         * @param taStartedEvent started event, may be {@code null}
         * @param taFinishedEvent finished event, may be {@code null}
         */
        @VisibleForTesting
        public TaskAttemptRecoveryData(TaskAttemptStartedEvent taStartedEvent, TaskAttemptFinishedEvent taFinishedEvent) {
            this.taStartedEvent = taStartedEvent;
            this.taFinishedEvent = taFinishedEvent;
        }

        /** @return the recovered started event, or {@code null} if none was recorded */
        public TaskAttemptStartedEvent getTaskAttemptStartedEvent() {
            return this.taStartedEvent;
        }

        /** @return the recovered finished event, or {@code null} if none was recorded */
        public TaskAttemptFinishedEvent getTaskAttemptFinishedEvent() {
            return this.taFinishedEvent;
        }

        /** @return {@code true} only if a finished event exists and its state is SUCCEEDED */
        public boolean isTaskAttemptSucceeded() {
            TaskAttemptFinishedEvent finished = this.getTaskAttemptFinishedEvent();
            return finished != null && finished.getState() == TaskAttemptState.SUCCEEDED;
        }
    }

    /**
     * Holds the started and finished events recovered for a single task, together with the
     * recovery data of its attempts keyed by attempt id.
     */
    public static class TaskRecoveryData {
        private TaskStartedEvent taskStartedEvent;
        private TaskFinishedEvent taskFinishedEvent;
        private Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap = new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>();

        /** Creates an instance with no events and an empty attempt map. */
        public TaskRecoveryData() {
        }

        /**
         * Creates an instance with all state preset.
         *
         * @param taskStartedEvent started event, may be {@code null}
         * @param taskFinishedEvent finished event, may be {@code null}
         * @param taRecoveryDataMap attempt recovery data keyed by attempt id; stored as given
         */
        @VisibleForTesting
        public TaskRecoveryData(TaskStartedEvent taskStartedEvent, TaskFinishedEvent taskFinishedEvent, Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap) {
            this.taskStartedEvent = taskStartedEvent;
            this.taskFinishedEvent = taskFinishedEvent;
            this.taRecoveryDataMap = taRecoveryDataMap;
        }

        /** @return the recovered started event, or {@code null} if none was recorded */
        public TaskStartedEvent getTaskStartedEvent() {
            return this.taskStartedEvent;
        }

        /** @return the recovered finished event, or {@code null} if none was recorded */
        public TaskFinishedEvent getTaskFinishedEvent() {
            return this.taskFinishedEvent;
        }

        /** @return {@code true} if a started event was recorded for this task */
        public boolean isTaskStarted() {
            return this.getTaskStartedEvent() != null;
        }

        /**
         * @param taId attempt to look up
         * @return {@code true} only if the attempt is known and its recovery data reports success
         */
        public boolean isTaskAttemptSucceeded(TezTaskAttemptID taId) {
            TaskAttemptRecoveryData attemptData = this.taRecoveryDataMap.get(taId);
            if (attemptData == null) {
                return false;
            }
            return attemptData.isTaskAttemptSucceeded();
        }

        /**
         * Returns the recovery data for an attempt, creating and registering an empty one first
         * if the attempt is not known yet.
         *
         * @param taId attempt to look up
         * @return the existing or newly created recovery data, never {@code null}
         */
        public TaskAttemptRecoveryData maybeCreateTaskAttemptRecoveryData(TezTaskAttemptID taId) {
            TaskAttemptRecoveryData existing = this.taRecoveryDataMap.get(taId);
            if (existing != null) {
                return existing;
            }
            TaskAttemptRecoveryData created = new TaskAttemptRecoveryData();
            this.taRecoveryDataMap.put(taId, created);
            return created;
        }
    }

    public static class VertexRecoveryData {
        private VertexInitializedEvent vertexInitedEvent;
        private VertexConfigurationDoneEvent vertexConfigurationDoneEvent;
        private VertexStartedEvent vertexStartedEvent;
        private VertexFinishedEvent vertexFinishedEvent;
        private Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap = new HashMap<TezTaskID, TaskRecoveryData>();
        private boolean commited;

        @VisibleForTesting
        public VertexRecoveryData(VertexInitializedEvent vertexInitedEvent, VertexConfigurationDoneEvent vertexReconfigureDoneEvent, VertexStartedEvent vertexStartedEvent, VertexFinishedEvent vertexFinishedEvent, Map<TezTaskID, TaskRecoveryData> taskRecoveryDataMap, boolean commited) {
            this.vertexInitedEvent = vertexInitedEvent;
            this.vertexConfigurationDoneEvent = vertexReconfigureDoneEvent;
            this.vertexStartedEvent = vertexStartedEvent;
            this.vertexFinishedEvent = vertexFinishedEvent;
            this.taskRecoveryDataMap = taskRecoveryDataMap;
            this.commited = commited;
        }

        public VertexRecoveryData(boolean committed) {
            this.commited = committed;
        }

        public VertexInitializedEvent getVertexInitedEvent() {
            return this.vertexInitedEvent;
        }

        public VertexStartedEvent getVertexStartedEvent() {
            return this.vertexStartedEvent;
        }

        public VertexFinishedEvent getVertexFinishedEvent() {
            return this.vertexFinishedEvent;
        }

        public VertexConfigurationDoneEvent getVertexConfigurationDoneEvent() {
            return this.vertexConfigurationDoneEvent;
        }

        public boolean isReconfigureDone() {
            return this.vertexConfigurationDoneEvent != null;
        }

        public boolean isVertexInited() {
            return this.vertexInitedEvent != null;
        }

        public boolean shouldSkipInit() {
            return this.vertexInitedEvent != null && this.vertexConfigurationDoneEvent != null;
        }

        public boolean isVertexTasksStarted() {
            return this.taskRecoveryDataMap != null && !this.taskRecoveryDataMap.isEmpty();
        }

        public boolean isVertexStarted() {
            return this.vertexStartedEvent != null;
        }

        public boolean isVertexSucceeded() {
            if (this.vertexFinishedEvent == null) {
                return false;
            }
            return this.vertexFinishedEvent.getState().equals((Object)VertexState.SUCCEEDED);
        }

        public boolean isVertexFinished() {
            return this.vertexFinishedEvent != null;
        }

        public boolean isVertexCommitted() {
            return this.commited;
        }

        public TaskRecoveryData getTaskRecoveryData(TezTaskID taskId) {
            return this.taskRecoveryDataMap.get(taskId);
        }

        public TaskRecoveryData maybeCreateTaskRecoveryData(TezTaskID taskId) {
            TaskRecoveryData taskRecoveryData = this.taskRecoveryDataMap.get(taskId);
            if (taskRecoveryData == null) {
                taskRecoveryData = new TaskRecoveryData();
                this.taskRecoveryDataMap.put(taskId, taskRecoveryData);
            }
            return taskRecoveryData;
        }

        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append("VertexInitedEvent=" + this.vertexInitedEvent);
            builder.append("");
            return builder.toString();
        }
    }

    /**
     * Accumulates the state of one DAG from its summary events: whether the DAG finished,
     * whether a DAG commit is still outstanding, and the commit status of individual
     * vertices and vertex groups.
     *
     * <p>In the commit-status maps a value of {@code false} means a commit-started event
     * was seen without its completion, and {@code true} means the completion was seen.
     */
    @VisibleForTesting
    static class DAGSummaryData {
        final TezDAGID dagId;
        boolean completed = false;
        // Starts true; cleared by DAG_COMMIT_STARTED and set again by DAG_FINISHED.
        boolean dagCommitCompleted = true;
        boolean nonRecoverable = false;
        String reason;
        DAGState dagState;
        public Map<TezVertexID, Boolean> vertexCommitStatus = new HashMap<TezVertexID, Boolean>();
        public Map<String, Boolean> vertexGroupCommitStatus = new HashMap<String, Boolean>();
        public Map<TezVertexID, Boolean> vertexGroupMemberCommitStatus = new HashMap<TezVertexID, Boolean>();

        DAGSummaryData(TezDAGID dagId) {
            this.dagId = dagId;
        }

        /**
         * Applies one summary event to the accumulated state.
         *
         * @param proto the serialized summary event; its event type is used as an index
         *              into {@code HistoryEventType.values()}
         * @throws IOException if the event type index is out of range, if the event type is
         *                     not one of the handled summary events, or if parsing the
         *                     event from {@code proto} fails
         */
        void handleSummaryEvent(RecoveryProtos.SummaryEventProto proto) throws IOException {
            int eventTypeIndex = proto.getEventType();
            HistoryEventType[] knownTypes = HistoryEventType.values();
            // Report an out-of-range index as an invalid event instead of letting an
            // ArrayIndexOutOfBoundsException escape.
            if (eventTypeIndex < 0 || eventTypeIndex >= knownTypes.length) {
                throw new IOException("Found invalid summary event with unknown event type index=" + eventTypeIndex);
            }
            HistoryEventType eventType = knownTypes[eventTypeIndex];
            switch (eventType) {
                case DAG_SUBMITTED: {
                    this.completed = false;
                    // Parsed but otherwise unused; the call is kept so that a failure to
                    // parse the proto still propagates.
                    DAGSubmittedEvent dagSubmittedEvent = new DAGSubmittedEvent();
                    dagSubmittedEvent.fromSummaryProtoStream(proto);
                    break;
                }
                case DAG_FINISHED: {
                    this.completed = true;
                    this.dagCommitCompleted = true;
                    DAGFinishedEvent dagFinishedEvent = new DAGFinishedEvent();
                    dagFinishedEvent.fromSummaryProtoStream(proto);
                    this.dagState = dagFinishedEvent.getState();
                    break;
                }
                case DAG_KILL_REQUEST: {
                    // Parsed but otherwise unused, as for DAG_SUBMITTED.
                    DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent();
                    killRequestEvent.fromSummaryProtoStream(proto);
                    break;
                }
                case DAG_COMMIT_STARTED: {
                    this.dagCommitCompleted = false;
                    break;
                }
                case VERTEX_COMMIT_STARTED: {
                    VertexCommitStartedEvent vertexCommitStartedEvent = new VertexCommitStartedEvent();
                    vertexCommitStartedEvent.fromSummaryProtoStream(proto);
                    this.vertexCommitStatus.put(vertexCommitStartedEvent.getVertexID(), false);
                    break;
                }
                case VERTEX_FINISHED: {
                    VertexFinishedEvent vertexFinishedEvent = new VertexFinishedEvent();
                    vertexFinishedEvent.fromSummaryProtoStream(proto);
                    // Only vertices for which a commit-started event was recorded are tracked.
                    if (this.vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) {
                        this.vertexCommitStatus.put(vertexFinishedEvent.getVertexID(), true);
                    }
                    break;
                }
                case VERTEX_GROUP_COMMIT_STARTED: {
                    VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = new VertexGroupCommitStartedEvent();
                    vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
                    this.vertexGroupCommitStatus.put(vertexGroupCommitStartedEvent.getVertexGroupName(), false);
                    for (TezVertexID member : vertexGroupCommitStartedEvent.getVertexIds()) {
                        this.vertexGroupMemberCommitStatus.put(member, false);
                    }
                    break;
                }
                case VERTEX_GROUP_COMMIT_FINISHED: {
                    VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = new VertexGroupCommitFinishedEvent();
                    vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
                    this.vertexGroupCommitStatus.put(vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
                    for (TezVertexID member : vertexGroupCommitFinishedEvent.getVertexIds()) {
                        this.vertexGroupMemberCommitStatus.put(member, true);
                    }
                    break;
                }
                default: {
                    String message = "Found invalid summary event that was not handled, eventType=" + eventType.name();
                    throw new IOException(message);
                }
            }
        }

        /**
         * Marks this DAG as non-recoverable when a DAG, vertex, or vertex-group commit was
         * started but not completed. When several conditions apply, {@code reason} holds
         * the message of the last one found, because every entry is examined.
         */
        private void checkRecoverableSummary() {
            if (!this.dagCommitCompleted) {
                this.nonRecoverable = true;
                this.reason = "DAG Commit was in progress, not recoverable, dagId=" + this.dagId;
            }
            for (Map.Entry<TezVertexID, Boolean> entry : this.vertexCommitStatus.entrySet()) {
                if (entry.getValue().booleanValue()) {
                    continue;
                }
                this.nonRecoverable = true;
                this.reason = "Vertex Commit was in progress, not recoverable, dagId=" + this.dagId + ", vertexId=" + entry.getKey();
            }
            for (Map.Entry<String, Boolean> entry : this.vertexGroupCommitStatus.entrySet()) {
                if (entry.getValue().booleanValue()) {
                    continue;
                }
                this.nonRecoverable = true;
                this.reason = "Vertex Group Commit was in progress, not recoverable, dagId=" + this.dagId + ", vertexGroup=" + entry.getKey();
            }
        }

        /**
         * @return the DAG id, the completed flag, and the vertex and vertex-group commit
         *         statuses; each status list is omitted when its map is empty
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("dagId=").append(this.dagId);
            sb.append(", dagCompleted=").append(this.completed);
            if (!this.vertexCommitStatus.isEmpty()) {
                sb.append(", vertexCommitStatuses=[");
                for (Map.Entry<TezVertexID, Boolean> entry : this.vertexCommitStatus.entrySet()) {
                    sb.append("{ vertexId=").append(entry.getKey()).append(", committed=").append(entry.getValue()).append("}, ");
                }
                sb.append("]");
            }
            if (!this.vertexGroupCommitStatus.isEmpty()) {
                sb.append(", vertexGroupCommitStatuses=[");
                for (Map.Entry<String, Boolean> entry : this.vertexGroupCommitStatus.entrySet()) {
                    sb.append("{ vertexGroup=").append(entry.getKey()).append(", committed=").append(entry.getValue()).append("}, ");
                }
                sb.append("]");
            }
            return sb.toString();
        }
    }

    /**
     * Recovery state of one DAG, seeded from its {@link DAGSummaryData} and holding the
     * per-vertex recovery data keyed by vertex id.
     *
     * <p>Private fields are left non-final and keep their original names because they are
     * members of a nested class and may be accessed directly by the enclosing class.
     */
    public static class DAGRecoveryData {
        public TezDAGID recoveredDagID = null;
        public DAGImpl recoveredDAG = null;
        public DAGState dagState = null;
        public boolean isCompleted = false;
        public boolean nonRecoverable = false;
        public boolean isSessionStopped = false;
        public String reason = null;
        public Map<String, LocalResource> cumulativeAdditionalResources = null;
        public List<URL> additionalUrlsForClasspath = null;
        public Map<TezVertexID, VertexRecoveryData> vertexRecoveryDataMap = new HashMap<TezVertexID, VertexRecoveryData>();
        private DAGInitializedEvent dagInitedEvent;
        private DAGStartedEvent dagStartedEvent;
        private DAGFinishedEvent dagFinishedEvent;
        // The three commit-status maps are always taken from the summary in the constructor.
        private Map<TezVertexID, Boolean> vertexCommitStatus;
        private Map<String, Boolean> vertexGroupCommitStatus;
        private Map<TezVertexID, Boolean> vertexGroupMemberCommitStatus;

        /**
         * Builds the recovery data from a DAG summary. Runs the summary's recoverability
         * check and copies its outcome, and shares (does not copy) the summary's
         * commit-status maps.
         *
         * @param dagSummaryData summary of the DAG being recovered; must not be {@code null}
         */
        public DAGRecoveryData(DAGSummaryData dagSummaryData) {
            if (dagSummaryData.completed) {
                this.isCompleted = true;
                this.dagState = dagSummaryData.dagState;
            }
            dagSummaryData.checkRecoverableSummary();
            this.nonRecoverable = dagSummaryData.nonRecoverable;
            this.reason = dagSummaryData.reason;
            this.vertexCommitStatus = dagSummaryData.vertexCommitStatus;
            this.vertexGroupCommitStatus = dagSummaryData.vertexGroupCommitStatus;
            this.vertexGroupMemberCommitStatus = dagSummaryData.vertexGroupMemberCommitStatus;
        }

        /**
         * Marks the DAG as non-recoverable when a vertex is recorded as committed (directly
         * or as a vertex-group member) but no finished event is recorded for it. Does
         * nothing when the DAG is already completed, and stops at the first such vertex.
         */
        public void checkRecoverableNonSummary() {
            if (this.isCompleted) {
                return;
            }
            if (this.flagCommittedVertexWithoutFinishedEvent(this.vertexCommitStatus,
                    "Vertex has been committed, but its full recovery events are not seen, vertexId=")) {
                return;
            }
            this.flagCommittedVertexWithoutFinishedEvent(this.vertexGroupMemberCommitStatus,
                    "Vertex has been committed as member of vertex group, but its full recovery events are not seen, vertexId=");
        }

        /**
         * Looks for the first vertex in {@code commitStatus} that is marked committed but
         * has no finished event in {@code vertexRecoveryDataMap}; when found, sets
         * {@code nonRecoverable} and {@code reason}.
         *
         * @param commitStatus commit flags keyed by vertex id
         * @param reasonPrefix message text to which the offending vertex id is appended
         * @return {@code true} if a vertex was flagged
         */
        private boolean flagCommittedVertexWithoutFinishedEvent(Map<TezVertexID, Boolean> commitStatus, String reasonPrefix) {
            for (Map.Entry<TezVertexID, Boolean> entry : commitStatus.entrySet()) {
                if (!Boolean.TRUE.equals(entry.getValue())) {
                    continue;
                }
                TezVertexID vertexId = entry.getKey();
                VertexRecoveryData vertexData = this.vertexRecoveryDataMap.get(vertexId);
                if (vertexData != null && vertexData.getVertexFinishedEvent() != null) {
                    continue;
                }
                this.nonRecoverable = true;
                this.reason = reasonPrefix + vertexId;
                return true;
            }
            return false;
        }

        /** @return the DAG initialized event, or {@code null} if none is recorded */
        public DAGInitializedEvent getDAGInitializedEvent() {
            return this.dagInitedEvent;
        }

        /** @return the DAG started event, or {@code null} if none is recorded */
        public DAGStartedEvent getDAGStartedEvent() {
            return this.dagStartedEvent;
        }

        /** @return the DAG finished event, or {@code null} if none is recorded */
        public DAGFinishedEvent getDAGFinishedEvent() {
            return this.dagFinishedEvent;
        }

        /**
         * @param groupName name of the vertex group
         * @return {@code true} only when the group has a commit status and that status is true
         */
        public boolean isVertexGroupCommitted(String groupName) {
            return Boolean.TRUE.equals(this.vertexGroupCommitStatus.get(groupName));
        }

        /**
         * @param vertexId id of the vertex to look up
         * @return the recovery data registered for {@code vertexId}, or {@code null} if none
         */
        public VertexRecoveryData getVertexRecoveryData(TezVertexID vertexId) {
            return this.vertexRecoveryDataMap.get(vertexId);
        }

        /**
         * @param taskId id of the task to look up
         * @return the task's recovery data, or {@code null} when either its vertex or the
         *         task itself has no recovery data
         */
        public TaskRecoveryData getTaskRecoveryData(TezTaskID taskId) {
            VertexRecoveryData vertexRecoveryData = this.getVertexRecoveryData(taskId.getVertexID());
            if (vertexRecoveryData == null) {
                return null;
            }
            return vertexRecoveryData.getTaskRecoveryData(taskId);
        }

        /**
         * @param taId id of the task attempt to look up
         * @return the attempt's recovery data, or {@code null} when either its task or the
         *         attempt itself has no recovery data
         */
        public TaskAttemptRecoveryData getTaskAttemptRecoveryData(TezTaskAttemptID taId) {
            TaskRecoveryData taskRecoveryData = this.getTaskRecoveryData(taId.getTaskID());
            if (taskRecoveryData == null) {
                return null;
            }
            return taskRecoveryData.taRecoveryDataMap.get(taId);
        }

        /**
         * Returns the recovery data registered for the given vertex, creating and
         * registering a fresh entry first when none is present yet. A new entry's committed
         * flag is the vertex's recorded commit status, or {@code false} when there is none.
         *
         * @param vertexId id of the vertex to look up
         * @return the existing entry for {@code vertexId}, or the newly created one
         */
        public VertexRecoveryData maybeCreateVertexRecoveryData(TezVertexID vertexId) {
            VertexRecoveryData vRecoveryData = this.vertexRecoveryDataMap.get(vertexId);
            if (vRecoveryData == null) {
                boolean committed = Boolean.TRUE.equals(this.vertexCommitStatus.get(vertexId));
                vRecoveryData = new VertexRecoveryData(committed);
                this.vertexRecoveryDataMap.put(vertexId, vRecoveryData);
            }
            return vRecoveryData;
        }
    }
}

