/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.graph.AdaptiveGraphGenerator;
import org.apache.flink.streaming.api.graph.DefaultStreamGraphContext;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.streaming.api.graph.StreamGraphHasher;
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
import org.apache.flink.streaming.api.graph.StreamGraphUserHashHasher;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Preconditions;

/**
 * Builds a {@link JobGraph} from a {@link StreamGraph} step by step.
 *
 * <p>On construction, job vertices are created for the source nodes of the stream graph. Each call
 * to {@link #onJobVertexFinished(JobVertexID)} then tries to create job vertices for the stream
 * nodes that consume the output of the finished vertex. The manager also listens for stream graph
 * updates and regenerates the JSON representation of the stream graph when notified.
 */
@Internal
public class AdaptiveGraphManager
implements AdaptiveGraphGenerator,
StreamGraphContext.StreamGraphUpdateListener {
    // The stream graph that is being converted.
    private final StreamGraph streamGraph;
    // The job graph under construction; created once in the constructor.
    private final JobGraph jobGraph;
    // Hasher whose result is stored in `hashes`; hash generation must succeed for every node.
    private final StreamGraphHasher defaultStreamGraphHasher;
    // Additional hashers; the i-th hasher writes into the i-th map of `legacyHashes`.
    private final List<StreamGraphHasher> legacyStreamGraphHasher;
    // Executor handed to the chain creation and config serialization helpers.
    private final Executor serializationExecutor;
    // Counter passed to the helper that prefixes vertex names with an index.
    private final AtomicInteger vertexIndexId;
    private final StreamGraphContext streamGraphContext;
    // Stream node id -> hash produced by the default hasher.
    private final Map<Integer, byte[]> hashes;
    // Index-aligned with `legacyStreamGraphHasher`.
    private final List<Map<Integer, byte[]>> legacyHashes;
    // Stream node id -> start node id of its chain, for every node already placed in a job vertex.
    private final Map<Integer, Integer> frozenNodeToStartNodeMap;
    // Source stream node id -> (edge -> output) for edges whose consumer vertex did not exist yet
    // when the producer vertex was configured.
    private final Map<Integer, Map<StreamEdge, NonChainedOutput>> intermediateOutputsCaches;
    // Data set id -> id of the stream node that produces it.
    private final Map<IntermediateDataSetID, Integer> intermediateDataSetIdToProducerMap;
    // Data set id -> edges whose consumer vertex did not exist yet when the data set was created.
    private final Map<IntermediateDataSetID, List<StreamEdge>> intermediateDataSetIdToOutputEdgesMap;
    // Edge id -> data set the edge will consume.
    private final Map<String, IntermediateDataSet> consumerEdgeIdToIntermediateDataSetMap = new HashMap<String, IntermediateDataSet>();
    // Stream node id -> forward group; filled only for dynamic job graphs.
    // NOTE(review): "steam" looks like a typo for "stream"; the name is used throughout the class.
    private final Map<Integer, StreamNodeForwardGroup> steamNodeIdToForwardGroupMap;
    // Start node id -> chain info that is not yet ready to become a job vertex (sorted by id).
    private final Map<Integer, OperatorChainInfo> pendingChainEntryPoints;
    private final Map<JobVertexID, Integer> jobVertexToStartNodeMap;
    private final Map<JobVertexID, List<Integer>> jobVertexToChainedStreamNodeIdsMap;
    // Insertion-ordered: start node id -> job vertex created for the chain.
    private final Map<Integer, JobVertex> startNodeToJobVertexMap;
    private final Map<Integer, JobVertexID> streamNodeIdsToJobVertexMap;
    // Job vertices reported through onJobVertexFinished.
    private final Set<JobVertexID> finishedJobVertices;
    // Stream node ids reported through addFinishedStreamNodeIds; shared with the graph context.
    private final Set<Integer> finishedStreamNodeIds;
    private final AtomicBoolean hasHybridResultPartition;
    private final SlotSharingGroup defaultSlotSharingGroup;
    // Latest JSON rendering of the stream graph; refreshed after vertex creation and graph updates.
    private String streamGraphJson;

    /**
     * Validates the stream graph, sets up all bookkeeping structures, creates the (initially
     * empty) job graph and then creates the job vertices for the source nodes.
     *
     * @param userClassloader class loader passed to pre-validation and to the stream graph context
     * @param streamGraph the stream graph to convert
     * @param serializationExecutor executor used by the job vertex creation helpers; must not be
     *     {@code null}
     * @throws NullPointerException if {@code serializationExecutor} is {@code null}
     */
    public AdaptiveGraphManager(ClassLoader userClassloader, StreamGraph streamGraph, Executor serializationExecutor) {
        StreamingJobGraphGenerator.preValidate(streamGraph, userClassloader);
        this.streamGraph = streamGraph;
        this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor);

        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHasher = Collections.singletonList(new StreamGraphUserHashHasher());
        this.hashes = new HashMap<>();
        // One hash map per legacy hasher, at the same index. Typed explicitly to avoid a raw map.
        this.legacyHashes = Collections.singletonList(new HashMap<Integer, byte[]>());

        // Insertion order of created vertices is preserved; pending entry points are sorted by id.
        this.startNodeToJobVertexMap = new LinkedHashMap<>();
        this.pendingChainEntryPoints = new TreeMap<>();

        this.frozenNodeToStartNodeMap = new HashMap<>();
        this.intermediateOutputsCaches = new HashMap<>();
        this.intermediateDataSetIdToProducerMap = new HashMap<>();
        this.intermediateDataSetIdToOutputEdgesMap = new HashMap<>();
        this.steamNodeIdToForwardGroupMap = new HashMap<>();
        this.vertexIndexId = new AtomicInteger(0);
        this.hasHybridResultPartition = new AtomicBoolean(false);
        this.jobVertexToStartNodeMap = new HashMap<>();
        this.jobVertexToChainedStreamNodeIdsMap = new HashMap<>();
        this.streamNodeIdsToJobVertexMap = new HashMap<>();
        this.finishedJobVertices = new HashSet<>();
        this.finishedStreamNodeIds = new HashSet<>();

        // The context shares several of the maps above and is given this manager as an argument.
        this.streamGraphContext = new DefaultStreamGraphContext(streamGraph, this.steamNodeIdToForwardGroupMap, this.frozenNodeToStartNodeMap, this.intermediateOutputsCaches, this.consumerEdgeIdToIntermediateDataSetMap, this.finishedStreamNodeIds, userClassloader, this);
        this.jobGraph = StreamingJobGraphGenerator.createAndInitializeJobGraph(streamGraph, streamGraph.getJobID());
        this.defaultSlotSharingGroup = new SlotSharingGroup();

        // Must run last: it relies on every field assigned above.
        this.initialization();
    }

    /**
     * Returns the job graph that this manager is populating.
     *
     * @return the job graph instance created in the constructor
     */
    @Override
    public JobGraph getJobGraph() {
        return jobGraph;
    }

    /**
     * Returns the stream graph context created in the constructor.
     *
     * @return the stream graph context
     */
    @Override
    public StreamGraphContext getStreamGraphContext() {
        return streamGraphContext;
    }

    /**
     * Records the given job vertex as finished and tries to create job vertices for the stream
     * nodes targeted by its recorded output edges.
     *
     * @param finishedJobVertexId id of the job vertex that has finished
     * @return the job vertices created by this call
     */
    @Override
    public List<JobVertex> onJobVertexFinished(JobVertexID finishedJobVertexId) {
        finishedJobVertices.add(finishedJobVertexId);
        List<StreamNode> downstreamNodes = getOutputEdgesByVertexId(finishedJobVertexId).stream()
                .map(StreamEdge::getTargetId)
                .map(streamGraph::getStreamNode)
                .collect(Collectors.toCollection(ArrayList::new));
        return createJobVerticesAndUpdateGraph(downstreamNodes);
    }

    /**
     * Adds the given stream node ids to the set of finished stream nodes.
     *
     * @param finishedStreamNodeIds ids of stream nodes to mark as finished
     */
    public void addFinishedStreamNodeIds(List<Integer> finishedStreamNodeIds) {
        finishedStreamNodeIds.forEach(this.finishedStreamNodeIds::add);
    }

    /**
     * Looks up the forward group registered for the start node of the given job vertex.
     *
     * @param jobVertexId id of the job vertex
     * @return the forward group, or {@code null} if none is registered
     */
    public StreamNodeForwardGroup getStreamNodeForwardGroupByVertexId(JobVertexID jobVertexId) {
        return steamNodeIdToForwardGroupMap.get(jobVertexToStartNodeMap.get(jobVertexId));
    }

    /**
     * Returns how many stream nodes have not yet been placed into a job vertex.
     *
     * @return number of stream nodes in the graph minus the number of frozen nodes
     */
    public int getPendingOperatorsCount() {
        int totalNodes = streamGraph.getStreamNodes().size();
        int frozenNodes = frozenNodeToStartNodeMap.size();
        return totalNodes - frozenNodes;
    }

    /**
     * Returns the ids of the stream nodes chained into the given job vertex.
     *
     * @param jobVertexId id of the job vertex
     * @return the recorded stream node ids, or {@code null} if the vertex is unknown
     */
    public List<Integer> getStreamNodeIdsByJobVertexId(JobVertexID jobVertexId) {
        return jobVertexToChainedStreamNodeIdsMap.get(jobVertexId);
    }

    /**
     * Returns the id of the stream node recorded as producer of the given data set.
     *
     * @param intermediateDataSetId id of the intermediate data set
     * @return the producer stream node id, or {@code null} if none is recorded
     */
    public Integer getProducerStreamNodeId(IntermediateDataSetID intermediateDataSetId) {
        return intermediateDataSetIdToProducerMap.get(intermediateDataSetId);
    }

    /**
     * Returns a read-only view of the stream edges recorded as consumers of the given data set.
     *
     * <p>If no edges are recorded for the id, an empty list is returned instead of failing with a
     * {@link NullPointerException} from wrapping a {@code null} list.
     *
     * @param intermediateDataSetId id of the intermediate data set
     * @return an unmodifiable list of the recorded edges; never {@code null}
     */
    public List<StreamEdge> getOutputStreamEdges(IntermediateDataSetID intermediateDataSetId) {
        List<StreamEdge> recordedEdges = this.intermediateDataSetIdToOutputEdgesMap.getOrDefault(intermediateDataSetId, Collections.emptyList());
        return Collections.unmodifiableList(recordedEdges);
    }

    /**
     * Finds the job vertex that contains the given stream node.
     *
     * @param streamNodeId id of the stream node
     * @return the id of the containing job vertex, or an empty optional if the node has not been
     *     placed into a job vertex yet
     */
    public Optional<JobVertexID> findVertexByStreamNodeId(int streamNodeId) {
        if (!isNodeFrozen(streamNodeId)) {
            return Optional.empty();
        }
        JobVertex owningVertex = startNodeToJobVertexMap.get(getStartNodeId(streamNodeId));
        return Optional.of(owningVertex.getID());
    }

    /**
     * Collects the recorded output edges of every data set produced by the given job vertex.
     *
     * @param jobVertexId id of a job vertex that is part of the job graph
     * @return the recorded output edges, possibly empty
     * @throws NullPointerException if the job graph contains no vertex with the given id
     */
    private List<StreamEdge> getOutputEdgesByVertexId(JobVertexID jobVertexId) {
        // Fail with a descriptive message instead of a bare NPE on the dereference below.
        JobVertex jobVertex = Preconditions.checkNotNull(this.jobGraph.findVertexByID(jobVertexId), "No job vertex found for ID '%s'", jobVertexId);
        List<StreamEdge> outputEdges = new ArrayList<>();
        for (IntermediateDataSet result : jobVertex.getProducedDataSets()) {
            // Edges are only recorded for data sets whose consumer vertex did not exist when the
            // producer was configured; a data set without an entry has nothing pending.
            // NOTE(review): confirm that skipping such data sets is the intended behaviour.
            List<StreamEdge> recordedEdges = this.intermediateDataSetIdToOutputEdgesMap.get(result.getId());
            if (recordedEdges != null) {
                outputEdges.addAll(recordedEdges);
            }
        }
        return outputEdges;
    }

    /**
     * Creates the initial job vertices from the source nodes of the stream graph. For a dynamic
     * job graph, parallelisms and forward groups are prepared first.
     */
    private void initialization() {
        List<StreamNode> sourceNodes = streamGraph.getSourceIDs().stream()
                .map(streamGraph::getStreamNode)
                .collect(Collectors.toCollection(ArrayList::new));
        if (jobGraph.isDynamic()) {
            setVertexParallelismsForDynamicGraphIfNecessary();
        }
        createJobVerticesAndUpdateGraph(sourceNodes);
    }

    /**
     * Tries to create job vertices starting from the given stream nodes, records them in the
     * bookkeeping maps, generates their configuration and refreshes the stream graph JSON.
     *
     * @param streamNodes candidate start nodes for new job vertices
     * @return the job vertices created by this call, in the order kept by the build context
     */
    private List<JobVertex> createJobVerticesAndUpdateGraph(List<StreamNode> streamNodes) {
        JobVertexBuildContext buildContext = new JobVertexBuildContext(jobGraph, streamGraph, hasHybridResultPartition, hashes, legacyHashes, defaultSlotSharingGroup);

        // The three steps below must run in this order: chains first, then bookkeeping, then config.
        createOperatorChainInfos(streamNodes, buildContext);
        recordCreatedJobVerticesInfo(buildContext);
        generateConfigForJobVertices(buildContext);

        generateStreamGraphJson();

        List<JobVertex> createdVertices = new ArrayList<>(buildContext.getJobVerticesInOrder().values());
        return createdVertices;
    }

    /**
     * Generates the configuration of the job vertices held by the given build context by running
     * a fixed sequence of configuration steps.
     *
     * @param jobVertexBuildContext context holding the job vertices created in the current round
     */
    private void generateConfigForJobVertices(JobVertexBuildContext jobVertexBuildContext) {
        // Filled by the operator-level step and then read by the vertex-level step below.
        HashMap<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs = new HashMap<Integer, Map<StreamEdge, NonChainedOutput>>();
        StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext);
        this.setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext);
        // Connect the new vertices to upstream vertices that were created in earlier rounds.
        this.connectToFinishedUpStreamVertex(jobVertexBuildContext);
        StreamingJobGraphGenerator.setPhysicalEdges(jobVertexBuildContext);
        StreamingJobGraphGenerator.markSupportingConcurrentExecutionAttempts(jobVertexBuildContext);
        StreamingJobGraphGenerator.validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
        StreamingJobGraphGenerator.setSlotSharingAndCoLocation(jobVertexBuildContext);
        StreamingJobGraphGenerator.setManagedMemoryFraction(jobVertexBuildContext);
        // The shared counter is passed in so the index prefix can continue across rounds.
        StreamingJobGraphGenerator.addVertexIndexPrefixInVertexName(jobVertexBuildContext, this.vertexIndexId);
        StreamingJobGraphGenerator.setVertexDescription(jobVertexBuildContext);
        StreamingJobGraphGenerator.serializeOperatorCoordinatorsAndStreamConfig(this.serializationExecutor, jobVertexBuildContext);
    }

    /**
     * Configures the non-chained outputs of every job vertex in the build context.
     *
     * @param opIntermediateOutputs source node id -> (edge -> non-chained output)
     * @param jobVertexBuildContext context holding the job vertices created in the current round
     */
    private void setAllVertexNonChainedOutputsConfigs(Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs, JobVertexBuildContext jobVertexBuildContext) {
        for (Integer chainStartNodeId : jobVertexBuildContext.getJobVerticesInOrder().keySet()) {
            setVertexNonChainedOutputsConfig(chainStartNodeId, opIntermediateOutputs, jobVertexBuildContext);
        }
    }

    /**
     * Configures the non-chained outputs of the job vertex whose chain starts at
     * {@code startNodeId}.
     *
     * <p>For every transitive out edge of the chain: if the target node already has a job vertex
     * in this build context, the two vertices are connected directly. Otherwise the result data
     * set is created on the producer vertex and the edge, data set and output are recorded.
     *
     * @param startNodeId id of the start node of the chain
     * @param opIntermediateOutputs source node id -> (edge -> non-chained output)
     * @param jobVertexBuildContext context holding the job vertices created in the current round
     */
    private void setVertexNonChainedOutputsConfig(Integer startNodeId, Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs, JobVertexBuildContext jobVertexBuildContext) {
        OperatorChainInfo chainInfo = jobVertexBuildContext.getChainInfo(startNodeId);
        StreamConfig config = chainInfo.getOperatorInfo(startNodeId).getVertexConfig();
        List<StreamEdge> transitiveOutEdges = chainInfo.getTransitiveOutEdges();
        // Linked set: de-duplicates outputs shared by several edges while keeping encounter order.
        Set<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
        for (StreamEdge edge : transitiveOutEdges) {
            NonChainedOutput output = opIntermediateOutputs.get(edge.getSourceId()).get(edge);
            transitiveOutputs.add(output);
            if (jobVertexBuildContext.getJobVerticesInOrder().containsKey(edge.getTargetId())) {
                // Consumer vertex was created in the same round: connect right away.
                StreamingJobGraphGenerator.connect(startNodeId, edge, output, this.startNodeToJobVertexMap, jobVertexBuildContext);
            } else {
                // Consumer vertex does not exist yet: prepare the data set and remember the edge.
                JobVertex jobVertex = jobVertexBuildContext.getJobVerticesInOrder().get(startNodeId);
                IntermediateDataSet dataSet = jobVertex.getOrCreateResultDataSet(output.getDataSetId(), output.getPartitionType());
                StreamPartitioner<?> partitioner = edge.getPartitioner();
                DistributionPattern distributionPattern = partitioner.isPointwise() ? DistributionPattern.POINTWISE : DistributionPattern.ALL_TO_ALL;
                // Exact class match on purpose: subclasses of ForwardPartitioner do not count.
                dataSet.configure(distributionPattern, partitioner.isBroadcast(), partitioner.getClass().equals(ForwardPartitioner.class));
                dataSet.increaseNumJobEdgesToCreate();
                this.intermediateDataSetIdToOutputEdgesMap.computeIfAbsent(dataSet.getId(), ignored -> new ArrayList<>()).add(edge);
                this.consumerEdgeIdToIntermediateDataSetMap.put(edge.getEdgeId(), dataSet);
                this.intermediateOutputsCaches.computeIfAbsent(edge.getSourceId(), ignored -> new HashMap<>()).put(edge, output);
            }
            this.intermediateDataSetIdToProducerMap.put(output.getDataSetId(), edge.getSourceId());
        }
        config.setVertexNonChainedOutputs(new ArrayList<NonChainedOutput>(transitiveOutputs));
    }

    /**
     * Connects each newly created job vertex to the upstream vertices of its transitive in edges,
     * using the outputs cached when those upstream vertices were configured.
     *
     * @param jobVertexBuildContext context holding the job vertices created in the current round
     */
    private void connectToFinishedUpStreamVertex(JobVertexBuildContext jobVertexBuildContext) {
        for (OperatorChainInfo chainInfo : jobVertexBuildContext.getChainInfosInOrder().values()) {
            for (StreamEdge inEdge : chainInfo.getTransitiveInEdges()) {
                int producerNodeId = inEdge.getSourceId();
                NonChainedOutput cachedOutput = intermediateOutputsCaches.get(producerNodeId).get(inEdge);
                Integer producerStartNodeId = getStartNodeId(producerNodeId);
                StreamingJobGraphGenerator.connect(producerStartNodeId, inEdge, cachedOutput, startNodeToJobVertexMap, jobVertexBuildContext);
            }
        }
    }

    /**
     * Records, for every chain in the build context, the mapping between its start node, its job
     * vertex and all stream nodes chained into that vertex. Nodes recorded here count as frozen.
     *
     * @param jobVertexBuildContext context holding the job vertices created in the current round
     */
    private void recordCreatedJobVerticesInfo(JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, OperatorChainInfo> chainInfos = jobVertexBuildContext.getChainInfosInOrder();
        for (OperatorChainInfo chainInfo : chainInfos.values()) {
            Integer startNodeId = chainInfo.getStartNodeId();
            JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);
            JobVertexID jobVertexId = jobVertex.getID();

            this.startNodeToJobVertexMap.put(startNodeId, jobVertex);
            this.jobVertexToStartNodeMap.put(jobVertexId, startNodeId);

            chainInfo.getAllChainedNodes().forEach(node -> {
                this.frozenNodeToStartNodeMap.put(node.getId(), startNodeId);
                this.jobVertexToChainedStreamNodeIdsMap.computeIfAbsent(jobVertexId, ignored -> new ArrayList<>()).add(node.getId());
                this.streamNodeIdsToJobVertexMap.put(node.getId(), jobVertexId);
            });
        }
    }

    /**
     * Creates operator chains for all entry points that are ready to become job vertices and
     * attaches, to each chain, the in edges of its start node that come from already frozen nodes.
     *
     * @param streamNodes candidate start nodes for new chains
     * @param jobVertexBuildContext context that receives the created chains and job vertices
     */
    private void createOperatorChainInfos(List<StreamNode> streamNodes, JobVertexBuildContext jobVertexBuildContext) {
        Map<Integer, OperatorChainInfo> chainEntryPoints = this.buildAndGetChainEntryPoints(streamNodes, jobVertexBuildContext);

        // Iterate over a snapshot sorted by start node id. The live map is also handed to
        // createChain, so it is not iterated directly.
        List<OperatorChainInfo> initialEntryPoints = chainEntryPoints.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());

        for (OperatorChainInfo info : initialEntryPoints) {
            StreamingJobGraphGenerator.createChain(info.getStartNodeId(), 1, info, chainEntryPoints, false, this.serializationExecutor, jobVertexBuildContext, this::generateHashesByStreamNodeId);

            StreamNode startNode = this.streamGraph.getStreamNode(info.getStartNodeId());
            for (StreamEdge inEdge : startNode.getInEdges()) {
                // Only edges from nodes that already belong to a job vertex need a later connect.
                if (this.isNodeFrozen(inEdge.getSourceId())) {
                    info.addTransitiveInEdge(inEdge);
                }
            }
        }
    }

    /**
     * Registers the given stream nodes as pending chain entry points and returns those pending
     * entry points that are ready to become job vertices.
     *
     * <p>Chainable sources get their hashes generated and are registered through
     * {@code createSourceChainInfo}; every other node is registered as the start of its own chain
     * unless an entry for it already exists.
     *
     * @param streamNodes candidate start nodes
     * @param jobVertexBuildContext context passed on to the source chain helper
     * @return ready entry points keyed by start node id; they are removed from the pending map
     */
    private Map<Integer, OperatorChainInfo> buildAndGetChainEntryPoints(List<StreamNode> streamNodes, JobVertexBuildContext jobVertexBuildContext) {
        Collection<Integer> sourceIds = streamGraph.getSourceIDs();
        for (StreamNode node : streamNodes) {
            int nodeId = node.getId();
            boolean chainableSource = sourceIds.contains(nodeId) && StreamingJobGraphGenerator.isChainableSource(node, streamGraph);
            if (chainableSource) {
                generateHashesByStreamNodeId(nodeId);
                StreamingJobGraphGenerator.createSourceChainInfo(node, pendingChainEntryPoints, jobVertexBuildContext);
            } else {
                pendingChainEntryPoints.computeIfAbsent(nodeId, ignored -> new OperatorChainInfo(nodeId));
            }
        }
        return getChainEntryPointsReadyForJobVertex();
    }

    /**
     * Moves every pending chain entry point that is ready to become a job vertex out of the
     * pending map and into the returned map.
     *
     * @return the ready entry points keyed by start node id
     */
    private Map<Integer, OperatorChainInfo> getChainEntryPointsReadyForJobVertex() {
        Map<Integer, OperatorChainInfo> readyEntryPoints = new HashMap<>();
        pendingChainEntryPoints.entrySet().removeIf(pending -> {
            OperatorChainInfo pendingChain = pending.getValue();
            if (!isReadyToCreateJobVertex(pendingChain)) {
                return false;
            }
            // Copy key and value out before the entry is removed from the pending map.
            readyEntryPoints.put(pending.getKey(), pendingChain);
            return true;
        });
        return readyEntryPoints;
    }

    /**
     * Prepares stream node parallelisms for a dynamic job graph.
     *
     * <p>Steps, each over the topologically sorted stream nodes:
     * <ol>
     *   <li>try to convert the partitioner of every out edge that can be chained;
     *   <li>reset the parallelism of nodes without a configured parallelism when auto parallelism
     *       is enabled;
     *   <li>compute the forward groups from edges that use exactly {@link ForwardPartitioner};
     *   <li>apply the parallelism and max parallelism decided by each forward group to its nodes.
     * </ol>
     */
    private void setVertexParallelismsForDynamicGraphIfNecessary() {
        List<StreamNode> topologicallySortedStreamNodes = this.streamGraph.getStreamNodesSortedTopologicallyFromSources();

        topologicallySortedStreamNodes.forEach(streamNode -> streamNode.getOutEdges().forEach(this::tryConvertPartitionerForChainableEdge));

        topologicallySortedStreamNodes.forEach(streamNode -> {
            if (!streamNode.isParallelismConfigured() && this.streamGraph.isAutoParallelismEnabled()) {
                // NOTE(review): -1 presumably is the "default / undecided" parallelism marker — confirm.
                streamNode.setParallelism(-1, false);
            }
        });

        // Consumer node -> set of nodes that feed it through a plain ForwardPartitioner edge.
        Map<StreamNode, Set<StreamNode>> forwardProducersByConsumerNodeId = new HashMap<>();
        for (StreamNode streamNode : topologicallySortedStreamNodes) {
            Set<StreamNode> forwardConsumers = streamNode.getOutEdges().stream()
                    .filter(edge -> edge.getPartitioner().getClass().equals(ForwardPartitioner.class))
                    .map(StreamEdge::getTargetId)
                    .map(this.streamGraph::getStreamNode)
                    .collect(Collectors.toSet());
            for (StreamNode forwardConsumer : forwardConsumers) {
                forwardProducersByConsumerNodeId.computeIfAbsent(forwardConsumer, ignored -> new HashSet<>()).add(streamNode);
            }
        }

        this.steamNodeIdToForwardGroupMap.putAll(ForwardGroupComputeUtil.computeStreamNodeForwardGroup(topologicallySortedStreamNodes, startNode -> forwardProducersByConsumerNodeId.getOrDefault(startNode, Collections.emptySet())));

        for (StreamNode streamNode : topologicallySortedStreamNodes) {
            StreamNodeForwardGroup forwardGroup = this.steamNodeIdToForwardGroupMap.get(streamNode.getId());
            if (forwardGroup == null) {
                continue;
            }
            if (forwardGroup.isParallelismDecided()) {
                streamNode.setParallelism(forwardGroup.getParallelism(), true);
            }
            if (forwardGroup.isMaxParallelismDecided()) {
                streamNode.setMaxParallelism(forwardGroup.getMaxParallelism());
            }
        }
    }

    /**
     * Replaces the partitioner of the given edge with a {@link ForwardPartitioner} when the edge
     * currently uses a {@link ForwardForConsecutiveHashPartitioner} or a
     * {@link ForwardForUnspecifiedPartitioner} and either starts at a chainable source or is
     * itself chainable. Edges that do not match are left untouched.
     *
     * @param edge the edge to inspect and possibly update
     */
    private void tryConvertPartitionerForChainableEdge(StreamEdge edge) {
        StreamPartitioner<?> partitioner = edge.getPartitioner();
        boolean forwardForConsecutiveHash = partitioner instanceof ForwardForConsecutiveHashPartitioner;
        boolean forwardForUnspecified = partitioner instanceof ForwardForUnspecifiedPartitioner;
        if (!forwardForConsecutiveHash && !forwardForUnspecified) {
            return;
        }

        boolean startsAtChainableSource = streamGraph.getSourceIDs().contains(edge.getSourceId())
                && StreamingJobGraphGenerator.isChainableSource(streamGraph.getStreamNode(edge.getSourceId()), streamGraph);
        if (!startsAtChainableSource && !StreamingJobGraphGenerator.isChainable(edge, streamGraph)) {
            return;
        }

        edge.setPartitioner(new ForwardPartitioner<>());
        // The follow-up adjustments depend on which partitioner the edge had before the swap.
        if (forwardForConsecutiveHash && edge.getExchangeMode() == StreamExchangeMode.BATCH) {
            edge.setExchangeMode(StreamExchangeMode.UNDEFINED);
        }
        if (forwardForUnspecified) {
            edge.setIntraInputKeyCorrelated(false);
        }
    }

    /**
     * Generates the hashes for the given stream node unless a default hash already exists.
     * Each legacy hasher writes into the legacy hash map at the same index, then the default
     * hasher writes into {@code hashes}.
     *
     * @param streamNodeId id of the stream node to hash
     * @throws IllegalStateException if the default hasher reports that no hash was generated
     */
    private void generateHashesByStreamNodeId(Integer streamNodeId) {
        if (hashes.containsKey(streamNodeId)) {
            return;
        }
        int hasherIndex = 0;
        for (StreamGraphHasher legacyHasher : legacyStreamGraphHasher) {
            legacyHasher.generateHashesByStreamNodeId(streamNodeId, streamGraph, legacyHashes.get(hasherIndex));
            hasherIndex++;
        }
        boolean generated = defaultStreamGraphHasher.generateHashesByStreamNodeId(streamNodeId, streamGraph, hashes);
        Preconditions.checkState(generated, "Failed to generate hash for streamNode with ID '%s'", streamNodeId);
    }

    /**
     * Decides whether a job vertex can be created for the given chain.
     *
     * <p>The chain is ready when its start node is not frozen yet and every in edge of the start
     * node comes from a node that already has a hash and is either a chained source of this chain
     * or belongs to a job vertex that has finished.
     *
     * @param chainInfo the chain to check
     * @return {@code true} if the job vertex can be created now
     */
    private boolean isReadyToCreateJobVertex(OperatorChainInfo chainInfo) {
        Integer startNodeId = chainInfo.getStartNodeId();
        if (isNodeFrozen(startNodeId)) {
            return false;
        }
        for (StreamEdge inEdge : streamGraph.getStreamNode(startNodeId).getInEdges()) {
            Integer upstreamNodeId = inEdge.getSourceId();
            if (!hashes.containsKey(upstreamNodeId)) {
                return false;
            }
            if (chainInfo.getChainedSources().containsKey(upstreamNodeId)) {
                continue;
            }
            boolean upstreamVertexFinished = findVertexByStreamNodeId(upstreamNodeId)
                    .filter(finishedJobVertices::contains)
                    .isPresent();
            if (!upstreamVertexFinished) {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether the given stream node has already been placed into a job vertex.
     *
     * @param streamNodeId id of the stream node
     * @return {@code true} if the node is recorded in the frozen-node map
     */
    private boolean isNodeFrozen(Integer streamNodeId) {
        return frozenNodeToStartNodeMap.containsKey(streamNodeId);
    }

    /**
     * Returns the start node id of the chain that contains the given frozen stream node.
     *
     * @param streamNodeId id of the stream node
     * @return the start node id, or {@code null} if the node is not frozen
     */
    private Integer getStartNodeId(Integer streamNodeId) {
        return frozenNodeToStartNodeMap.get(streamNodeId);
    }

    /**
     * Regenerates the JSON representation of the stream graph from the current stream graph and
     * the stream-node-to-job-vertex mapping, and stores it for {@code getStreamGraphJson()}.
     */
    private void generateStreamGraphJson() {
        String regeneratedJson = JsonPlanGenerator.generateStreamGraphJson(streamGraph, streamNodeIdsToJobVertexMap);
        streamGraphJson = regeneratedJson;
    }

    /**
     * Returns the most recently generated JSON representation of the stream graph.
     *
     * @return the stream graph JSON
     */
    public String getStreamGraphJson() {
        return streamGraphJson;
    }

    /** Refreshes the stored stream graph JSON after the stream graph has been updated. */
    @Override
    public void onStreamGraphUpdated() {
        generateStreamGraphJson();
    }
}

