/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.dag.physical;

import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.seatunnel.api.options.EnvCommonOptions;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.dag.physical.config.IntermediateQueueConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SinkConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutionFlow;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask;
import org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithIntermediateQueue;
import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateBlockingQueue;
import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateDisruptor;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;

public class PhysicalPlanGenerator {
    // Pipelines taken from the ExecutionPlan in the constructor; generate() emits one SubPlan per entry.
    private final List<Pipeline> pipelines;
    // getNextId() is called once for every TaskGroupLocation this generator creates.
    private final IdGenerator taskGroupIdGenerator = new IdGenerator();
    // Source of the job id and of the job config / env options read while generating.
    private final JobImmutableInformation jobImmutableInformation;
    // Forwarded unchanged to every PhysicalVertex, SubPlan and the PhysicalPlan.
    private final long initializationTimestamp;
    // Forwarded to SubPlan and PhysicalPlan; not used directly by this class.
    private final ExecutorService executorService;
    // Used by getCommitterTask to obtain and release the class loader for a sink's jars.
    private final ClassLoaderService classLoaderService;
    // Forwarded to every PhysicalVertex.
    private final NodeEngine nodeEngine;
    // Forwarded to every PhysicalVertex.
    private final FlakeIdGenerator flakeIdGenerator;
    // Filled by getEnumeratorTask, read by setFlowConfig to wire a source flow to its enumerator task.
    private final Map<SourceAction<?, ?, ?>, TaskLocation> enumeratorTaskIDMap = new HashMap();
    // Filled by getCommitterTask, read by setFlowConfig to wire a sink flow to its committer task.
    private final Map<SinkAction<?, ?, ?, ?>, TaskLocation> committerTaskIDMap = new HashMap();
    // The next three collections are per-pipeline scratch state: generate() clears them before
    // each pipeline and hands them to the CheckpointPlan builder once the pipeline is processed.
    // Every task location created for the current pipeline.
    private final Set<TaskLocation> pipelineTasks;
    // Locations of the split-enumerator tasks of the current pipeline (added in getEnumeratorTask).
    private final Set<TaskLocation> startingTasks;
    // For each task location, the (action state key, index) pairs recorded for it; coordinator
    // tasks (enumerator / committer) are recorded with index -1.
    private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>> subtaskActions;
    // Forwarded to every PhysicalVertex, SubPlan and the PhysicalPlan.
    private final IMap<Object, Object> runningJobStateIMap;
    // Forwarded to every PhysicalVertex, SubPlan and the PhysicalPlan.
    private final IMap<Object, Object> runningJobStateTimestampsIMap;
    // Chooses between the blocking-queue and disruptor task group implementations in getSourceTask.
    private final QueueType queueType;

    /**
     * Captures everything needed to turn an {@link ExecutionPlan} into a physical plan.
     *
     * <p>All reference arguments are validated up front, before any field is assigned or any
     * method is called on them, so a failed check leaves no partially initialised state behind.
     *
     * @param executionPlan plan whose pipelines will be converted
     * @param nodeEngine node engine handed to every created vertex
     * @param jobImmutableInformation job id and job configuration
     * @param initializationTimestamp timestamp forwarded to vertices, sub plans and the plan
     * @param executorService executor forwarded to sub plans and the plan
     * @param classLoaderService service used to load connector classes for sinks
     * @param flakeIdGenerator id generator handed to every created vertex
     * @param runningJobStateIMap state map forwarded to vertices, sub plans and the plan
     * @param runningJobStateTimestampsIMap timestamp map forwarded alongside the state map
     * @param queueType intermediate queue implementation used for source task groups
     * @throws NullPointerException if any argument marked {@code @NonNull} is {@code null}
     */
    public PhysicalPlanGenerator(
            @NonNull ExecutionPlan executionPlan,
            @NonNull NodeEngine nodeEngine,
            @NonNull JobImmutableInformation jobImmutableInformation,
            long initializationTimestamp,
            @NonNull ExecutorService executorService,
            @NonNull ClassLoaderService classLoaderService,
            @NonNull FlakeIdGenerator flakeIdGenerator,
            @NonNull IMap runningJobStateIMap,
            @NonNull IMap runningJobStateTimestampsIMap,
            @NonNull QueueType queueType) {
        // Checks run in parameter order; the messages are part of the observable behaviour.
        Objects.requireNonNull(executionPlan, "executionPlan is marked non-null but is null");
        Objects.requireNonNull(nodeEngine, "nodeEngine is marked non-null but is null");
        Objects.requireNonNull(jobImmutableInformation, "jobImmutableInformation is marked non-null but is null");
        Objects.requireNonNull(executorService, "executorService is marked non-null but is null");
        Objects.requireNonNull(classLoaderService, "classLoaderService is marked non-null but is null");
        Objects.requireNonNull(flakeIdGenerator, "flakeIdGenerator is marked non-null but is null");
        Objects.requireNonNull(runningJobStateIMap, "runningJobStateIMap is marked non-null but is null");
        Objects.requireNonNull(runningJobStateTimestampsIMap, "runningJobStateTimestampsIMap is marked non-null but is null");
        Objects.requireNonNull(queueType, "queueType is marked non-null but is null");

        // Collaborators supplied by the caller.
        this.pipelines = executionPlan.getPipelines();
        this.nodeEngine = nodeEngine;
        this.jobImmutableInformation = jobImmutableInformation;
        this.initializationTimestamp = initializationTimestamp;
        this.executorService = executorService;
        this.classLoaderService = classLoaderService;
        this.flakeIdGenerator = flakeIdGenerator;
        this.runningJobStateIMap = runningJobStateIMap;
        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
        this.queueType = queueType;

        // Per-pipeline scratch collections, reset by generate() for every pipeline.
        this.pipelineTasks = new HashSet<>();
        this.startingTasks = new HashSet<>();
        this.subtaskActions = new HashMap<>();
    }

    /**
     * Converts every pipeline of the execution plan into a {@link SubPlan} and assembles the
     * resulting {@link PhysicalPlan} together with one {@link CheckpointPlan} per pipeline.
     *
     * <p>Pipelines are processed strictly one after another because the per-pipeline scratch
     * collections ({@code pipelineTasks}, {@code startingTasks}, {@code subtaskActions}) are
     * shared fields that are cleared at the start of each pipeline.
     *
     * @return a tuple of the physical plan and a map from pipeline id to its checkpoint plan
     */
    public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
        // Left raw because the value type stored under this env option is not visible here;
        // it is handed to SubPlan exactly as read (may be null when the option is absent).
        Map tagFilter = (Map) this.jobImmutableInformation.getJobConfig().getEnvOptions().get(EnvCommonOptions.NODE_TAG_FILTER.key());
        int totalPipelineNum = this.pipelines.size();
        Map<Integer, CheckpointPlan> checkpointPlans = new HashMap<>();
        List<SubPlan> subPlans = new ArrayList<>(totalPipelineNum);

        for (Pipeline pipeline : this.pipelines) {
            this.pipelineTasks.clear();
            this.startingTasks.clear();
            this.subtaskActions.clear();

            int pipelineId = pipeline.getId();
            List<ExecutionEdge> edges = pipeline.getEdges();
            List<SourceAction<?, ?, ?>> sources = this.findSourceAction(edges);

            // Coordinator tasks must be created first: they register the enumerator / committer
            // task locations that setFlowConfig looks up while the worker tasks are built below.
            List<PhysicalVertex> coordinatorVertexList = this.getEnumeratorTask(sources, pipelineId, totalPipelineNum);
            coordinatorVertexList.addAll(this.getCommitterTask(edges, pipelineId, totalPipelineNum));

            List<PhysicalVertex> physicalVertexList = this.getSourceTask(edges, sources, pipelineId, totalPipelineNum);
            physicalVertexList.addAll(this.getShuffleTask(edges, pipelineId, totalPipelineNum));

            // NOTE(review): the builder receives the live scratch collections, which are cleared
            // again for the next pipeline. This presumably relies on the builder copying their
            // contents - confirm against CheckpointPlan.
            checkpointPlans.put(
                    pipelineId,
                    CheckpointPlan.builder()
                            .pipelineId(pipelineId)
                            .pipelineSubtasks(this.pipelineTasks)
                            .startingSubtasks(this.startingTasks)
                            .pipelineActions(pipeline.getActions())
                            .subtaskActions(this.subtaskActions)
                            .build());

            subPlans.add(
                    new SubPlan(
                            pipelineId,
                            totalPipelineNum,
                            this.initializationTimestamp,
                            physicalVertexList,
                            coordinatorVertexList,
                            this.jobImmutableInformation,
                            this.executorService,
                            this.runningJobStateIMap,
                            this.runningJobStateTimestampsIMap,
                            tagFilter));
        }

        PhysicalPlan physicalPlan =
                new PhysicalPlan(
                        subPlans,
                        this.executorService,
                        this.jobImmutableInformation,
                        this.initializationTimestamp,
                        this.runningJobStateIMap,
                        this.runningJobStateTimestampsIMap);
        return Tuple2.tuple2(physicalPlan, checkpointPlans);
    }

    /**
     * Collects the source actions that sit on the left side of an edge.
     *
     * @param edges edges of one pipeline
     * @return each distinct source action once, in the order it is first encountered
     */
    private List<SourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
        List<SourceAction<?, ?, ?>> sourceActions = new ArrayList<>();
        // Tracks actions already emitted so a source feeding several edges is returned once.
        Set<Action> alreadySeen = new HashSet<>();
        for (ExecutionEdge edge : edges) {
            Action upstream = edge.getLeftVertex().getAction();
            if (upstream instanceof SourceAction && alreadySeen.add(upstream)) {
                sourceActions.add((SourceAction<?, ?, ?>) upstream);
            }
        }
        return sourceActions;
    }

    /**
     * Creates one aggregated-committer coordinator vertex for every sink edge whose sink
     * provides an aggregated committer.
     *
     * <p>For each created task its location is registered in {@code committerTaskIDMap},
     * {@code pipelineTasks} and {@code subtaskActions} (with index -1).
     *
     * @param edges edges of the pipeline
     * @param pipelineIndex id of the pipeline the vertices belong to
     * @param totalPipelineNum total number of pipelines in the job
     * @return the committer vertices; empty when no sink supplies an aggregated committer
     * @throws RuntimeException wrapping the {@link IOException} raised while creating a committer
     */
    private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> edges, int pipelineIndex, int totalPipelineNum) {
        List<ExecutionEdge> sinkEdges = new ArrayList<>();
        for (ExecutionEdge edge : edges) {
            if (edge.getRightVertex().getAction() instanceof SinkAction) {
                sinkEdges.add(edge);
            }
        }

        List<PhysicalVertex> committerVertices = new ArrayList<>();
        // Only sinks that actually yield a committer consume an index.
        int nextVertexIndex = 0;
        for (ExecutionEdge sinkEdge : sinkEdges) {
            SinkAction sinkAction = (SinkAction) sinkEdge.getRightVertex().getAction();

            Optional sinkAggregatedCommitter;
            ClassLoader callerClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                // The committer is created with the class loader for the sink's jars installed
                // as the thread context class loader.
                ClassLoader connectorClassLoader =
                        classLoaderService.getClassLoader(jobImmutableInformation.getJobId(), (Collection) sinkAction.getJarUrls());
                Thread.currentThread().setContextClassLoader(connectorClassLoader);
                sinkAggregatedCommitter = sinkAction.getSink().createAggregatedCommitter();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                // Always restore the caller's class loader and release the connector one.
                Thread.currentThread().setContextClassLoader(callerClassLoader);
                classLoaderService.releaseClassLoader(jobImmutableInformation.getJobId(), (Collection) sinkAction.getJarUrls());
            }

            if (!sinkAggregatedCommitter.isPresent()) {
                continue;
            }

            long taskGroupID = taskGroupIdGenerator.getNextId();
            TaskGroupLocation taskGroupLocation =
                    new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
            TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0L, 0);
            SinkAggregatedCommitterTask committerTask =
                    new SinkAggregatedCommitterTask(
                            jobImmutableInformation.getJobId(),
                            taskLocation,
                            sinkAction,
                            (SinkAggregatedCommitter) sinkAggregatedCommitter.get());

            committerTaskIDMap.put((SinkAction<?, ?, ?, ?>) sinkAction, taskLocation);
            pipelineTasks.add(taskLocation);
            subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(ActionStateKey.of((Action) sinkAction), -1)));

            committerVertices.add(
                    new PhysicalVertex(
                            nextVertexIndex++,
                            sinkEdges.size(),
                            new TaskGroupDefaultImpl(
                                    taskGroupLocation,
                                    sinkAction.getName() + "-AggregatedCommitterTask",
                                    Lists.newArrayList(new Task[] {committerTask})),
                            flakeIdGenerator,
                            pipelineIndex,
                            totalPipelineNum,
                            Collections.singletonList(sinkAction.getJarUrls()),
                            Collections.singletonList(sinkAction.getConnectorJarIdentifiers()),
                            jobImmutableInformation,
                            initializationTimestamp,
                            nodeEngine,
                            runningJobStateIMap,
                            runningJobStateTimestampsIMap));
        }
        return committerVertices;
    }

    /**
     * Creates the shuffle task vertices for every distinct shuffle action that starts an edge.
     *
     * <p>With a {@link ShuffleMultipleRowStrategy} one dedicated shuffle task (parallelism 1)
     * is created per downstream sink flow, targeting that sink's table. Otherwise one task is
     * created per parallel instance of the shuffle action.
     *
     * @param edges edges of the pipeline
     * @param pipelineIndex id of the pipeline the vertices belong to
     * @param totalPipelineNum total number of pipelines in the job
     * @return the shuffle vertices; empty when the pipeline has no shuffle action
     */
    private List<PhysicalVertex> getShuffleTask(List<ExecutionEdge> edges, int pipelineIndex, int totalPipelineNum) {
        // A shuffle action may start several edges; the set makes sure it is expanded once.
        Set<ShuffleAction> distinctShuffleActions =
                edges.stream()
                        .filter(edge -> edge.getLeftVertex().getAction() instanceof ShuffleAction)
                        .map(edge -> (ShuffleAction) edge.getLeftVertex().getAction())
                        .collect(Collectors.toSet());

        List<PhysicalVertex> shuffleVertices = new ArrayList<>();
        for (ShuffleAction distinctAction : distinctShuffleActions) {
            PhysicalExecutionFlow flow = new PhysicalExecutionFlow(distinctAction, getNextWrapper(edges, (Action) distinctAction));
            ShuffleAction shuffleAction = (ShuffleAction) flow.getAction();
            ShuffleConfig shuffleConfig = shuffleAction.getConfig();
            ShuffleStrategy shuffleStrategy = shuffleConfig.getShuffleStrategy();

            if (shuffleStrategy instanceof ShuffleMultipleRowStrategy) {
                ShuffleMultipleRowStrategy multipleRowStrategy = (ShuffleMultipleRowStrategy) shuffleStrategy;
                int nextSinkIndex = 0;
                for (Flow nextFlow : flow.getNext()) {
                    PhysicalExecutionFlow sinkFlow = (PhysicalExecutionFlow) nextFlow;
                    SinkAction sinkAction = (SinkAction) sinkFlow.getAction();
                    String sinkTableId = sinkAction.getConfig().getTablePath().toString();
                    int parallelismIndex = nextSinkIndex++;

                    // Derive a shuffle action dedicated to this sink's table.
                    ShuffleMultipleRowStrategy strategyForSink =
                            multipleRowStrategy.toBuilder().targetTableId(sinkTableId).build();
                    ShuffleConfig configForSink =
                            shuffleConfig.toBuilder().shuffleStrategy((ShuffleStrategy) strategyForSink).build();
                    String shuffleActionName =
                            String.format("%s -> %s -> %s", shuffleAction.getName(), sinkTableId, sinkAction.getName());
                    ShuffleAction shuffleActionForSink =
                            new ShuffleAction((long) parallelismIndex, shuffleActionName, configForSink);
                    shuffleActionForSink.setParallelism(1);

                    PhysicalExecutionFlow shuffleFlow =
                            new PhysicalExecutionFlow(shuffleActionForSink, Collections.singletonList(sinkFlow));
                    setFlowConfig(shuffleFlow);

                    long taskGroupID = taskGroupIdGenerator.getNextId();
                    TaskGroupLocation taskGroupLocation =
                            new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
                    TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0L, parallelismIndex);
                    TransformSeaTunnelTask shuffleTask =
                            new TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, parallelismIndex, shuffleFlow);
                    fillCheckpointPlan(shuffleTask);

                    shuffleVertices.add(
                            new PhysicalVertex(
                                    parallelismIndex,
                                    shuffleFlow.getAction().getParallelism(),
                                    new TaskGroupDefaultImpl(
                                            taskGroupLocation,
                                            shuffleFlow.getAction().getName() + "-ShuffleTask",
                                            Collections.singletonList(shuffleTask)),
                                    flakeIdGenerator,
                                    pipelineIndex,
                                    totalPipelineNum,
                                    Collections.singletonList(shuffleTask.getJarsUrl()),
                                    Collections.singletonList(shuffleTask.getConnectorPluginJars()),
                                    jobImmutableInformation,
                                    initializationTimestamp,
                                    nodeEngine,
                                    runningJobStateIMap,
                                    runningJobStateTimestampsIMap));
                }
                continue;
            }

            // Any other strategy: one task per parallel instance, all sharing the same flow.
            for (int subtask = 0; subtask < flow.getAction().getParallelism(); ++subtask) {
                long taskGroupID = taskGroupIdGenerator.getNextId();
                TaskGroupLocation taskGroupLocation =
                        new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
                TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0L, subtask);
                setFlowConfig((Flow) flow);
                TransformSeaTunnelTask shuffleTask =
                        new TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, subtask, (Flow) flow);
                fillCheckpointPlan(shuffleTask);

                shuffleVertices.add(
                        new PhysicalVertex(
                                subtask,
                                flow.getAction().getParallelism(),
                                new TaskGroupDefaultImpl(
                                        taskGroupLocation,
                                        flow.getAction().getName() + "-ShuffleTask",
                                        Lists.newArrayList(new Task[] {shuffleTask})),
                                flakeIdGenerator,
                                pipelineIndex,
                                totalPipelineNum,
                                Collections.singletonList(shuffleTask.getJarsUrl()),
                                Collections.singletonList(shuffleTask.getConnectorPluginJars()),
                                jobImmutableInformation,
                                initializationTimestamp,
                                nodeEngine,
                                runningJobStateIMap,
                                runningJobStateTimestampsIMap));
            }
        }
        return shuffleVertices;
    }

    /**
     * Creates one split-enumerator coordinator vertex per source action.
     *
     * <p>Each task location is recorded in {@code pipelineTasks}, {@code startingTasks},
     * {@code subtaskActions} (with index -1) and {@code enumeratorTaskIDMap}.
     *
     * @param sources distinct source actions of the pipeline
     * @param pipelineIndex id of the pipeline the vertices belong to
     * @param totalPipelineNum total number of pipelines in the job
     * @return a mutable list with one vertex per source, in source order
     */
    private List<PhysicalVertex> getEnumeratorTask(List<SourceAction<?, ?, ?>> sources, int pipelineIndex, int totalPipelineNum) {
        List<PhysicalVertex> enumeratorVertices = new ArrayList<>();
        int vertexIndex = 0;
        for (SourceAction<?, ?, ?> sourceAction : sources) {
            long taskGroupID = taskGroupIdGenerator.getNextId();
            TaskGroupLocation taskGroupLocation =
                    new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
            TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0L, 0);
            SourceSplitEnumeratorTask enumeratorTask =
                    new SourceSplitEnumeratorTask(jobImmutableInformation.getJobId(), taskLocation, sourceAction);

            // Book-keeping for the checkpoint plan and for wiring source flows later on.
            pipelineTasks.add(taskLocation);
            startingTasks.add(taskLocation);
            subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(ActionStateKey.of((Action) sourceAction), -1)));
            enumeratorTaskIDMap.put(sourceAction, taskLocation);

            enumeratorVertices.add(
                    new PhysicalVertex(
                            vertexIndex++,
                            sources.size(),
                            new TaskGroupDefaultImpl(
                                    taskGroupLocation,
                                    sourceAction.getName() + "-SplitEnumerator",
                                    Lists.newArrayList(new Task[] {enumeratorTask})),
                            flakeIdGenerator,
                            pipelineIndex,
                            totalPipelineNum,
                            Collections.singletonList(enumeratorTask.getJarsUrl()),
                            Collections.singletonList(enumeratorTask.getConnectorPluginJars()),
                            jobImmutableInformation,
                            initializationTimestamp,
                            nodeEngine,
                            runningJobStateIMap,
                            runningJobStateTimestampsIMap));
        }
        return enumeratorVertices;
    }

    /**
     * Creates the worker vertices for every source action: one task group per parallel
     * instance of the source.
     *
     * <p>When the flow starting at the source reaches a sink, the sinks are split off behind
     * intermediate queues (see {@code splitSinkFromFlow}). The source flow and every split-off
     * flow then become separate tasks inside the same task group, and the group is created
     * with the intermediate queue implementation selected by {@code queueType}.
     *
     * @param edges edges of the pipeline
     * @param sources distinct source actions of the pipeline
     * @param pipelineIndex id of the pipeline the vertices belong to
     * @param totalPipelineNum total number of pipelines in the job
     * @return a mutable list of source vertices, grouped by source and ordered by subtask index
     */
    private List<PhysicalVertex> getSourceTask(List<ExecutionEdge> edges, List<SourceAction<?, ?, ?>> sources, int pipelineIndex, int totalPipelineNum) {
        List<PhysicalVertex> sourceVertices = new ArrayList<>();
        for (SourceAction<?, ?, ?> source : sources) {
            PhysicalExecutionFlow flow = new PhysicalExecutionFlow(source, this.getNextWrapper(edges, source));

            // Element type is Flow, not PhysicalExecutionFlow: the flows split off below are
            // intermediate (queue) flows.
            List<Flow> flows = new ArrayList<>();
            flows.add(flow);
            if (PhysicalPlanGenerator.sourceWithSink(flow)) {
                flows.addAll(PhysicalPlanGenerator.splitSinkFromFlow(flow));
            }

            for (int i = 0; i < flow.getAction().getParallelism(); ++i) {
                long taskGroupId = this.taskGroupIdGenerator.getNextId();
                TaskGroupLocation taskGroupLocation =
                        new TaskGroupLocation(this.jobImmutableInformation.getJobId(), pipelineIndex, taskGroupId);

                // One task per flow; the position inside the task group becomes part of the
                // task location.
                List<SeaTunnelTask> taskList = new ArrayList<>(flows.size());
                int taskInTaskGroupIndex = 0;
                for (Flow f : flows) {
                    this.setFlowConfig(f);
                    TaskLocation taskLocation = new TaskLocation(taskGroupLocation, taskInTaskGroupIndex++, i);
                    SeaTunnelTask task;
                    if (f instanceof PhysicalExecutionFlow) {
                        task = new SourceSeaTunnelTask(
                                this.jobImmutableInformation.getJobId(),
                                taskLocation,
                                i,
                                (PhysicalExecutionFlow) f,
                                this.jobImmutableInformation.getJobConfig().getEnvOptions());
                    } else {
                        task = new TransformSeaTunnelTask(this.jobImmutableInformation.getJobId(), taskLocation, i, f);
                    }
                    // Registration is required for the checkpoint plan, so it is an explicit
                    // step rather than a stream peek().
                    this.fillCheckpointPlan(task);
                    taskList.add(task);
                }

                List<Set<URL>> jars = taskList.stream().map(SeaTunnelTask::getJarsUrl).collect(Collectors.toList());
                List<Set<ConnectorJarIdentifier>> jarIdentifiers =
                        taskList.stream().map(SeaTunnelTask::getConnectorPluginJars).collect(Collectors.toList());

                String taskGroupName = flow.getAction().getName() + "-SourceTask";
                // Fresh list for the task group so it does not share storage with taskList.
                List<Task> groupTasks = new ArrayList<>(taskList);

                if (taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
                    // Tasks in this group exchange records through an intermediate queue.
                    AbstractTaskGroupWithIntermediateQueue taskGroup =
                            this.queueType.equals(QueueType.BLOCKINGQUEUE)
                                    ? new TaskGroupWithIntermediateBlockingQueue(taskGroupLocation, taskGroupName, groupTasks)
                                    : new TaskGroupWithIntermediateDisruptor(taskGroupLocation, taskGroupName, groupTasks);
                    sourceVertices.add(
                            new PhysicalVertex(
                                    i,
                                    flow.getAction().getParallelism(),
                                    taskGroup,
                                    this.flakeIdGenerator,
                                    pipelineIndex,
                                    totalPipelineNum,
                                    jars,
                                    jarIdentifiers,
                                    this.jobImmutableInformation,
                                    this.initializationTimestamp,
                                    this.nodeEngine,
                                    this.runningJobStateIMap,
                                    this.runningJobStateTimestampsIMap));
                    continue;
                }

                sourceVertices.add(
                        new PhysicalVertex(
                                i,
                                flow.getAction().getParallelism(),
                                new TaskGroupDefaultImpl(taskGroupLocation, taskGroupName, groupTasks),
                                this.flakeIdGenerator,
                                pipelineIndex,
                                totalPipelineNum,
                                jars,
                                jarIdentifiers,
                                this.jobImmutableInformation,
                                this.initializationTimestamp,
                                this.nodeEngine,
                                this.runningJobStateIMap,
                                this.runningJobStateTimestampsIMap));
            }
        }
        return sourceVertices;
    }

    /**
     * Registers a worker task in the per-pipeline checkpoint book-keeping: its location is
     * added to {@code pipelineTasks}, and each of its action state keys is recorded in
     * {@code subtaskActions} paired with the task index of that location.
     *
     * @param task the task to register
     */
    private void fillCheckpointPlan(SeaTunnelTask task) {
        TaskLocation location = task.getTaskLocation();
        pipelineTasks.add(location);

        Set<Tuple2<ActionStateKey, Integer>> keysWithIndex = new HashSet<>();
        for (ActionStateKey stateKey : task.getActionStateKeys()) {
            keysWithIndex.add(Tuple2.tuple2(stateKey, location.getTaskIndex()));
        }
        subtaskActions.put(location, keysWithIndex);
    }

    /**
     * Attaches a configuration to the given flow and, recursively, to all flows after it.
     *
     * <ul>
     *   <li>source flows get a {@link SourceConfig} pointing at their enumerator task;</li>
     *   <li>sink flows get a {@link SinkConfig}, marked as containing a committer when a
     *       committer task was registered for the sink;</li>
     *   <li>intermediate flows get an {@link IntermediateQueueConfig} carrying the queue id;</li>
     *   <li>physical flows with any other action are left without a new config.</li>
     * </ul>
     *
     * @param f the flow to configure
     * @throws UnknownFlowException if {@code f} is neither a physical nor an intermediate flow
     */
    private void setFlowConfig(Flow f) {
        boolean physical = f instanceof PhysicalExecutionFlow;
        if (!physical && !(f instanceof IntermediateExecutionFlow)) {
            throw new UnknownFlowException(f);
        }

        if (physical) {
            PhysicalExecutionFlow physicalFlow = (PhysicalExecutionFlow) f;
            if (physicalFlow.getAction() instanceof SourceAction) {
                SourceConfig sourceConfig = new SourceConfig();
                sourceConfig.setEnumeratorTask(enumeratorTaskIDMap.get((SourceAction) physicalFlow.getAction()));
                physicalFlow.setConfig(sourceConfig);
            } else if (physicalFlow.getAction() instanceof SinkAction) {
                SinkConfig sinkConfig = new SinkConfig();
                // Only non-null locations are ever stored, so a null lookup means "no committer".
                TaskLocation committerLocation = committerTaskIDMap.get((SinkAction) physicalFlow.getAction());
                if (committerLocation != null) {
                    sinkConfig.setContainCommitter(true);
                    sinkConfig.setCommitterTask(committerLocation);
                }
                physicalFlow.setConfig(sinkConfig);
            }
        } else {
            IntermediateExecutionFlow queueFlow = (IntermediateExecutionFlow) f;
            queueFlow.setConfig(new IntermediateQueueConfig(queueFlow.getQueue().getId()));
        }

        // Walk the rest of the flow tree; a leaf simply has nothing to iterate.
        for (Flow downstream : f.getNext()) {
            setFlowConfig(downstream);
        }
    }

    /**
     * Detaches every sink flow found below {@code flow} and puts an intermediate queue between
     * the sink and its former parent.
     *
     * <p>For each direct sink child the parent's next-list gets an intermediate flow in place
     * of the sink, and a second intermediate flow on the same queue, followed by the sink, is
     * returned as a separate flow. If the node has any non-sink child, the method then recurses
     * into all of its current children.
     *
     * <p>Note: this mutates the next-list of {@code flow} and of its descendants.
     *
     * @param flow root of the (sub)tree to split
     * @return the detached queue-to-sink flows found in this subtree
     */
    private static List<Flow> splitSinkFromFlow(Flow flow) {
        List<PhysicalExecutionFlow> sinkFlows = new ArrayList<>();
        for (Flow child : flow.getNext()) {
            if (child instanceof PhysicalExecutionFlow
                    && ((PhysicalExecutionFlow) child).getAction() instanceof SinkAction) {
                sinkFlows.add((PhysicalExecutionFlow) child);
            }
        }
        flow.getNext().removeAll(sinkFlows);

        List<Flow> detachedFlows = new ArrayList<>();
        for (PhysicalExecutionFlow sinkFlow : sinkFlows) {
            IntermediateQueue queue =
                    new IntermediateQueue(
                            sinkFlow.getAction().getId(),
                            sinkFlow.getAction().getName() + "-Queue",
                            sinkFlow.getAction().getParallelism());
            // Producer side: stays attached to the original parent.
            flow.getNext().add(new IntermediateExecutionFlow(queue));
            // Consumer side: a new root on the same queue that feeds the sink.
            IntermediateExecutionFlow consumerSide = new IntermediateExecutionFlow(queue);
            consumerSide.getNext().add((Flow) sinkFlow);
            detachedFlows.add(consumerSide);
        }

        // More children than sinks means at least one non-sink child exists below this node.
        if (flow.getNext().size() > sinkFlows.size()) {
            for (Flow child : flow.getNext()) {
                detachedFlows.addAll(PhysicalPlanGenerator.splitSinkFromFlow(child));
            }
        }
        return detachedFlows;
    }

    /**
     * Tells whether the flow tree rooted at {@code flow} contains a sink action.
     *
     * <p>The search stops at the first sink found instead of evaluating every subtree.
     *
     * @param flow root of the tree to inspect; every descendant is expected to be a
     *     {@link PhysicalExecutionFlow}
     * @return {@code true} if {@code flow} or any descendant carries a {@link SinkAction}
     * @throws ClassCastException if a visited descendant is not a {@link PhysicalExecutionFlow}
     */
    private static boolean sourceWithSink(PhysicalExecutionFlow<?, ?> flow) {
        if (flow.getAction() instanceof SinkAction) {
            return true;
        }
        for (Flow next : flow.getNext()) {
            if (PhysicalPlanGenerator.sourceWithSink((PhysicalExecutionFlow<?, ?>) next)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the flows that directly follow {@code start} according to the edges.
     *
     * <p>Shuffle and sink actions end the expansion and become leaf flows; any other action is
     * expanded recursively. Leaf flows are listed first, followed by the expanded ones, each
     * group in edge order.
     *
     * @param edges edges of the pipeline
     * @param start action whose successors are wanted
     * @return a mutable list of the flows following {@code start}
     */
    private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) {
        List<Action> successors = new ArrayList<>();
        for (ExecutionEdge edge : edges) {
            if (edge.getLeftVertex().getAction().equals(start)) {
                successors.add(edge.getRightVertex().getAction());
            }
        }

        List<Flow> wrappers = new ArrayList<>();
        // First pass: terminal successors (shuffle / sink) are wrapped without expansion.
        for (Action successor : successors) {
            if (successor instanceof ShuffleAction || successor instanceof SinkAction) {
                wrappers.add(new PhysicalExecutionFlow(successor));
            }
        }
        // Second pass: everything else is wrapped together with its own downstream flows.
        for (Action successor : successors) {
            if (!(successor instanceof ShuffleAction) && !(successor instanceof SinkAction)) {
                wrappers.add(new PhysicalExecutionFlow(successor, getNextWrapper(edges, successor)));
            }
        }
        return wrappers;
    }
}

