package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalPipelinedRegion;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.class */
public class StreamingJobGraphGenerator {
    /** Logger used for debug output while the job graph is assembled. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);

    /** Scale (number of decimal digits) used when dividing to obtain a managed memory fraction. */
    private static final int MANAGED_MEMORY_FRACTION_SCALE = 16;

    /** The stream graph that is being translated. */
    private final StreamGraph streamGraph;

    /** The job graph under construction; returned by {@link #createJobGraph()}. */
    private final JobGraph jobGraph;

    /** Hasher producing the primary per-node hashes. */
    private final StreamGraphHasher defaultStreamGraphHasher = new StreamGraphHasherV2();

    /** Additional hashers whose results are passed along as alternative ("legacy") hashes. */
    private final List<StreamGraphHasher> legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

    /** Job vertices keyed by the stream node id of the chain's start node. */
    private final Map<Integer, JobVertex> jobVertices = new HashMap<>();

    /** Ids of start nodes for which a job vertex has already been created. */
    private final Collection<Integer> builtVertices = new HashSet<>();

    /** Start node id -> (chained node id -> config of that chained node). */
    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs = new HashMap<>();

    /** Config of every processed stream node, keyed by node id. */
    private final Map<Integer, StreamConfig> vertexConfigs = new HashMap<>();

    /** Display name of the chain rooted at each node, keyed by node id. */
    private final Map<Integer, String> chainedNames = new HashMap<>();

    /** Merged minimum resources of the chain rooted at each node, keyed by node id. */
    private final Map<Integer, ResourceSpec> chainedMinResources = new HashMap<>();

    /** Merged preferred resources of the chain rooted at each node, keyed by node id. */
    private final Map<Integer, ResourceSpec> chainedPreferredResources = new HashMap<>();

    /** Input/output format containers keyed by the id of the chain's start node. */
    private final Map<Integer, InputOutputFormatContainer> chainedInputOutputFormats = new HashMap<>();

    /** Edges between job vertices, in the order in which they were connected. */
    private final List<StreamEdge> physicalEdgesInOrder = new ArrayList<>();

    /*
     * Compiler-generated lookup table (as reconstructed by the decompiler) that maps
     * ShuffleMode ordinals to dense switch-case indices: PIPELINED -> 1, BATCH -> 2,
     * UNDEFINED -> 3. Entries for constants missing at runtime stay 0.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by ShuffleMode.ordinal(); the value is the case label used by the switch.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode = new int[ShuffleMode.values().length];

        static {
            // Each assignment is guarded separately so that an enum constant that is
            // absent at runtime (NoSuchFieldError) does not prevent the others from
            // being registered.
            try {
                $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode[ShuffleMode.PIPELINED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // intentionally ignored: constant not present in the loaded enum
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode[ShuffleMode.BATCH.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // intentionally ignored: constant not present in the loaded enum
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$transformations$ShuffleMode[ShuffleMode.UNDEFINED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // intentionally ignored: constant not present in the loaded enum
            }
        }
    }

    /**
     * Translates the given stream graph into a job graph without specifying a job id.
     *
     * <p>Delegates to {@link #createJobGraph(StreamGraph, JobID)} with a {@code null} job id.
     *
     * @param streamGraph the stream graph to translate
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        return createJobGraph(streamGraph, null);
    }

    /**
     * Translates the given stream graph into a job graph.
     *
     * @param streamGraph the stream graph to translate
     * @param jobID the id handed to the {@link JobGraph} constructor; may be {@code null}
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        final StreamingJobGraphGenerator generator = new StreamingJobGraphGenerator(streamGraph, jobID);
        return generator.createJobGraph();
    }

    /**
     * Creates a generator for one translation run.
     *
     * @param streamGraph the stream graph to translate
     * @param jobID the id passed to the new {@link JobGraph}; may be {@code null}
     */
    private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
        this.streamGraph = streamGraph;
        // The job graph takes its name from the stream graph.
        this.jobGraph = new JobGraph(jobID, streamGraph.getJobName());
    }

    /**
     * Runs the complete translation of {@link #streamGraph} into {@link #jobGraph}.
     *
     * <p>Steps, in order: validation, schedule mode, hash generation, operator chaining,
     * physical edge registration, slot sharing / co-location, managed memory fractions,
     * checkpointing settings, savepoint restore settings, user artifacts and finally the
     * execution config.
     *
     * @return the populated job graph
     * @throws IllegalConfigurationException if the execution config cannot be serialized
     */
    private JobGraph createJobGraph() {
        preValidate();

        this.jobGraph.setScheduleMode(this.streamGraph.getScheduleMode());

        // Primary hashes identify vertices/operators; legacy hashes are kept as alternatives.
        Map<Integer, byte[]> hashes = this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph);
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(this.legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : this.legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(this.streamGraph));
        }

        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
        setChaining(hashes, legacyHashes, chainedOperatorHashes);

        setPhysicalEdges();
        setSlotSharingAndCoLocation();

        // The helper only receives read-only views of the generator's bookkeeping maps.
        setManagedMemoryFraction(
                Collections.unmodifiableMap(this.jobVertices),
                Collections.unmodifiableMap(this.vertexConfigs),
                Collections.unmodifiableMap(this.chainedConfigs),
                nodeId -> this.streamGraph.getStreamNode(nodeId).getMinResources(),
                nodeId -> this.streamGraph.getStreamNode(nodeId).getManagedMemoryWeight());

        configureCheckpointing();

        this.jobGraph.setSavepointRestoreSettings(this.streamGraph.getSavepointRestoreSettings());
        JobGraphGenerator.addUserArtifactEntries(this.streamGraph.getUserArtifacts(), this.jobGraph);

        try {
            this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig());
        } catch (IOException e) {
            // Keep the IOException as cause so the offending type shows up in the stack trace.
            throw new IllegalConfigurationException(
                    "Could not serialize the ExecutionConfig. This indicates that non-serializable types (like custom serializers) were registered",
                    e);
        }
        return this.jobGraph;
    }

    /**
     * Rejects stream graphs whose checkpointing setup is unsupported.
     *
     * <p>Nothing is checked when checkpointing is disabled. Otherwise iterative jobs are
     * rejected unless checkpointing is forced, and so is any operator whose class implements
     * {@link InputSelectable}.
     *
     * @throws UnsupportedOperationException if one of the checks fails
     */
    private void preValidate() {
        final CheckpointConfig config = this.streamGraph.getCheckpointConfig();
        if (!config.isCheckpointingEnabled()) {
            return;
        }

        if (this.streamGraph.isIterative() && !config.isForceCheckpointing()) {
            throw new UnsupportedOperationException("Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. \nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
        }

        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            final StreamOperatorFactory<?> factory = node.getOperatorFactory();
            if (factory == null) {
                continue;
            }
            final Class<? extends StreamOperator> operatorClass = factory.getStreamOperatorClass(classLoader);
            if (InputSelectable.class.isAssignableFrom(operatorClass)) {
                throw new UnsupportedOperationException("Checkpointing is currently not supported for operators that implement InputSelectable:" + operatorClass.getName());
            }
        }
    }

    /**
     * Registers, on each target vertex config, the physical edges that enter it.
     *
     * <p>Edges are grouped by target node id while keeping the relative order of
     * {@link #physicalEdgesInOrder} within each group.
     */
    private void setPhysicalEdges() {
        // Typed map: iterating the entry set of a raw map would yield plain Objects.
        Map<Integer, List<StreamEdge>> inEdgesByTarget = new HashMap<>();
        for (StreamEdge edge : this.physicalEdgesInOrder) {
            inEdgesByTarget.computeIfAbsent(edge.getTargetId(), targetId -> new ArrayList<>()).add(edge);
        }

        for (Map.Entry<Integer, List<StreamEdge>> inEdges : inEdgesByTarget.entrySet()) {
            this.vertexConfigs.get(inEdges.getKey()).setInPhysicalEdges(inEdges.getValue());
        }
    }

    /**
     * Builds the operator chains, starting one chain at every source of the stream graph.
     *
     * @param map primary hash per stream node id
     * @param list legacy hashes per stream node id, one map per legacy hasher
     * @param map2 output map collecting (primary hash, legacy hash) pairs per chain start node id
     */
    private void setChaining(Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list, Map<Integer, List<Tuple2<byte[], byte[]>>> map2) {
        this.streamGraph.getSourceIDs().forEach(sourceId -> createChain(sourceId, sourceId, map, list, 0, map2));
    }

    /**
     * Recursively builds the operator chain that starts at {@code num}, processing the node
     * {@code num2} as the current chain member.
     *
     * <p>Chainable out-edges extend the current chain; every non-chainable out-edge becomes an
     * output of the chain and its target starts a new chain. Downstream nodes are processed
     * before the current node, so the names and resources of sub-chains are available when
     * the current node's values are derived.
     *
     * @param num id of the node that starts the chain
     * @param num2 id of the node currently being processed
     * @param map primary hash per stream node id
     * @param list legacy hashes per stream node id, one map per legacy hasher
     * @param i index of the current node within its chain (0 for the start node)
     * @param map2 collects (primary hash, legacy hash) pairs per chain start node id
     * @return the non-chainable out-edges reachable from the current node within this chain;
     *     empty if a job vertex for {@code num} was already built
     */
    private List<StreamEdge> createChain(Integer num, Integer num2, Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list, int i, Map<Integer, List<Tuple2<byte[], byte[]>>> map2) {
        if (this.builtVertices.contains(num)) {
            return new ArrayList<>();
        }

        List<StreamEdge> transitiveOutEdges = new ArrayList<>();
        List<StreamEdge> chainableOutputs = new ArrayList<>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<>();

        StreamNode currentNode = this.streamGraph.getStreamNode(num2);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, this.streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // Chained successors stay in this chain and contribute their own chain outputs.
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(createChain(num, Integer.valueOf(chainable.getTargetId()), map, list, i + 1, map2));
        }

        // Non-chained successors terminate this chain and start a chain of their own.
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            Integer targetId = Integer.valueOf(nonChainable.getTargetId());
            createChain(targetId, targetId, map, list, 0, map2);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes = map2.computeIfAbsent(num, startId -> new ArrayList<>());
        byte[] primaryHash = map.get(num2);
        OperatorID currentOperatorId = new OperatorID(primaryHash);
        for (Map<Integer, byte[]> legacyHashes : list) {
            operatorHashes.add(new Tuple2<>(primaryHash, legacyHashes.get(num2)));
        }

        this.chainedNames.put(num2, createChainedName(num2, chainableOutputs));
        this.chainedMinResources.put(num2, createChainedMinResources(num2, chainableOutputs));
        this.chainedPreferredResources.put(num2, createChainedPreferredResources(num2, chainableOutputs));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(num).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(num).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // Only the chain's start node is backed by a job vertex; chained nodes get a standalone config.
        boolean isChainStart = num2.equals(num);
        StreamConfig config = isChainStart ? createJobVertex(num, map, list, map2) : new StreamConfig(new Configuration());

        setVertexConfig(num2, config, chainableOutputs, nonChainableOutputs);

        if (isChainStart) {
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(currentNode.getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(currentNode.getOutEdges());

            for (StreamEdge outEdge : transitiveOutEdges) {
                connect(num, outEdge);
            }

            config.setTransitiveChainedTaskConfigs(this.chainedConfigs.get(num));
        } else {
            Map<Integer, StreamConfig> configsOfChain = this.chainedConfigs.computeIfAbsent(num, startId -> new HashMap<>());
            config.setChainIndex(i);
            config.setOperatorName(currentNode.getOperatorName());
            configsOfChain.put(num2, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;
    }

    /**
     * Returns the format container registered for the given chain start node, creating it
     * with the current thread's context class loader on first access.
     *
     * @param num id of the chain's start node
     * @return the existing or newly created container
     */
    private InputOutputFormatContainer getOrCreateFormatContainer(Integer num) {
        return this.chainedInputOutputFormats.computeIfAbsent(
                num,
                startNodeId -> new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader()));
    }

    /**
     * Builds the display name of the chain rooted at the given node.
     *
     * <p>Without chained outputs the name is the operator name. With one chained output it is
     * {@code "op -> next"}; with several it is {@code "op -> (a, b, ...)"}. The names of the
     * successors are read from {@link #chainedNames}.
     *
     * @param num id of the node at the root of the (sub-)chain
     * @param list chainable out-edges of that node
     * @return the chained name
     */
    private String createChainedName(Integer num, List<StreamEdge> list) {
        final String operatorName = this.streamGraph.getStreamNode(num).getOperatorName();

        if (list.isEmpty()) {
            return operatorName;
        }
        if (list.size() == 1) {
            return operatorName + " -> " + this.chainedNames.get(Integer.valueOf(list.get(0).getTargetId()));
        }

        final List<String> successorNames = new ArrayList<>();
        for (StreamEdge chainable : list) {
            successorNames.add(this.chainedNames.get(Integer.valueOf(chainable.getTargetId())));
        }
        return operatorName + " -> (" + StringUtils.join(successorNames, ", ") + ")";
    }

    /**
     * Merges the minimum resources of the given node with those already recorded in
     * {@link #chainedMinResources} for the targets of its chainable out-edges.
     *
     * @param num id of the node at the root of the (sub-)chain
     * @param list chainable out-edges of that node
     * @return the merged minimum resources
     */
    private ResourceSpec createChainedMinResources(Integer num, List<StreamEdge> list) {
        ResourceSpec merged = this.streamGraph.getStreamNode(num).getMinResources();
        for (StreamEdge chainable : list) {
            merged = merged.merge(this.chainedMinResources.get(Integer.valueOf(chainable.getTargetId())));
        }
        return merged;
    }

    /**
     * Merges the preferred resources of the given node with those already recorded in
     * {@link #chainedPreferredResources} for the targets of its chainable out-edges.
     *
     * @param num id of the node at the root of the (sub-)chain
     * @param list chainable out-edges of that node
     * @return the merged preferred resources
     */
    private ResourceSpec createChainedPreferredResources(Integer num, List<StreamEdge> list) {
        ResourceSpec merged = this.streamGraph.getStreamNode(num).getPreferredResources();
        for (StreamEdge chainable : list) {
            merged = merged.merge(this.chainedPreferredResources.get(Integer.valueOf(chainable.getTargetId())));
        }
        return merged;
    }

    /**
     * Creates the job vertex for the chain starting at the given node, registers it with the
     * job graph and returns a stream config backed by the vertex' configuration.
     *
     * @param num id of the chain's start node
     * @param map primary hash per stream node id; must contain an entry for {@code num}
     * @param list legacy hashes per stream node id, one map per legacy hasher
     * @param map2 (primary hash, legacy hash) pairs of the chain's operators, per start node id
     * @return a {@link StreamConfig} wrapping the new vertex' configuration
     * @throws IllegalStateException if no primary hash exists for {@code num}
     */
    private StreamConfig createJobVertex(Integer num, Map<Integer, byte[]> map, List<Map<Integer, byte[]>> list, Map<Integer, List<Tuple2<byte[], byte[]>>> map2) {
        final StreamNode startNode = this.streamGraph.getStreamNode(num);

        final byte[] primaryHash = map.get(num);
        if (primaryHash == null) {
            throw new IllegalStateException("Cannot find node hash. Did you generate them before calling this method?");
        }
        final JobVertexID vertexId = new JobVertexID(primaryHash);

        // Legacy hashers may have no hash for this node; only existing ones become alternative ids.
        final List<JobVertexID> alternativeVertexIds = new ArrayList<>(list.size());
        for (Map<Integer, byte[]> legacyHashes : list) {
            final byte[] legacyHash = legacyHashes.get(num);
            if (legacyHash != null) {
                alternativeVertexIds.add(new JobVertexID(legacyHash));
            }
        }

        final List<Tuple2<byte[], byte[]>> operatorHashes = map2.get(num);
        final List<OperatorID> operatorIds = new ArrayList<>();
        final List<OperatorID> alternativeOperatorIds = new ArrayList<>();
        if (operatorHashes != null) {
            for (Tuple2<byte[], byte[]> hashPair : operatorHashes) {
                operatorIds.add(new OperatorID(hashPair.f0));
                // Keep both lists aligned: a missing legacy hash is recorded as null.
                if (hashPair.f1 != null) {
                    alternativeOperatorIds.add(new OperatorID(hashPair.f1));
                } else {
                    alternativeOperatorIds.add(null);
                }
            }
        }

        final JobVertex vertex;
        if (this.chainedInputOutputFormats.containsKey(num)) {
            vertex = new InputOutputFormatVertex(this.chainedNames.get(num), vertexId, alternativeVertexIds, operatorIds, alternativeOperatorIds);
            this.chainedInputOutputFormats.get(num).write(new TaskConfig(vertex.getConfiguration()));
        } else {
            vertex = new JobVertex(this.chainedNames.get(num), vertexId, alternativeVertexIds, operatorIds, alternativeOperatorIds);
        }

        vertex.setResources(this.chainedMinResources.get(num), this.chainedPreferredResources.get(num));
        vertex.setInvokableClass(startNode.getJobVertexClass());

        // A non-positive node parallelism leaves the vertex' own parallelism untouched.
        int effectiveParallelism = startNode.getParallelism();
        if (effectiveParallelism > 0) {
            vertex.setParallelism(effectiveParallelism);
        } else {
            effectiveParallelism = vertex.getParallelism();
        }
        vertex.setMaxParallelism(startNode.getMaxParallelism());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", Integer.valueOf(effectiveParallelism), num);
        }

        vertex.setInputDependencyConstraint(this.streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());

        this.jobVertices.put(num, vertex);
        this.builtVertices.add(num);
        this.jobGraph.addVertex(vertex);

        return new StreamConfig(vertex.getConfiguration());
    }

    /**
     * Populates the config of a single stream node and records it in {@link #vertexConfigs}.
     *
     * @param num id of the stream node being configured
     * @param streamConfig the config to fill
     * @param list chainable out-edges of the node
     * @param list2 non-chainable out-edges of the node
     */
    private void setVertexConfig(Integer num, StreamConfig streamConfig, List<StreamEdge> list, List<StreamEdge> list2) {
        final StreamNode vertex = this.streamGraph.getStreamNode(num);

        streamConfig.setVertexID(num);
        streamConfig.setBufferTimeout(vertex.getBufferTimeout());

        streamConfig.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
        streamConfig.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
        streamConfig.setTypeSerializerOut(vertex.getTypeSerializerOut());

        // Side outputs may leave through chained as well as non-chained edges.
        registerSideOutputSerializers(streamConfig, list);
        registerSideOutputSerializers(streamConfig, list2);

        streamConfig.setStreamOperatorFactory(vertex.getOperatorFactory());
        streamConfig.setOutputSelectors(vertex.getOutputSelectors());

        streamConfig.setNumberOfOutputs(list2.size());
        streamConfig.setNonChainedOutputs(list2);
        streamConfig.setChainedOutputs(list);

        streamConfig.setTimeCharacteristic(this.streamGraph.getTimeCharacteristic());

        final CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();
        streamConfig.setStateBackend(this.streamGraph.getStateBackend());

        final boolean checkpointingEnabled = checkpointConfig.isCheckpointingEnabled();
        streamConfig.setCheckpointingEnabled(checkpointingEnabled);
        // With checkpointing disabled the mode is fixed to AT_LEAST_ONCE.
        streamConfig.setCheckpointMode(
                checkpointingEnabled ? checkpointConfig.getCheckpointingMode() : CheckpointingMode.AT_LEAST_ONCE);

        streamConfig.setStatePartitioner(0, vertex.getStatePartitioner1());
        streamConfig.setStatePartitioner(1, vertex.getStatePartitioner2());
        streamConfig.setStateKeySerializer(vertex.getStateKeySerializer());

        final Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
        final boolean isIterationVertex =
                vertexClass.equals(StreamIterationHead.class) || vertexClass.equals(StreamIterationTail.class);
        if (isIterationVertex) {
            streamConfig.setIterationId(this.streamGraph.getBrokerID(num));
            streamConfig.setIterationWaitTime(this.streamGraph.getLoopTimeout(num));
        }

        this.vertexConfigs.put(num, streamConfig);
    }

    /** Registers a side-output serializer for every edge in {@code edges} that carries an output tag. */
    private void registerSideOutputSerializers(StreamConfig streamConfig, List<StreamEdge> edges) {
        for (StreamEdge edge : edges) {
            if (edge.getOutputTag() == null) {
                continue;
            }
            streamConfig.setTypeSerializerSideOut(edge.getOutputTag(), edge.getOutputTag().getTypeInfo().createSerializer(this.streamGraph.getExecutionConfig()));
        }
    }

    /**
     * Connects the job vertex of chain start node {@code num} to the job vertex of the edge's
     * target and records the edge as a physical edge.
     *
     * <p>The result partition type follows the edge's shuffle mode: {@code PIPELINED} maps to
     * {@code PIPELINED_BOUNDED}, {@code BATCH} to {@code BLOCKING}, and {@code UNDEFINED}
     * depends on {@code streamGraph.isBlockingConnectionsBetweenChains()}. Forward and rescale
     * partitioners use a pointwise distribution pattern, all others all-to-all.
     *
     * @param num id of the upstream chain's start node
     * @param streamEdge the edge leaving the upstream chain
     * @throws UnsupportedOperationException if the edge's shuffle mode is not handled
     */
    private void connect(Integer num, StreamEdge streamEdge) {
        this.physicalEdgesInOrder.add(streamEdge);

        Integer downstreamNodeId = Integer.valueOf(streamEdge.getTargetId());
        JobVertex upstreamVertex = this.jobVertices.get(num);
        JobVertex downstreamVertex = this.jobVertices.get(downstreamNodeId);

        // Each connected edge adds one input to the downstream vertex' config.
        StreamConfig downstreamConfig = new StreamConfig(downstreamVertex.getConfiguration());
        downstreamConfig.setNumberOfInputs(downstreamConfig.getNumberOfInputs() + 1);

        StreamPartitioner<?> partitioner = streamEdge.getPartitioner();

        // Switch on the enum itself rather than on a synthetic ordinal table with
        // unrelated constants as case labels.
        ResultPartitionType resultPartitionType;
        switch (streamEdge.getShuffleMode()) {
            case PIPELINED:
                resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
                break;
            case BATCH:
                resultPartitionType = ResultPartitionType.BLOCKING;
                break;
            case UNDEFINED:
                resultPartitionType = this.streamGraph.isBlockingConnectionsBetweenChains()
                        ? ResultPartitionType.BLOCKING
                        : ResultPartitionType.PIPELINED_BOUNDED;
                break;
            default:
                throw new UnsupportedOperationException("Data exchange mode " + streamEdge.getShuffleMode() + " is not supported yet.");
        }

        boolean isPointwise = (partitioner instanceof ForwardPartitioner) || (partitioner instanceof RescalePartitioner);
        DistributionPattern distributionPattern = isPointwise ? DistributionPattern.POINTWISE : DistributionPattern.ALL_TO_ALL;

        downstreamVertex
                .connectNewDataSetAsInput(upstreamVertex, distributionPattern, resultPartitionType)
                .setShipStrategyName(partitioner.toString());

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), num, downstreamNodeId);
        }
    }

    /**
     * Decides whether the operators at both ends of an edge can be placed in the same chain.
     *
     * <p>All of the following must hold: the target has exactly one input edge, both ends
     * have an operator factory, both are in the same slot sharing group, the target's chaining
     * strategy is {@code ALWAYS}, the source's is {@code HEAD} or {@code ALWAYS}, the edge uses
     * a {@link ForwardPartitioner}, its shuffle mode is not {@code BATCH}, both ends have equal
     * parallelism, and chaining is enabled on the stream graph.
     *
     * @param streamEdge the edge to inspect
     * @param streamGraph the graph containing the edge
     * @return {@code true} if source and target may be chained
     */
    public static boolean isChainable(StreamEdge streamEdge, StreamGraph streamGraph) {
        final StreamNode upstream = streamGraph.getSourceVertex(streamEdge);
        final StreamNode downstream = streamGraph.getTargetVertex(streamEdge);

        final StreamOperatorFactory<?> upstreamFactory = upstream.getOperatorFactory();
        final StreamOperatorFactory<?> downstreamFactory = downstream.getOperatorFactory();

        // The checks run in a fixed order; the first failing one decides.
        if (downstream.getInEdges().size() != 1) {
            return false;
        }
        if (downstreamFactory == null || upstreamFactory == null) {
            return false;
        }
        if (!upstream.isSameSlotSharingGroup(downstream)) {
            return false;
        }
        if (downstreamFactory.getChainingStrategy() != ChainingStrategy.ALWAYS) {
            return false;
        }
        if (upstreamFactory.getChainingStrategy() != ChainingStrategy.HEAD
                && upstreamFactory.getChainingStrategy() != ChainingStrategy.ALWAYS) {
            return false;
        }
        if (!(streamEdge.getPartitioner() instanceof ForwardPartitioner)) {
            return false;
        }
        if (streamEdge.getShuffleMode() == ShuffleMode.BATCH) {
            return false;
        }
        if (upstream.getParallelism() != downstream.getParallelism()) {
            return false;
        }
        return streamGraph.isChainingEnabled();
    }

    /**
     * Assigns slot sharing groups and then co-location groups to the job vertices.
     *
     * <p>Slot sharing runs first because {@code setCoLocation()} reads each vertex' slot
     * sharing group.
     */
    private void setSlotSharingAndCoLocation() {
        setSlotSharing();
        setCoLocation();
    }

    /**
     * Assigns a slot sharing group to every job vertex.
     *
     * <p>A node without a group name gets {@code null}; a node in the default group gets the
     * group computed for its pipelined region; any other name maps to one shared
     * {@link SlotSharingGroup} instance per name.
     */
    private void setSlotSharing() {
        final Map<String, SlotSharingGroup> groupsByName = new HashMap<>();
        final Map<JobVertexID, SlotSharingGroup> regionGroups = buildVertexRegionSlotSharingGroups();

        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            final JobVertex vertex = entry.getValue();
            final String groupName = this.streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();

            final SlotSharingGroup effectiveGroup;
            if (groupName == null) {
                effectiveGroup = null;
            } else if (groupName.equals(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)) {
                effectiveGroup = regionGroups.get(vertex.getID());
            } else {
                effectiveGroup = groupsByName.computeIfAbsent(groupName, name -> new SlotSharingGroup());
            }

            vertex.setSlotSharingGroup(effectiveGroup);
        }
    }

    /**
     * Computes the slot sharing group each job vertex gets when it uses the default group.
     *
     * <p>If the stream graph puts all vertices into the same group by default, one shared
     * group is used for every logical pipelined region; otherwise each region gets its own.
     *
     * @return slot sharing group per job vertex id
     */
    private Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups() {
        final Map<JobVertexID, SlotSharingGroup> groupByVertex = new HashMap<>();
        final SlotSharingGroup sharedDefaultGroup = new SlotSharingGroup();
        final boolean shareAcrossRegions = this.streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();

        for (LogicalPipelinedRegion region : new DefaultLogicalTopology(this.jobGraph).getLogicalPipelinedRegions()) {
            final SlotSharingGroup regionGroup;
            if (shareAcrossRegions) {
                regionGroup = sharedDefaultGroup;
            } else {
                regionGroup = new SlotSharingGroup();
            }

            for (JobVertexID vertexId : region.getVertexIDs()) {
                groupByVertex.put(vertexId, regionGroup);
            }
        }
        return groupByVertex;
    }

    /**
     * Applies co-location constraints to the job vertices whose stream node names a
     * co-location group.
     *
     * <p>All vertices of one co-location group must belong to the same slot sharing group
     * instance (compared by identity).
     *
     * @throws IllegalStateException if a co-located vertex has no slot sharing group, or if
     *     vertices of one co-location group are in different slot sharing groups
     */
    private void setCoLocation() {
        final Map<String, Tuple2<SlotSharingGroup, CoLocationGroup>> constraintsByKey = new HashMap<>();

        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            final StreamNode node = this.streamGraph.getStreamNode(entry.getKey());
            final JobVertex vertex = entry.getValue();
            final SlotSharingGroup sharingGroup = vertex.getSlotSharingGroup();

            final String coLocationKey = node.getCoLocationGroup();
            if (coLocationKey == null) {
                continue;
            }
            if (sharingGroup == null) {
                throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group");
            }

            // The first vertex seen for a key fixes the slot sharing group of that constraint.
            final Tuple2<SlotSharingGroup, CoLocationGroup> constraint =
                    constraintsByKey.computeIfAbsent(coLocationKey, key -> new Tuple2<>(sharingGroup, new CoLocationGroup()));

            if (constraint.f0 != sharingGroup) {
                throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups");
            }

            vertex.updateCoLocationGroup(constraint.f1);
            constraint.f1.addVertex(vertex);
        }
    }

    /**
     * Sets the managed memory fraction on every operator config, one slot sharing group at a
     * time.
     *
     * @param map job vertices keyed by the id of their chain's start node
     * @param map2 config of every stream node, keyed by node id
     * @param map3 start node id -> (chained node id -> chained config)
     * @param function yields the resource spec of a node id
     * @param function2 yields the managed memory weight of a node id
     * @throws IllegalStateException if a job vertex has no slot sharing group
     */
    private static void setManagedMemoryFraction(Map<Integer, JobVertex> map, Map<Integer, StreamConfig> map2, Map<Integer, Map<Integer, StreamConfig>> map3, Function<Integer, ResourceSpec> function, Function<Integer, Integer> function2) {
        // Identity-based set: groups are distinguished by instance.
        final Set<SlotSharingGroup> slotSharingGroups = Collections.newSetFromMap(new IdentityHashMap<>());
        final Map<JobVertexID, Integer> headOperatorByVertex = new HashMap<>();
        final Map<JobVertexID, Set<Integer>> operatorsByVertex = new HashMap<>();

        for (Map.Entry<Integer, JobVertex> entry : map.entrySet()) {
            final int headOperatorId = entry.getKey().intValue();
            final JobVertex vertex = entry.getValue();

            final SlotSharingGroup group = vertex.getSlotSharingGroup();
            Preconditions.checkState(group != null, "JobVertex slot sharing group must not be null");
            slotSharingGroups.add(group);

            headOperatorByVertex.put(vertex.getID(), Integer.valueOf(headOperatorId));

            // A vertex' operators are its head operator plus everything chained to it.
            final Set<Integer> operatorIds = new HashSet<>();
            operatorIds.add(Integer.valueOf(headOperatorId));
            operatorIds.addAll(map3.getOrDefault(Integer.valueOf(headOperatorId), Collections.emptyMap()).keySet());
            operatorsByVertex.put(vertex.getID(), operatorIds);
        }

        for (SlotSharingGroup group : slotSharingGroups) {
            setManagedMemoryFractionForSlotSharingGroup(group, headOperatorByVertex, operatorsByVertex, map2, map3, function, function2);
        }
    }

    /**
     * Sets the managed memory fraction of every operator running in the given slot sharing
     * group and afterwards re-writes the chained task configs of each vertex' head operator.
     *
     * @param slotSharingGroup the group to process
     * @param map head operator (start node) id per job vertex id
     * @param map2 all operator ids (head plus chained) per job vertex id
     * @param map3 config of every stream node, keyed by node id
     * @param map4 start node id -> (chained node id -> chained config)
     * @param function yields the resource spec of a node id
     * @param function2 yields the managed memory weight of a node id
     */
    private static void setManagedMemoryFractionForSlotSharingGroup(SlotSharingGroup slotSharingGroup, Map<JobVertexID, Integer> map, Map<JobVertexID, Set<Integer>> map2, Map<Integer, StreamConfig> map3, Map<Integer, Map<Integer, StreamConfig>> map4, Function<Integer, ResourceSpec> function, Function<Integer, Integer> function2) {
        // Total weight of all operators in the group; the basis for weight-derived fractions.
        final int groupManagedMemoryWeight = slotSharingGroup.getJobVertexIds().stream()
                .flatMap(vertexId -> map2.get(vertexId).stream())
                .mapToInt(function2::apply)
                .sum();

        for (JobVertexID vertexId : slotSharingGroup.getJobVertexIds()) {
            for (int operatorId : map2.get(vertexId)) {
                final StreamConfig operatorConfig = map3.get(Integer.valueOf(operatorId));
                setManagedMemoryFractionForOperator(
                        function.apply(Integer.valueOf(operatorId)),
                        slotSharingGroup.getResourceSpec(),
                        function2.apply(Integer.valueOf(operatorId)).intValue(),
                        groupManagedMemoryWeight,
                        operatorConfig);
            }

            // The chained configs were modified above, so they are written to the head
            // operator's config again.
            final int headOperatorId = map.get(vertexId).intValue();
            map3.get(Integer.valueOf(headOperatorId)).setTransitiveChainedTaskConfigs(map4.get(Integer.valueOf(headOperatorId)));
        }
    }

    /**
     * Computes and stores the managed memory fraction of a single operator.
     *
     * <p>If the group's resource spec equals {@link ResourceSpec#UNKNOWN}, the fraction is
     * the operator's weight divided by the group's total weight. Otherwise it is the
     * operator's managed memory bytes divided by the group's managed memory bytes. A
     * non-positive divisor yields a fraction of {@code 0.0}.
     *
     * @param resourceSpec resource spec of the operator
     * @param resourceSpec2 resource spec of the operator's slot sharing group
     * @param i managed memory weight of the operator
     * @param i2 total managed memory weight of the group
     * @param streamConfig the operator config receiving the fraction
     */
    private static void setManagedMemoryFractionForOperator(ResourceSpec resourceSpec, ResourceSpec resourceSpec2, int i, int i2, StreamConfig streamConfig) {
        double fraction = 0.0d;

        if (resourceSpec2.equals(ResourceSpec.UNKNOWN)) {
            if (i2 > 0) {
                fraction = getFractionRoundedDown(i, i2);
            }
        } else {
            final long groupManagedMemoryBytes = resourceSpec2.getManagedMemory().getBytes();
            if (groupManagedMemoryBytes > 0) {
                fraction = getFractionRoundedDown(resourceSpec.getManagedMemory().getBytes(), groupManagedMemoryBytes);
            }
        }

        streamConfig.setManagedMemoryFraction(fraction);
    }

    /**
     * Divides {@code j} by {@code j2} with {@code MANAGED_MEMORY_FRACTION_SCALE} decimal
     * digits, rounding towards zero.
     *
     * @param j the dividend
     * @param j2 the divisor; must not be zero
     * @return the quotient as a double
     * @throws ArithmeticException if {@code j2} is zero
     */
    private static double getFractionRoundedDown(long j, long j2) {
        // RoundingMode.DOWN replaces the deprecated int constant 1 (BigDecimal.ROUND_DOWN).
        return BigDecimal.valueOf(j)
                .divide(BigDecimal.valueOf(j2), MANAGED_MEMORY_FRACTION_SCALE, java.math.RoundingMode.DOWN)
                .doubleValue();
    }

    /**
     * Derives the checkpointing settings from the stream graph's {@link CheckpointConfig}
     * and stores them on the job graph.
     *
     * @throws IllegalStateException if externalized checkpoints are enabled without a cleanup
     *     mode, or if the checkpointing mode is neither exactly-once nor at-least-once
     * @throws FlinkRuntimeException if the master hooks or the state backend cannot be
     *     serialized
     */
    private void configureCheckpointing() {
        CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();

        // Intervals below 10 are replaced by Long.MAX_VALUE.
        // NOTE(review): presumably this disables periodic checkpoints - confirm against the coordinator.
        long interval = checkpointConfig.getCheckpointInterval();
        if (interval < 10) {
            interval = Long.MAX_VALUE;
        }

        // Only input vertices trigger; every vertex acknowledges and is notified on completion.
        List<JobVertexID> triggerVertices = new ArrayList<>();
        List<JobVertexID> ackVertices = new ArrayList<>(this.jobVertices.size());
        List<JobVertexID> commitVertices = new ArrayList<>(this.jobVertices.size());
        for (JobVertex vertex : this.jobVertices.values()) {
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.getID());
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        CheckpointRetentionPolicy retentionPolicy;
        if (checkpointConfig.isExternalizedCheckpointsEnabled()) {
            CheckpointConfig.ExternalizedCheckpointCleanup cleanup = checkpointConfig.getExternalizedCheckpointCleanup();
            if (cleanup == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            retentionPolicy = cleanup.deleteOnCancellation()
                    ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
                    : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        } else {
            retentionPolicy = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        }

        CheckpointingMode mode = checkpointConfig.getCheckpointingMode();
        boolean isExactlyOnce;
        if (mode == CheckpointingMode.EXACTLY_ONCE) {
            isExactlyOnce = true;
        } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
            isExactlyOnce = false;
        } else {
            throw new IllegalStateException("Unexpected checkpointing mode. Did not expect there to be another checkpointing mode besides exactly-once or at-least-once.");
        }

        // Collect a hook factory for every user function that implements WithMasterCheckpointHook.
        List<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
                // Held as Object and cast only after the instanceof check; assigning the
                // user function directly to WithMasterCheckpointHook is not type-safe.
                Object userFunction = ((UdfStreamOperatorFactory<?>) node.getOperatorFactory()).getUserFunction();
                if (userFunction instanceof WithMasterCheckpointHook) {
                    hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) userFunction));
                }
            }
        }

        SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
        if (hooks.isEmpty()) {
            serializedHooks = null;
        } else {
            try {
                serializedHooks = new SerializedValue<>(hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]));
            } catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }

        // Raw type on purpose: the state backend type is not imported in this file.
        SerializedValue serializedStateBackend;
        if (this.streamGraph.getStateBackend() == null) {
            serializedStateBackend = null;
        } else {
            try {
                serializedStateBackend = new SerializedValue(this.streamGraph.getStateBackend());
            } catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", e);
            }
        }

        CheckpointCoordinatorConfiguration coordinatorConfiguration = new CheckpointCoordinatorConfiguration(
                interval,
                checkpointConfig.getCheckpointTimeout(),
                checkpointConfig.getMinPauseBetweenCheckpoints(),
                checkpointConfig.getMaxConcurrentCheckpoints(),
                retentionPolicy,
                isExactlyOnce,
                checkpointConfig.isPreferCheckpointForRecovery(),
                checkpointConfig.getTolerableCheckpointFailureNumber());

        this.jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                triggerVertices,
                ackVertices,
                commitVertices,
                coordinatorConfiguration,
                serializedStateBackend,
                serializedHooks));
    }
}
