/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.ExecutionPlanUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.lineage.LineageGraph;
import org.apache.flink.streaming.api.lineage.LineageGraphUtils;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.BroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
import org.apache.flink.streaming.api.transformations.KeyedBroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformationWrapper;
import org.apache.flink.streaming.api.transformations.StubTransformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.GlobalCommitterTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.PartitionTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.ReduceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SideOutputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.StubTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TimestampsAndWatermarksTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TwoInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.UnionTransformationTranslator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamGraphGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
    public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;
    public static final String DEFAULT_STREAMING_JOB_NAME = "Flink Streaming Job";
    public static final String DEFAULT_BATCH_JOB_NAME = "Flink Batch Job";
    public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
    private final List<Transformation<?>> transformations;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private final Configuration configuration;
    private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<String, ResourceProfile>();
    private SavepointRestoreSettings savepointRestoreSettings;
    private boolean shouldExecuteInBatchMode;
    private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
    protected static Integer iterationIdCounter;
    private StreamGraph streamGraph;
    private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;

    public static int getNewIterationNodeId() {
        Integer n = iterationIdCounter;
        Integer n2 = iterationIdCounter = Integer.valueOf(iterationIdCounter - 1);
        return iterationIdCounter;
    }

    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
        this(transformations, executionConfig, checkpointConfig, new Configuration());
    }

    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, Configuration configuration) {
        this.transformations = Preconditions.checkNotNull(transformations);
        this.executionConfig = Preconditions.checkNotNull(executionConfig);
        this.checkpointConfig = new CheckpointConfig(checkpointConfig);
        this.configuration = Preconditions.checkNotNull(configuration);
        this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(configuration);
    }

    public StreamGraphGenerator setSlotSharingGroupResource(Map<String, ResourceProfile> slotSharingGroupResources) {
        slotSharingGroupResources.forEach((name, profile) -> {
            if (!profile.equals(ResourceProfile.UNKNOWN)) {
                this.slotSharingGroupResources.put((String)name, (ResourceProfile)profile);
            }
        });
        return this;
    }

    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    public StreamGraph generate() {
        this.streamGraph = new StreamGraph(this.configuration, this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
        this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode();
        this.configureStreamGraph(this.streamGraph);
        this.alreadyTransformed = new IdentityHashMap();
        for (Transformation<?> transformation : this.transformations) {
            this.transform(transformation);
        }
        this.streamGraph.setSlotSharingGroupResource(this.slotSharingGroupResources);
        this.setFineGrainedGlobalStreamExchangeMode(this.streamGraph);
        LineageGraph lineageGraph = LineageGraphUtils.convertToLineageGraph(this.transformations);
        this.streamGraph.setLineageGraph(lineageGraph);
        for (StreamNode streamNode : this.streamGraph.getStreamNodes()) {
            if (!streamNode.getInEdges().stream().anyMatch(e -> !e.getPartitioner().isSupportsUnalignedCheckpoint())) continue;
            for (StreamEdge edge : streamNode.getInEdges()) {
                edge.setSupportsUnalignedCheckpoints(false);
            }
        }
        Map<String, DistributedCache.DistributedCacheEntry> map = ExecutionPlanUtils.prepareUserArtifactEntries(((List)Optional.ofNullable(this.configuration.get(PipelineOptions.CACHED_FILES)).map(DistributedCache::parseCachedFilesFromString).orElse(new ArrayList())).stream().collect(Collectors.toMap(e -> (String)e.f0, e -> (DistributedCache.DistributedCacheEntry)e.f1)), this.streamGraph.getJobID());
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : map.entrySet()) {
            this.streamGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }
        this.streamGraph.serializeAndSaveWatermarkDeclarations();
        StreamGraph streamGraph = this.streamGraph;
        this.alreadyTransformed.clear();
        this.alreadyTransformed = null;
        this.streamGraph = null;
        return streamGraph;
    }

    private void setDynamic(StreamGraph graph) {
        Optional<JobManagerOptions.SchedulerType> schedulerTypeOptional = this.executionConfig.getSchedulerType();
        boolean dynamic = this.shouldExecuteInBatchMode && schedulerTypeOptional.orElse(JobManagerOptions.SchedulerType.AdaptiveBatch) == JobManagerOptions.SchedulerType.AdaptiveBatch;
        graph.setDynamic(dynamic);
    }

    private void configureStreamGraph(StreamGraph graph) {
        Preconditions.checkNotNull(graph);
        graph.setVertexDescriptionMode(this.configuration.get(PipelineOptions.VERTEX_DESCRIPTION_MODE));
        graph.setVertexNameIncludeIndexPrefix(this.configuration.get(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX));
        graph.setAutoParallelismEnabled(this.configuration.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED));
        graph.setEnableCheckpointsAfterTasksFinish(this.configuration.get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
        this.setDynamic(graph);
        if (this.shouldExecuteInBatchMode) {
            this.configureStreamGraphBatch(graph);
            this.configuration.set((ConfigOption)ExecutionOptions.BUFFER_TIMEOUT_ENABLED, (Object)false);
        } else {
            this.configureStreamGraphStreaming(graph);
        }
    }

    private void configureStreamGraphBatch(StreamGraph graph) {
        graph.setJobType(JobType.BATCH);
        graph.setJobName(this.deriveJobName(DEFAULT_BATCH_JOB_NAME));
        if (this.checkpointConfig.isCheckpointingEnabled()) {
            LOG.info("Disabled Checkpointing. Checkpointing is not supported and not needed when executing jobs in BATCH mode.");
            this.checkpointConfig.disableCheckpointing();
        }
        this.setBatchStateBackendAndTimerService(graph);
        graph.setGlobalStreamExchangeMode(this.deriveGlobalStreamExchangeModeBatch());
        graph.setAllVerticesInSameSlotSharingGroupByDefault(false);
    }

    private void configureStreamGraphStreaming(StreamGraph graph) {
        graph.setJobType(JobType.STREAMING);
        graph.setJobName(this.deriveJobName(DEFAULT_STREAMING_JOB_NAME));
        graph.setGlobalStreamExchangeMode(this.deriveGlobalStreamExchangeModeStreaming());
    }

    private String deriveJobName(String defaultJobName) {
        return this.configuration.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
    }

    private GlobalStreamExchangeMode deriveGlobalStreamExchangeModeBatch() {
        BatchShuffleMode shuffleMode = this.configuration.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
        switch (shuffleMode) {
            case ALL_EXCHANGES_PIPELINED: {
                return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED;
            }
            case ALL_EXCHANGES_BLOCKING: {
                return GlobalStreamExchangeMode.ALL_EDGES_BLOCKING;
            }
            case ALL_EXCHANGES_HYBRID_FULL: {
                return GlobalStreamExchangeMode.ALL_EDGES_HYBRID_FULL;
            }
            case ALL_EXCHANGES_HYBRID_SELECTIVE: {
                return GlobalStreamExchangeMode.ALL_EDGES_HYBRID_SELECTIVE;
            }
        }
        throw new IllegalArgumentException(String.format("Unsupported shuffle mode '%s' in BATCH runtime mode.", shuffleMode.toString()));
    }

    private GlobalStreamExchangeMode deriveGlobalStreamExchangeModeStreaming() {
        if (this.checkpointConfig.isApproximateLocalRecoveryEnabled()) {
            this.checkApproximateLocalRecoveryCompatibility();
            return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
        }
        return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED;
    }

    private void checkApproximateLocalRecoveryCompatibility() {
        Preconditions.checkState(!this.checkpointConfig.isUnalignedCheckpointsEnabled(), "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
    }

    private void setBatchStateBackendAndTimerService(StreamGraph graph) {
        boolean useStateBackend = this.configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
        boolean sortInputs = this.configuration.get(ExecutionOptions.SORT_INPUTS);
        Preconditions.checkState(!useStateBackend || sortInputs, "Batch state backend requires the sorted inputs to be enabled!");
        if (useStateBackend) {
            LOG.debug("Using BATCH execution state backend and timer service.");
            graph.setStateBackend(new BatchExecutionStateBackend());
            graph.getJobConfiguration().set((ConfigOption)StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, (Object)false);
            graph.setCheckpointStorage(new BatchExecutionCheckpointStorage());
            graph.setTimerServiceProvider(BatchExecutionInternalTimeServiceManager::create);
            graph.createJobCheckpointingSettings();
        }
    }

    private void setFineGrainedGlobalStreamExchangeMode(StreamGraph graph) {
        if (this.shouldExecuteInBatchMode && graph.hasFineGrainedResource()) {
            graph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        }
    }

    private boolean shouldExecuteInBatchMode() {
        RuntimeExecutionMode configuredMode = this.configuration.get(ExecutionOptions.RUNTIME_MODE);
        boolean existsUnboundedSource = this.existsUnboundedSource();
        Preconditions.checkState(configuredMode != RuntimeExecutionMode.BATCH || !existsUnboundedSource, "Detected an UNBOUNDED source with the '" + ExecutionOptions.RUNTIME_MODE.key() + "' set to 'BATCH'. This combination is not allowed, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "' to STREAMING or AUTOMATIC");
        if (Preconditions.checkNotNull(configuredMode) != RuntimeExecutionMode.AUTOMATIC) {
            return configuredMode == RuntimeExecutionMode.BATCH;
        }
        return !existsUnboundedSource;
    }

    private boolean existsUnboundedSource() {
        return this.transformations.stream().anyMatch(transformation -> this.isUnboundedSource((Transformation<?>)transformation) || transformation.getTransitivePredecessors().stream().anyMatch(this::isUnboundedSource));
    }

    private boolean isUnboundedSource(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation);
        return transformation instanceof WithBoundedness && ((WithBoundedness)((Object)transformation)).getBoundedness() != Boundedness.BOUNDED;
    }

    private Collection<Integer> transform(Transformation<?> transform) {
        int globalMaxParallelismFromConfig;
        if (this.alreadyTransformed.containsKey(transform)) {
            return this.alreadyTransformed.get(transform);
        }
        LOG.debug("Transforming " + String.valueOf(transform));
        if (transform.getMaxParallelism() <= 0 && (globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism()) > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
        transform.getSlotSharingGroup().ifPresent(slotSharingGroup -> {
            ResourceSpec resourceSpec = SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
            if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                this.slotSharingGroupResources.compute(slotSharingGroup.getName(), (name, profile) -> {
                    if (profile == null) {
                        return ResourceProfile.fromResourceSpec(resourceSpec, MemorySize.ZERO);
                    }
                    if (!ResourceProfile.fromResourceSpec(resourceSpec, MemorySize.ZERO).equals(profile)) {
                        throw new IllegalArgumentException("The slot sharing group " + slotSharingGroup.getName() + " has been configured with two different resource spec.");
                    }
                    return profile;
                });
            }
        });
        transform.getOutputType();
        TransformationTranslator<?, ? extends Transformation> translator = translatorMap.get(transform.getClass());
        Collection<Integer> transformedIds = translator != null ? this.translate(translator, transform) : this.legacyTransform(transform);
        if (!this.alreadyTransformed.containsKey(transform)) {
            this.alreadyTransformed.put(transform, transformedIds);
        }
        return transformedIds;
    }

    private Collection<Integer> legacyTransform(Transformation<?> transform) {
        if (!(transform instanceof SourceTransformationWrapper)) {
            throw new IllegalStateException("Unknown transformation: " + String.valueOf(transform));
        }
        Collection<Integer> transformedIds = this.transform(((SourceTransformationWrapper)transform).getInput());
        if (transform.getBufferTimeout() >= 0L) {
            this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            this.streamGraph.setBufferTimeout(transform.getId(), this.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }
        if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && transform instanceof PhysicalTransformation && transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
        }
        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }
        this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
        return transformedIds;
    }

    private long getBufferTimeout() {
        return this.configuration.get(ExecutionOptions.BUFFER_TIMEOUT_ENABLED) != false ? this.configuration.get(ExecutionOptions.BUFFER_TIMEOUT).toMillis() : -1L;
    }

    private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> translator, Transformation<?> transform) {
        Preconditions.checkNotNull(translator);
        Preconditions.checkNotNull(transform);
        List<Collection<Integer>> allInputIds = this.getParentInputIds(transform.getInputs());
        if (this.alreadyTransformed.containsKey(transform)) {
            return this.alreadyTransformed.get(transform);
        }
        String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup().isPresent() ? transform.getSlotSharingGroup().get().getName() : null, allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
        ContextImpl context = new ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration, this.transformations);
        return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context);
    }

    private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> parentTransformations) {
        ArrayList<Collection<Integer>> allInputIds = new ArrayList<Collection<Integer>>();
        if (parentTransformations == null) {
            return allInputIds;
        }
        for (Transformation<?> transformation : parentTransformations) {
            allInputIds.add(this.transform(transformation));
        }
        return allInputIds;
    }

    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
            if (inputGroup == null) {
                inputGroup = inputGroupCandidate;
                continue;
            }
            if (inputGroup.equals(inputGroupCandidate)) continue;
            return DEFAULT_SLOT_SHARING_GROUP;
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }

    static {
        HashMap<Class, TransformationTranslator<Object, Object>> tmp = new HashMap<Class, TransformationTranslator<Object, Object>>();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
        tmp.put(GlobalCommitterTransform.class, new GlobalCommitterTransformationTranslator());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
        tmp.put(StubTransformation.class, new StubTransformationTranslator());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
        tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
        tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
        tmp.put(CacheTransformation.class, new CacheTransformationTranslator());
        translatorMap = Collections.unmodifiableMap(tmp);
        iterationIdCounter = 0;
    }

    private static class ContextImpl
    implements TransformationTranslator.Context {
        private final StreamGraphGenerator streamGraphGenerator;
        private final StreamGraph streamGraph;
        private final String slotSharingGroup;
        private final ReadableConfig config;
        private final Collection<Transformation<?>> transformations;

        public ContextImpl(StreamGraphGenerator streamGraphGenerator, StreamGraph streamGraph, String slotSharingGroup, ReadableConfig config, Collection<Transformation<?>> transformations) {
            this.streamGraphGenerator = Preconditions.checkNotNull(streamGraphGenerator);
            this.streamGraph = Preconditions.checkNotNull(streamGraph);
            this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup);
            this.config = Preconditions.checkNotNull(config);
            this.transformations = Preconditions.checkNotNull(transformations, "transformations must not be null");
        }

        @Override
        public StreamGraph getStreamGraph() {
            return this.streamGraph;
        }

        @Override
        public Collection<Integer> getStreamNodeIds(Transformation<?> transformation) {
            Preconditions.checkNotNull(transformation);
            Collection<Integer> ids = this.streamGraphGenerator.alreadyTransformed.get(transformation);
            Preconditions.checkState(ids != null, "Parent transformation \"" + String.valueOf(transformation) + "\" has not been transformed.");
            return ids;
        }

        @Override
        public String getSlotSharingGroup() {
            return this.slotSharingGroup;
        }

        @Override
        public long getDefaultBufferTimeout() {
            return this.streamGraphGenerator.getBufferTimeout();
        }

        @Override
        public ReadableConfig getGraphGeneratorConfig() {
            return this.config;
        }

        @Override
        public Collection<Integer> transform(Transformation<?> transformation) {
            return this.streamGraphGenerator.transform(transformation);
        }

        @Override
        public Collection<Transformation<?>> getSinkTransformations() {
            return this.transformations;
        }
    }
}

