/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.MutableVertexParallelismStore;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;

/**
 * {@link MutableVertexParallelismStore} that keeps the parallelism information of each vertex in
 * an in-memory {@link HashMap} keyed by {@link JobVertexID}.
 */
public class DefaultVertexParallelismStore implements MutableVertexParallelismStore {

    /**
     * Function that answers every input with the same rejection message. It is handed to each
     * {@link DefaultVertexParallelismInfo} created by {@link #applyJobResourceRequirements};
     * presumably that class consults it when a max-parallelism change is requested (confirm
     * against {@code DefaultVertexParallelismInfo}).
     */
    private static final Function<Integer, Optional<String>> RESCALE_MAX_REJECT =
            ignored -> Optional.of("Cannot change the max parallelism.");

    /** Parallelism information per vertex; entries are written by {@link #setParallelismInfo}. */
    private final Map<JobVertexID, VertexParallelismInformation> vertexToParallelismInfo =
            new HashMap<>();

    /**
     * Builds a new store for the vertices listed in the given requirements.
     *
     * <p>For every vertex returned by {@code jobResourceRequirements.getJobVertices()} the new
     * store receives an entry whose parallelism is the upper bound of the vertex's requirement
     * and whose max parallelism is copied from the old store. Vertices that are not listed in
     * the requirements are not copied into the new store.
     *
     * @param oldVertexParallelismStore store holding the current parallelism information; it is
     *     queried for every vertex listed in the requirements, so a missing vertex surfaces
     *     whatever that store's {@code getParallelismInfo} raises
     * @param jobResourceRequirements requirements supplying the vertices and their parallelism
     *     upper bounds
     * @return the new store if at least one listed vertex ends up with a parallelism different
     *     from the one in the old store, otherwise {@link Optional#empty()}
     */
    public static Optional<VertexParallelismStore> applyJobResourceRequirements(
            VertexParallelismStore oldVertexParallelismStore,
            JobResourceRequirements jobResourceRequirements) {
        final DefaultVertexParallelismStore rebuilt = new DefaultVertexParallelismStore();
        boolean anyDifference = false;

        for (final JobVertexID vertexId : jobResourceRequirements.getJobVertices()) {
            final VertexParallelismInformation previous =
                    oldVertexParallelismStore.getParallelismInfo(vertexId);
            final int requested =
                    jobResourceRequirements.getParallelism(vertexId).getUpperBound();

            // The max parallelism is carried over unchanged from the old store.
            final VertexParallelismInformation updated =
                    new DefaultVertexParallelismInfo(
                            requested, previous.getMaxParallelism(), RESCALE_MAX_REJECT);
            rebuilt.setParallelismInfo(vertexId, updated);

            if (previous.getParallelism() != requested) {
                anyDifference = true;
            }
        }

        if (!anyDifference) {
            return Optional.empty();
        }
        return Optional.of(rebuilt);
    }

    /**
     * Stores the parallelism information for a vertex, replacing any earlier entry for it.
     *
     * @param vertexId vertex the information belongs to
     * @param info parallelism information to associate with the vertex
     */
    @Override
    public void setParallelismInfo(JobVertexID vertexId, VertexParallelismInformation info) {
        vertexToParallelismInfo.put(vertexId, info);
    }

    /**
     * Looks up the parallelism information stored for a vertex.
     *
     * @param vertexId vertex to look up
     * @return the information previously stored for the vertex
     * @throws IllegalStateException if nothing (or {@code null}) is stored for the vertex
     */
    @Override
    public VertexParallelismInformation getParallelismInfo(JobVertexID vertexId) {
        final VertexParallelismInformation stored = vertexToParallelismInfo.get(vertexId);
        if (stored == null) {
            throw new IllegalStateException(
                    String.format("No parallelism information set for vertex %s", vertexId));
        }
        return stored;
    }
}

