/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.KeyedStateHandle;

/**
 * Per-subtask state size estimates, keyed by {@link ExecutionVertexID}.
 *
 * <p>Estimates are either supplied directly through the constructor or derived from the latest
 * completed checkpoint of an {@link ExecutionGraph} via {@link #fromGraph(ExecutionGraph)}. When
 * derived from a checkpoint, only the managed keyed state handles of each subtask contribute to
 * the size.
 */
@Internal
public class StateSizeEstimates {
    /** State size in bytes per execution vertex; vertices without an estimate have no entry. */
    private final Map<ExecutionVertexID, Long> stateSizes;

    /** Creates an instance that holds no estimates. */
    public StateSizeEstimates() {
        this(Collections.emptyMap());
    }

    /**
     * Creates an instance backed by the given map.
     *
     * @param stateSizes state size per execution vertex; the map is stored as-is (not copied)
     */
    public StateSizeEstimates(Map<ExecutionVertexID, Long> stateSizes) {
        this.stateSizes = stateSizes;
    }

    /**
     * Looks up the estimated state size of a single execution vertex.
     *
     * @param jobVertexId the execution vertex (i.e. subtask) to look up; note that despite its
     *     name this is an {@link ExecutionVertexID}, not a {@link JobVertexID}
     * @return the recorded size, or {@link Optional#empty()} if none is known for that vertex
     */
    public Optional<Long> estimate(ExecutionVertexID jobVertexId) {
        return Optional.ofNullable(this.stateSizes.get(jobVertexId));
    }

    /** Returns an instance that holds no estimates. */
    static StateSizeEstimates empty() {
        return new StateSizeEstimates();
    }

    /**
     * Derives estimates from the latest completed checkpoint known to the given graph.
     *
     * @param executionGraph the graph to inspect, may be {@code null}
     * @return estimates computed from the latest checkpoint, or an empty instance if the graph,
     *     its checkpoint coordinator, the coordinator's checkpoint store, or the store's latest
     *     checkpoint is {@code null}
     */
    public static StateSizeEstimates fromGraph(@Nullable ExecutionGraph executionGraph) {
        return Optional.ofNullable(executionGraph)
                .flatMap(graph -> Optional.ofNullable(graph.getCheckpointCoordinator()))
                .flatMap(coordinator -> Optional.ofNullable(coordinator.getCheckpointStore()))
                .flatMap(store -> Optional.ofNullable(store.getLatestCheckpoint()))
                .map(
                        cp ->
                                new StateSizeEstimates(
                                        merge(
                                                fromCompletedCheckpoint(cp),
                                                mapVerticesToOperators(executionGraph))))
                // Build the fallback lazily: it is only needed when no checkpoint is available.
                .orElseGet(StateSizeEstimates::empty);
    }

    /**
     * Combines per-operator subtask sizes into per-execution-vertex sizes.
     *
     * <p>For every job vertex, the sizes of all its operators are summed per subtask index.
     * Operators that have no entry in {@code operatorsToSubtaskSizes} contribute nothing.
     *
     * @param operatorsToSubtaskSizes size per subtask index, grouped by operator
     * @param verticesToOperators the operators belonging to each job vertex
     * @return summed size per (job vertex, subtask index) pair
     */
    private static Map<ExecutionVertexID, Long> merge(
            Map<OperatorID, Map<Integer, Long>> operatorsToSubtaskSizes,
            Map<JobVertexID, Set<OperatorID>> verticesToOperators) {
        Map<ExecutionVertexID, Long> result = new HashMap<>();
        for (Map.Entry<JobVertexID, Set<OperatorID>> vertexAndOperators :
                verticesToOperators.entrySet()) {
            JobVertexID vertexId = vertexAndOperators.getKey();
            for (OperatorID operatorID : vertexAndOperators.getValue()) {
                Map<Integer, Long> subtaskSizes =
                        operatorsToSubtaskSizes.getOrDefault(operatorID, Collections.emptyMap());
                for (Map.Entry<Integer, Long> subtaskIdAndSize : subtaskSizes.entrySet()) {
                    result.merge(
                            new ExecutionVertexID(vertexId, subtaskIdAndSize.getKey()),
                            subtaskIdAndSize.getValue(),
                            Long::sum);
                }
            }
        }
        return result;
    }

    /**
     * Maps every vertex returned by {@code executionGraph.getAllVertices()} to the generated IDs
     * of its operators.
     */
    private static Map<JobVertexID, Set<OperatorID>> mapVerticesToOperators(
            ExecutionGraph executionGraph) {
        return executionGraph.getAllVertices().entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                e -> getOperatorIDS((ExecutionJobVertex) e.getValue())));
    }

    /** Collects the generated operator IDs of all operators of the given job vertex. */
    private static Set<OperatorID> getOperatorIDS(ExecutionJobVertex v) {
        return v.getOperatorIDs().stream()
                .map(OperatorIDPair::getGeneratedOperatorID)
                .collect(Collectors.toSet());
    }

    /**
     * Computes the per-subtask state sizes of every operator state contained in the checkpoint.
     *
     * @param cp the checkpoint to read operator states from
     * @return size per subtask index, grouped by operator
     */
    private static Map<OperatorID, Map<Integer, Long>> fromCompletedCheckpoint(
            CompletedCheckpoint cp) {
        Map<OperatorID, Map<Integer, Long>> result = new HashMap<>();
        for (Map.Entry<OperatorID, OperatorState> e : cp.getOperatorStates().entrySet()) {
            result.put(e.getKey(), calculateStateSizeInBytes(e.getValue()));
        }
        return result;
    }

    /**
     * Sums the sizes of the managed keyed state handles of each subtask.
     *
     * <p>Only managed keyed state is considered. A subtask without any managed keyed state handle
     * gets no entry in the result (rather than an entry of zero).
     *
     * @param state the operator state whose subtask states are inspected
     * @return summed managed keyed state size per subtask index
     */
    private static Map<Integer, Long> calculateStateSizeInBytes(OperatorState state) {
        Map<Integer, Long> sizesPerSubtask = new HashMap<>();
        for (Map.Entry<Integer, OperatorSubtaskState> e : state.getSubtaskStates().entrySet()) {
            for (KeyedStateHandle handle : e.getValue().getManagedKeyedState()) {
                sizesPerSubtask.merge(e.getKey(), handle.getStateSize(), Long::sum);
            }
        }
        return sizesPerSubtask;
    }
}

