/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractJobVertexHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.FlameGraphTypeQueryParameter;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexFlameGraphParameters;
import org.apache.flink.runtime.rest.messages.SubtaskIndexQueryParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraph;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexFlameGraphFactory;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

public class JobVertexFlameGraphHandler
extends AbstractJobVertexHandler<VertexFlameGraph, JobVertexFlameGraphParameters> {
    private final JobVertexStatsTracker<VertexThreadInfoStats> threadInfoOperatorTracker;

    public JobVertexFlameGraphHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, ExecutionGraphCache executionGraphCache, Executor executor, JobVertexStatsTracker<VertexThreadInfoStats> threadInfoOperatorTracker) {
        super(leaderRetriever, timeout, responseHeaders, JobVertexFlameGraphHeaders.getInstance(), executionGraphCache, executor);
        this.threadInfoOperatorTracker = threadInfoOperatorTracker;
    }

    @Override
    protected VertexFlameGraph handleRequest(HandlerRequest<EmptyRequestBody> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException {
        Optional<VertexFlameGraph> operatorFlameGraph;
        Integer subtaskIndex = JobVertexFlameGraphHandler.getSubtaskIndex(request, jobVertex);
        if (this.isTerminated(jobVertex, subtaskIndex)) {
            return VertexFlameGraph.terminated();
        }
        Optional<VertexThreadInfoStats> threadInfoSample = this.threadInfoOperatorTracker.getVertexStats((JobID)request.getPathParameter(JobIDPathParameter.class), jobVertex);
        if (subtaskIndex != null) {
            threadInfoSample = threadInfoSample.map(this.generateThreadInfoStatsForSubtask(subtaskIndex));
        }
        FlameGraphTypeQueryParameter.Type flameGraphType = JobVertexFlameGraphHandler.getFlameGraphType(request);
        switch (flameGraphType) {
            case FULL: {
                operatorFlameGraph = threadInfoSample.map(VertexFlameGraphFactory::createFullFlameGraphFrom);
                break;
            }
            case ON_CPU: {
                operatorFlameGraph = threadInfoSample.map(VertexFlameGraphFactory::createOnCpuFlameGraph);
                break;
            }
            case OFF_CPU: {
                operatorFlameGraph = threadInfoSample.map(VertexFlameGraphFactory::createOffCpuFlameGraph);
                break;
            }
            default: {
                throw new RestHandlerException("Unknown Flame Graph type " + (Object)((Object)flameGraphType) + '.', HttpResponseStatus.BAD_REQUEST);
            }
        }
        return operatorFlameGraph.orElse(VertexFlameGraph.waiting());
    }

    private Function<VertexThreadInfoStats, VertexThreadInfoStats> generateThreadInfoStatsForSubtask(Integer subtaskIndex) {
        return stats -> new VertexThreadInfoStats(stats.getRequestId(), stats.getStartTime(), stats.getEndTime(), stats.getSamplesBySubtask().entrySet().stream().filter(entry -> ((ExecutionAttemptID)entry.getKey()).getSubtaskIndex() == subtaskIndex.intValue()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    private boolean isTerminated(AccessExecutionJobVertex jobVertex, @Nullable Integer subtaskIndex) {
        if (subtaskIndex == null) {
            return jobVertex.getAggregateState().isTerminal();
        }
        AccessExecutionVertex executionVertex = jobVertex.getTaskVertices()[subtaskIndex];
        return executionVertex.getExecutionState().isTerminal();
    }

    private static FlameGraphTypeQueryParameter.Type getFlameGraphType(HandlerRequest<?> request) {
        List flameGraphTypeParameter = request.getQueryParameter(FlameGraphTypeQueryParameter.class);
        if (flameGraphTypeParameter.isEmpty()) {
            return FlameGraphTypeQueryParameter.Type.FULL;
        }
        return (FlameGraphTypeQueryParameter.Type)((Object)flameGraphTypeParameter.get(0));
    }

    @Nullable
    private static Integer getSubtaskIndex(HandlerRequest<?> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException {
        List subtaskIndexParameter = request.getQueryParameter(SubtaskIndexQueryParameter.class);
        if (subtaskIndexParameter.isEmpty()) {
            return null;
        }
        int subtaskIndex = (Integer)subtaskIndexParameter.get(0);
        if (subtaskIndex >= jobVertex.getTaskVertices().length || subtaskIndex < 0) {
            throw new RestHandlerException("Invalid subtask index for vertex " + jobVertex.getJobVertexId(), HttpResponseStatus.NOT_FOUND);
        }
        return subtaskIndex;
    }

    public void close() throws Exception {
        this.threadInfoOperatorTracker.shutDown();
    }

    public static AbstractRestHandler<?, ?, ?, ?> disabledHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders) {
        return new DisabledJobVertexFlameGraphHandler(leaderRetriever, timeout, responseHeaders);
    }

    private static class DisabledJobVertexFlameGraphHandler
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, VertexFlameGraph, JobVertexFlameGraphParameters> {
        protected DisabledJobVertexFlameGraphHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders) {
            super(leaderRetriever, timeout, responseHeaders, JobVertexFlameGraphHeaders.getInstance());
        }

        @Override
        protected CompletableFuture<VertexFlameGraph> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            return CompletableFuture.completedFuture(VertexFlameGraph.disabled());
        }
    }
}

