/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.job.AbstractJobVertexHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

public class SubtasksTimesHandler
extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters>
implements OnlyExecutionGraphJsonArchivist {
    public SubtasksTimesHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) {
        super(leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
    }

    @Override
    protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody> request, AccessExecutionJobVertex jobVertex) {
        return SubtasksTimesHandler.createSubtaskTimesInfo(jobVertex);
    }

    @Override
    public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
        Collection<? extends AccessExecutionJobVertex> allVertices = graph.getAllVertices().values();
        ArrayList<ArchivedJson> archive = new ArrayList<ArchivedJson>(allVertices.size());
        for (AccessExecutionJobVertex accessExecutionJobVertex : allVertices) {
            SubtasksTimesInfo json = SubtasksTimesHandler.createSubtaskTimesInfo(accessExecutionJobVertex);
            String path = this.getMessageHeaders().getTargetRestEndpointURL().replace(":jobid", graph.getJobID().toString()).replace(":vertexid", accessExecutionJobVertex.getJobVertexId().toString());
            archive.add(new ArchivedJson(path, json));
        }
        return archive;
    }

    private static SubtasksTimesInfo createSubtaskTimesInfo(AccessExecutionJobVertex jobVertex) {
        String id = jobVertex.getJobVertexId().toString();
        String name = jobVertex.getName();
        long now = System.currentTimeMillis();
        ArrayList<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<SubtasksTimesInfo.SubtaskTimeInfo>();
        int num = 0;
        for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
            long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
            ExecutionState status = vertex.getExecutionState();
            long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()];
            long start = scheduledTime > 0L ? scheduledTime : -1L;
            long end = status.isTerminal() ? timestamps[status.ordinal()] : now;
            long duration = start >= 0L ? end - start : -1L;
            TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
            String locationString = location == null ? "(unassigned)" : location.getHostname();
            HashMap<ExecutionState, Long> timestampMap = new HashMap<ExecutionState, Long>(ExecutionState.values().length);
            for (ExecutionState state : ExecutionState.values()) {
                timestampMap.put(state, timestamps[state.ordinal()]);
            }
            subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(num++, locationString, duration, timestampMap));
        }
        return new SubtasksTimesInfo(id, name, now, subtasks);
    }
}

