/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.OneToOneConcurrentArrayQueue;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.execution.ConcurrentInboundEdgeStream;
import com.hazelcast.jet.impl.execution.ConveyorCollector;
import com.hazelcast.jet.impl.execution.ConveyorCollectorWithPartition;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.ProcessorTasklet;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.StoreSnapshotTasklet;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.execution.init.EdgeDef;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.execution.init.PartitionArrangement;
import com.hazelcast.jet.impl.execution.init.VertexDef;
import com.hazelcast.jet.impl.util.AsyncSnapshotWriterImpl;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.jet.impl.util.ObjectWithPartitionId;
import com.hazelcast.jet.impl.util.PrefixedLogger;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ExecutionPlan
implements IdentifiedDataSerializable {
    // Capacity for the per-processor snapshot queues; initialize() sizes those queues at this same value (1024).
    private static final int SNAPSHOT_QUEUE_SIZE = 1024;

    // ---- State written by writeData() and restored by readData() ----

    // Member addresses handed to PartitionArrangement; presumably indexed by partition id — TODO confirm.
    private Address[] partitionOwners;
    private JobConfig jobConfig;
    private List<VertexDef> vertices = new ArrayList<VertexDef>();
    // memberIndex * localParallelism + localProcessorIdx yields a processor's global index (see initialize()).
    private int memberIndex;
    private int memberCount;
    private long lastSnapshotId;

    // ---- Transient state, populated by initialize() and the helpers it calls ----

    // Every tasklet created for this plan: snapshot-store, processor, sender and receiver tasklets.
    private final transient List<Tasklet> tasklets = new ArrayList<Tasklet>();
    // Connection to each remote member, keyed by that member's address.
    private final transient Map<Address, Connection> memberConnections = new HashMap<Address, Connection>();
    // destination vertex id -> destination ordinal -> remote member address -> receiver tasklet
    private final transient Map<Integer, Map<Integer, Map<Address, ReceiverTasklet>>> receiverMap = new HashMap<Integer, Map<Integer, Map<Address, ReceiverTasklet>>>();
    // destination vertex id -> destination ordinal -> destination member address -> sender tasklet
    private final transient Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> senderMap = new HashMap<Integer, Map<Integer, Map<Address, SenderTasklet>>>();
    // edge id -> conveyors for that edge, one per local processor of the edge's destination vertex
    private final transient Map<String, ConcurrentConveyor<Object>[]> localConveyorMap = new HashMap<String, ConcurrentConveyor<Object>[]>();
    // edge id -> remote member address -> conveyor drained by the SenderTasklet for that member
    private final transient Map<String, Map<Address, ConcurrentConveyor<Object>>> edgeSenderConveyorMap = new HashMap<String, Map<Address, ConcurrentConveyor<Object>>>();
    // All processor instances created by initialize(), in creation order.
    private final transient List<Processor> processors = new ArrayList<Processor>();
    private transient PartitionArrangement ptionArrgmt;
    private transient NodeEngineImpl nodeEngine;
    private transient long executionId;
    // Distinct partition-owner addresses other than this member's own address, wrapped in Util.memoize.
    // The lambda reads partitionOwners and nodeEngine, so get() must not be called before both are set.
    private final transient Supplier<Set<Address>> remoteMembers = Util.memoize(() -> Arrays.stream(this.partitionOwners).filter(a -> !a.equals(this.nodeEngine.getThisAddress())).collect(Collectors.toSet()));

    /**
     * Creates an empty plan: no partition owners, no job config and an empty vertex list.
     * Presumably used by the serialization framework, which then fills the instance
     * through {@link #readData} — confirm against the serializer hook.
     */
    ExecutionPlan() {
    }

    /**
     * Creates a plan for one member of the job.
     *
     * @param partitionOwners member addresses used to build the partition arrangement
     * @param jobConfig       configuration of the job this plan belongs to
     * @param lastSnapshotId  snapshot id exposed through {@link #lastSnapshotId()}
     * @param memberIndex     index of the target member among the participating members
     * @param memberCount     total number of participating members
     */
    ExecutionPlan(Address[] partitionOwners, JobConfig jobConfig, long lastSnapshotId, int memberIndex, int memberCount) {
        this.memberCount = memberCount;
        this.memberIndex = memberIndex;
        this.lastSnapshotId = lastSnapshotId;
        this.jobConfig = jobConfig;
        this.partitionOwners = partitionOwners;
    }

    /**
     * Builds every tasklet this member needs to run the job and wires them together.
     *
     * <p>Steps, in order: initialize the processor suppliers, initialize the transient
     * fields of the DAG edges, compute the partition arrangement, open connections to
     * the remote members, then for each vertex create its processors, one
     * {@link StoreSnapshotTasklet} and one {@link ProcessorTasklet} per processor.
     * Receiver tasklets registered while wiring the edges are added to the tasklet
     * list last.
     *
     * @param nodeEngine              engine of the local member; must be a {@link NodeEngineImpl}
     * @param jobId                   id of the job, used in log prefixes and processor contexts
     * @param executionId             id of this execution
     * @param snapshotContext         snapshot context shared by all created tasklets
     * @param tempDirectories         temp directories passed through to the processor contexts
     * @param jobSerializationService serialization service used by the job's tasklets
     */
    public void initialize(NodeEngine nodeEngine, long jobId, long executionId, SnapshotContext snapshotContext,
                           ConcurrentHashMap<String, File> tempDirectories,
                           InternalSerializationService jobSerializationService) {
        this.nodeEngine = (NodeEngineImpl) nodeEngine;
        this.executionId = executionId;
        initProcSuppliers(jobId, tempDirectories, jobSerializationService);
        initDag(jobSerializationService);
        this.ptionArrgmt = new PartitionArrangement(this.partitionOwners, nodeEngine.getThisAddress());
        JetInstance instance = Util.getJetInstance(nodeEngine);
        Set<Integer> higherPriorityVertices = VertexDef.getHigherPriorityVertices(this.vertices);
        for (Address destAddr : this.remoteMembers.get()) {
            this.memberConnections.put(destAddr, ImdgUtil.getMemberConnection(nodeEngine, destAddr));
        }
        for (VertexDef vertex : this.vertices) {
            // Named so that it does not shadow the `processors` field.
            Collection<? extends Processor> vertexProcessors = createProcessors(vertex, vertex.localParallelism());

            // One snapshot queue per local processor, all drained by a single StoreSnapshotTasklet.
            @SuppressWarnings("unchecked")
            QueuedPipe<Object>[] snapshotQueues = new QueuedPipe[vertex.localParallelism()];
            Arrays.setAll(snapshotQueues, i -> new OneToOneConcurrentArrayQueue<>(SNAPSHOT_QUEUE_SIZE));
            ConcurrentConveyor<Object> ssConveyor = ConcurrentConveyor.concurrentConveyor(null, snapshotQueues);
            String jobPrefix = PrefixedLogger.prefix(this.jobConfig.getName(), jobId, vertex.name());
            ILogger storeSnapshotLogger = PrefixedLogger.prefixedLogger(
                    nodeEngine.getLogger(StoreSnapshotTasklet.class), jobPrefix);
            StoreSnapshotTasklet ssTasklet = new StoreSnapshotTasklet(
                    snapshotContext,
                    ConcurrentInboundEdgeStream.create(ssConveyor, 0, 0, true, jobPrefix + "/ssFrom", null),
                    new AsyncSnapshotWriterImpl(nodeEngine, snapshotContext, vertex.name(), this.memberIndex,
                            this.memberCount, jobSerializationService),
                    storeSnapshotLogger,
                    vertex.name(),
                    higherPriorityVertices.contains(vertex.vertexId()));
            this.tasklets.add(ssTasklet);

            int localProcessorIdx = 0;
            for (Processor processor : vertexProcessors) {
                int globalProcessorIndex = this.memberIndex * vertex.localParallelism() + localProcessorIdx;
                String processorPrefix = PrefixedLogger.prefix(
                        this.jobConfig.getName(), jobId, vertex.name(), globalProcessorIndex);
                ILogger logger = PrefixedLogger.prefixedLogger(
                        nodeEngine.getLogger(processor.getClass()), processorPrefix);
                Contexts.ProcCtx context = new Contexts.ProcCtx(instance, jobId, executionId, this.getJobConfig(),
                        logger, vertex.name(), localProcessorIdx, globalProcessorIndex,
                        this.jobConfig.getProcessingGuarantee(), vertex.localParallelism(), this.memberIndex,
                        this.memberCount, tempDirectories, jobSerializationService);

                // Outbound wiring must run first: it creates the entries in localConveyorMap that
                // inbound wiring looks up. It also registers sender/receiver tasklets as a side effect.
                // NOTE(review): this relies on an edge's source vertex being processed before its
                // destination vertex, i.e. on the ordering of `vertices` — confirm with the plan builder.
                List<OutboundEdgeStream> outboundStreams = createOutboundEdgeStreams(
                        vertex, localProcessorIdx, jobPrefix, jobSerializationService);
                List<InboundEdgeStream> inboundStreams = createInboundEdgeStreams(
                        vertex, localProcessorIdx, jobPrefix, globalProcessorIndex);
                ConveyorCollector snapshotCollector = new ConveyorCollector(ssConveyor, localProcessorIdx, null);

                // A vertex counts as a source when all of its inbound edges (possibly none) are
                // snapshot-restore edges and it is not itself a snapshot vertex.
                boolean isSource = vertex.inboundEdges().stream().allMatch(EdgeDef::isSnapshotRestoreEdge)
                        && !vertex.isSnapshotVertex();
                ProcessorTasklet processorTasklet = new ProcessorTasklet(context,
                        nodeEngine.getExecutionService().getExecutor("jet:tasklet_initClose"),
                        jobSerializationService, processor, inboundStreams, outboundStreams, snapshotContext,
                        snapshotCollector, isSource);
                this.tasklets.add(processorTasklet);
                this.processors.add(processor);
                ++localProcessorIdx;
            }
        }
        // Receiver tasklets were registered in receiverMap during edge wiring; schedule them too.
        List<ReceiverTasklet> allReceivers = this.receiverMap.values().stream()
                .flatMap(byOrdinal -> byOrdinal.values().stream())
                .flatMap(byAddress -> byAddress.values().stream())
                .collect(Collectors.toList());
        this.tasklets.addAll(allReceivers);
    }

    /**
     * Returns the processor supplier of every vertex in this plan, in vertex order.
     */
    public List<ProcessorSupplier> getProcessorSuppliers() {
        return Util.toList(vertices, vertex -> vertex.processorSupplier());
    }

    /**
     * Returns the live receiver-tasklet registry, keyed by destination vertex id,
     * then destination ordinal, then remote member address. No copy is made.
     */
    public Map<Integer, Map<Integer, Map<Address, ReceiverTasklet>>> getReceiverMap() {
        return receiverMap;
    }

    /**
     * Returns the live sender-tasklet registry, keyed by destination vertex id,
     * then destination ordinal, then destination member address. No copy is made.
     */
    public Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> getSenderMap() {
        return senderMap;
    }

    /**
     * Returns the internal list of all tasklets created for this plan. No copy is made.
     */
    public List<Tasklet> getTasklets() {
        return tasklets;
    }

    /**
     * Returns the configuration of the job this plan belongs to.
     */
    public JobConfig getJobConfig() {
        return jobConfig;
    }

    /**
     * Appends a vertex definition to the end of this plan's vertex list.
     *
     * @param vertex the vertex definition to add
     */
    void addVertex(VertexDef vertex) {
        vertices.add(vertex);
    }

    /**
     * Returns the serialization factory id, taken from {@link JetInitDataSerializerHook#FACTORY_ID}.
     */
    @Override
    public int getFactoryId() {
        return JetInitDataSerializerHook.FACTORY_ID;
    }

    /**
     * Returns this class's serialization class id, which is 0.
     * NOTE(review): the literal is probably an inlined constant from the serializer hook — confirm.
     */
    @Override
    public int getClassId() {
        return 0;
    }

    /**
     * Serializes the non-transient state. Wire order: vertex list, partition-owner count,
     * last snapshot id, each partition owner, job config, member index, member count.
     * {@link #readData} reads in the same order.
     *
     * @param out destination to write to
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void writeData(ObjectDataOutput out) throws IOException {
        ImdgUtil.writeList(out, vertices);
        final int ownerCount = partitionOwners.length;
        out.writeInt(ownerCount);
        out.writeLong(lastSnapshotId);
        for (int i = 0; i < ownerCount; i++) {
            out.writeObject(partitionOwners[i]);
        }
        out.writeObject(jobConfig);
        out.writeInt(memberIndex);
        out.writeInt(memberCount);
    }

    /**
     * Restores the non-transient state in the order produced by {@link #writeData}:
     * vertex list, partition-owner count, last snapshot id, each partition owner,
     * job config, member index, member count.
     *
     * @param in source to read from
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readData(ObjectDataInput in) throws IOException {
        vertices = ImdgUtil.readList(in);
        final int ownerCount = in.readInt();
        partitionOwners = new Address[ownerCount];
        // The snapshot id sits between the owner count and the owner entries on the wire.
        lastSnapshotId = in.readLong();
        int next = 0;
        while (next < ownerCount) {
            partitionOwners[next] = (Address) in.readObject();
            next++;
        }
        jobConfig = (JobConfig) in.readObject();
        memberIndex = in.readInt();
        memberCount = in.readInt();
    }

    /**
     * Calls {@code init} on the processor supplier of every vertex, giving each a
     * context with its own prefixed logger. Any exception thrown while building the
     * context or initializing the supplier is rethrown through
     * {@code ExceptionUtil.sneakyThrow}.
     *
     * @param jobId                   id of the job
     * @param tempDirectories         temp directories passed to the supplier context
     * @param jobSerializationService serialization service passed to the supplier context
     */
    private void initProcSuppliers(long jobId, ConcurrentHashMap<String, File> tempDirectories,
                                   InternalSerializationService jobSerializationService) {
        JetService jetService = (JetService) nodeEngine.getService("hz:impl:jetService");
        for (VertexDef vertexDef : vertices) {
            ProcessorSupplier procSupplier = vertexDef.processorSupplier();
            String logPrefix = PrefixedLogger.prefix(jobConfig.getName(), jobId, vertexDef.name(), "#PS");
            ILogger supplierLogger = PrefixedLogger.prefixedLogger(
                    nodeEngine.getLogger(procSupplier.getClass()), logPrefix);
            try {
                Contexts.ProcSupplierCtx supplierCtx = new Contexts.ProcSupplierCtx(
                        jetService.getJetInstance(),
                        jobId,
                        executionId,
                        jobConfig,
                        supplierLogger,
                        vertexDef.name(),
                        vertexDef.localParallelism(),
                        // total parallelism across all members
                        vertexDef.localParallelism() * memberCount,
                        memberIndex,
                        memberCount,
                        jobConfig.getProcessingGuarantee(),
                        tempDirectories,
                        jobSerializationService);
                procSupplier.init(supplierCtx);
            } catch (Exception failure) {
                throw ExceptionUtil.sneakyThrow(failure);
            }
        }
    }

    /**
     * Initializes the transient fields of every edge and the partitioner of every
     * outbound edge that has one.
     *
     * @param jobSerializationService used to turn an object into {@code Data} when a
     *                                partitioner asks for its partition id
     */
    private void initDag(InternalSerializationService jobSerializationService) {
        Map<Integer, VertexDef> verticesById = vertices.stream()
                .collect(Collectors.toMap(VertexDef::vertexId, v -> v));
        for (VertexDef vertex : vertices) {
            // The boolean tells the edge whether `vertex` is its source (true) or its destination (false).
            for (EdgeDef inbound : vertex.inboundEdges()) {
                inbound.initTransientFields(verticesById, vertex, false);
            }
            for (EdgeDef outbound : vertex.outboundEdges()) {
                outbound.initTransientFields(verticesById, vertex, true);
            }
        }
        InternalPartitionService partitionService = nodeEngine.getPartitionService();
        vertices.stream()
                .flatMap(vertex -> vertex.outboundEdges().stream())
                .map(edge -> edge.partitioner())
                .filter(partitioner -> partitioner != null)
                .forEach(partitioner -> partitioner.init(
                        object -> partitionService.getPartitionId((Data) jobSerializationService.toData(object))));
    }

    /**
     * Asks the vertex's processor supplier for {@code parallelism} processors and
     * checks that exactly that many were returned.
     *
     * @param vertexDef   vertex whose supplier is used
     * @param parallelism number of processors requested
     * @return the processors returned by the supplier
     * @throws JetException if the supplier returned a different number of processors
     */
    private static Collection<? extends Processor> createProcessors(VertexDef vertexDef, int parallelism) {
        Collection<? extends Processor> created = vertexDef.processorSupplier().get(parallelism);
        if (created.size() == parallelism) {
            return created;
        }
        throw new JetException("ProcessorSupplier failed to return the requested number of processors. Requested: " + parallelism + ", returned: " + created.size());
    }

    /**
     * Creates one outbound edge stream per outbound edge of the vertex, for the given
     * local processor. Each stream pairs the edge's source ordinal with the collector
     * built by {@link #createOutboundCollector}.
     *
     * @param vertex                  vertex whose outbound edges are wired
     * @param processorIdx            local index of the processor being wired
     * @param jobPrefix               log/debug prefix for the vertex
     * @param jobSerializationService serialization service of the job
     * @return the outbound streams, in the order of the vertex's outbound edges
     */
    private List<OutboundEdgeStream> createOutboundEdgeStreams(VertexDef vertex, int processorIdx, String jobPrefix,
                                                               InternalSerializationService jobSerializationService) {
        List<OutboundEdgeStream> streams = new ArrayList<>();
        for (EdgeDef outEdge : vertex.outboundEdges()) {
            OutboundCollector collector =
                    createOutboundCollector(outEdge, processorIdx, jobPrefix, jobSerializationService);
            streams.add(new OutboundEdgeStream(outEdge.sourceOrdinal(), collector));
        }
        return streams;
    }

    /**
     * Builds the collector one processor uses to emit into the given edge. For a local
     * edge this is the local collector alone; for a distributed edge it is a composite
     * of the local collector (slot 0) followed by the remote collectors.
     *
     * @param edge                    the outbound edge
     * @param processorIndex          local index of the emitting processor
     * @param jobPrefix               log/debug prefix for the vertex
     * @param jobSerializationService serialization service of the job
     * @return the collector for this edge and processor
     * @throws IllegalArgumentException if the edge is isolated but not local
     */
    private OutboundCollector createOutboundCollector(EdgeDef edge, int processorIndex, String jobPrefix,
                                                      InternalSerializationService jobSerializationService) {
        if (edge.routingPolicy() == Edge.RoutingPolicy.ISOLATED && !edge.isLocal()) {
            throw new IllegalArgumentException("Isolated edges must be local: " + edge);
        }
        int partitionCount = nodeEngine.getPartitionService().getPartitionCount();
        int[][] partitionsPerProcessor = getLocalPartitionDistribution(edge, edge.destVertex().localParallelism());
        // The local collector is always built first: it creates the edge's local conveyors,
        // which the receiver tasklets created during remote wiring look up.
        OutboundCollector local = createLocalOutboundCollector(
                edge, processorIndex, partitionCount, partitionsPerProcessor);
        if (!edge.isLocal()) {
            OutboundCollector[] remote = createRemoteOutboundCollectors(
                    edge, jobPrefix, processorIndex, partitionCount, partitionsPerProcessor, jobSerializationService);
            OutboundCollector[] all = new OutboundCollector[1 + remote.length];
            all[0] = local;
            System.arraycopy(remote, 0, all, 1, remote.length);
            return OutboundCollector.compositeCollector(all, edge, partitionCount, false);
        }
        return local;
    }

    /**
     * Builds the collector through which one upstream processor emits into the local
     * processors of the edge's destination vertex. The edge's conveyors are created on
     * the first call for an edge id and reused from {@code localConveyorMap} afterwards.
     *
     * @param edge                   the outbound edge
     * @param processorIndex         local index of the emitting (upstream) processor
     * @param totalPartitionCount    partition count passed on to the composite collector
     * @param partitionsPerProcessor partitions assigned to each downstream processor, indexed
     *                               by downstream processor; not used for isolated edges
     * @return a composite of the per-downstream-processor collectors
     */
    private OutboundCollector createLocalOutboundCollector(EdgeDef edge, int processorIndex, int totalPartitionCount, int[][] partitionsPerProcessor) {
        int upstreamParallelism = edge.sourceVertex().localParallelism();
        int downstreamParallelism = edge.destVertex().localParallelism();
        int queueSize = edge.getConfig().getQueueSize();
        int numRemoteMembers = this.ptionArrgmt.getRemotePartitionAssignment().size();
        if (edge.routingPolicy() == Edge.RoutingPolicy.ISOLATED) {
            ConcurrentConveyor[] localConveyors = this.localConveyorMap.computeIfAbsent(edge.edgeId(), edgeId -> {
                // Upstream processors are spread over the downstream ones: the first `remainder`
                // conveyors get queueCount + 1 queues, the rest get max(1, queueCount) queues.
                int queueCount = upstreamParallelism / downstreamParallelism;
                int remainder = upstreamParallelism % downstreamParallelism;
                return (ConcurrentConveyor[])Stream.concat(Arrays.stream(ExecutionPlan.createConveyorArray(remainder, queueCount + 1, queueSize)), Arrays.stream(ExecutionPlan.createConveyorArray(downstreamParallelism - remainder, Math.max(1, queueCount), queueSize))).toArray(ConcurrentConveyor[]::new);
            });
            // Selects the downstream indexes i with i % upstreamParallelism == processorIndex % downstreamParallelism;
            // within each selected conveyor this processor writes to queue processorIndex / downstreamParallelism.
            OutboundCollector[] localCollectors = (OutboundCollector[])IntStream.range(0, downstreamParallelism).filter(i -> i % upstreamParallelism == processorIndex % downstreamParallelism).mapToObj(i -> new ConveyorCollector(localConveyors[i], processorIndex / downstreamParallelism, null)).toArray(OutboundCollector[]::new);
            return OutboundCollector.compositeCollector(localCollectors, edge, totalPartitionCount, true);
        }
        ConcurrentConveyor[] localConveyors = this.localConveyorMap.computeIfAbsent(edge.edgeId(), edgeId -> {
            // One queue per upstream processor, plus one per remote member when the edge is distributed;
            // the receiver tasklets write into those extra queues, counted from the end.
            int queueCount = upstreamParallelism + (!edge.isLocal() ? numRemoteMembers : 0);
            return ExecutionPlan.createConveyorArray(downstreamParallelism, queueCount, queueSize);
        });
        // This processor writes to queue `processorIndex` of every downstream processor's conveyor.
        OutboundCollector[] localCollectors = new OutboundCollector[downstreamParallelism];
        Arrays.setAll(localCollectors, n -> new ConveyorCollector(localConveyors[n], processorIndex, partitionsPerProcessor[n]));
        return OutboundCollector.compositeCollector(localCollectors, edge, totalPartitionCount, true);
    }

    /**
     * Builds one collector per remote target member of a distributed edge. As a side
     * effect it makes sure the sender conveyors/tasklets and the receiver tasklets for
     * the edge exist.
     *
     * @param edge                    the distributed outbound edge
     * @param jobPrefix               log/debug prefix for the vertex
     * @param processorIndex          local index of the emitting processor
     * @param totalPartitionCount     partition count, passed to the receiver wiring
     * @param partitionsPerProcessor  partitions per downstream processor, passed to the receiver wiring
     * @param jobSerializationService serialization service of the job
     * @return the remote collectors, in iteration order of the member-to-partitions map
     * @throws JetException if the edge targets a single member but is not partitioned, or
     *                      if that member is neither a remote partition owner nor this member
     */
    private OutboundCollector[] createRemoteOutboundCollectors(EdgeDef edge, String jobPrefix, int processorIndex,
                                                               int totalPartitionCount, int[][] partitionsPerProcessor,
                                                               InternalSerializationService jobSerializationService) {
        if (!edge.getDistributedTo().equals(Edge.DISTRIBUTE_TO_ALL)) {
            if (edge.routingPolicy() != Edge.RoutingPolicy.PARTITIONED) {
                throw new JetException("An edge distributing to a specific member must be partitioned: " + edge);
            }
            boolean targetKnown = ptionArrgmt.getRemotePartitionAssignment().containsKey(edge.getDistributedTo())
                    || edge.getDistributedTo().equals(nodeEngine.getThisAddress());
            if (!targetKnown) {
                throw new JetException("The target member of an edge is not present in the cluster or is a lite member: " + edge);
            }
        }

        Map<Address, ConcurrentConveyor<Object>> conveyorsByMember =
                memberToSenderConveyorMap(edgeSenderConveyorMap, edge, jobPrefix, jobSerializationService);
        createIfAbsentReceiverTasklet(edge, jobPrefix, partitionsPerProcessor, totalPartitionCount,
                jobSerializationService);

        Address target = edge.getDistributedTo();
        Map<Address, int[]> partitionsByMember;
        if (target.equals(Edge.DISTRIBUTE_TO_ALL)) {
            partitionsByMember = ptionArrgmt.getRemotePartitionAssignment();
        } else {
            partitionsByMember = ptionArrgmt.remotePartitionAssignmentToOne(target);
        }

        OutboundCollector[] collectors = new OutboundCollector[partitionsByMember.size()];
        int slot = 0;
        for (Map.Entry<Address, int[]> assignment : partitionsByMember.entrySet()) {
            ConcurrentConveyor<Object> memberConveyor = conveyorsByMember.get(assignment.getKey());
            collectors[slot] = new ConveyorCollectorWithPartition(memberConveyor, processorIndex, assignment.getValue());
            slot++;
        }
        return collectors;
    }

    /**
     * Returns, for a distributed edge, the conveyor feeding the sender tasklet of each
     * remote member. On the first call for an edge id it creates, per remote member, a
     * conveyor with one queue per local processor of the source vertex and a
     * {@link SenderTasklet} draining it; the tasklet is registered in {@code senderMap}
     * and {@code tasklets}. Later calls return the cached map.
     *
     * @param edgeSenderConveyorMap   cache keyed by edge id (shadows the field of the same name)
     * @param edge                    the distributed edge; must not be local
     * @param jobPrefix               prefix used in the debug name of the sender's inbound stream
     * @param jobSerializationService serialization service handed to the sender tasklets
     * @return remote member address to sender conveyor
     */
    private Map<Address, ConcurrentConveyor<Object>> memberToSenderConveyorMap(
            Map<String, Map<Address, ConcurrentConveyor<Object>>> edgeSenderConveyorMap, EdgeDef edge,
            String jobPrefix, InternalSerializationService jobSerializationService) {
        assert (!edge.isLocal()) : "Edge is not distributed";
        return edgeSenderConveyorMap.computeIfAbsent(edge.edgeId(), x -> {
            Map<Address, ConcurrentConveyor<Object>> addrToConveyor = new HashMap<>();
            for (Address destAddr : this.remoteMembers.get()) {
                ConcurrentConveyor<Object> conveyor = createConveyorArray(
                        1, edge.sourceVertex().localParallelism(), edge.getConfig().getQueueSize())[0];

                // The edge's comparator is declared with a wildcard type, so it cannot be invoked
                // with plain objects. Narrow it with an unchecked cast before adapting it.
                ComparatorEx<?> declaredComparator = edge.getOrderComparator();
                @SuppressWarnings("unchecked")
                ComparatorEx<Object> origComparator = (ComparatorEx<Object>) declaredComparator;
                // The adapter compares the items wrapped by ObjectWithPartitionId.
                ComparatorEx<ObjectWithPartitionId> adaptedComparator = origComparator == null
                        ? null
                        : (l, r) -> origComparator.compare(l.getItem(), r.getItem());

                InboundEdgeStream inboundEdgeStream = newEdgeStream(edge, conveyor,
                        jobPrefix + "/toVertex:" + edge.destVertex().name() + "-toMember:" + destAddr,
                        adaptedComparator);
                int destVertexId = edge.destVertex().vertexId();
                SenderTasklet t = new SenderTasklet(inboundEdgeStream, this.nodeEngine, destAddr,
                        this.memberConnections.get(destAddr), destVertexId, edge.getConfig().getPacketSizeLimit(),
                        this.executionId, edge.sourceVertex().name(), edge.sourceOrdinal(), jobSerializationService);
                this.senderMap.computeIfAbsent(destVertexId, xx -> new HashMap<>())
                        .computeIfAbsent(edge.destOrdinal(), xx -> new HashMap<>())
                        .put(destAddr, t);
                this.tasklets.add(t);
                addrToConveyor.put(destAddr, conveyor);
            }
            return addrToConveyor;
        });
    }

    /**
     * Creates {@code count} conveyors, each backed by {@code queueCount} freshly
     * allocated one-to-one queues of capacity {@code queueSize}.
     *
     * @param count      number of conveyors to create
     * @param queueCount number of queues in each conveyor
     * @param queueSize  capacity of each queue
     * @return the new conveyors
     */
    private static ConcurrentConveyor<Object>[] createConveyorArray(int count, int queueCount, int queueSize) {
        ConcurrentConveyor[] conveyors = new ConcurrentConveyor[count];
        for (int c = 0; c < count; c++) {
            QueuedPipe[] pipes = new QueuedPipe[queueCount];
            for (int q = 0; q < queueCount; q++) {
                pipes[q] = new OneToOneConcurrentArrayQueue(queueSize);
            }
            conveyors[c] = ConcurrentConveyor.concurrentConveyor(null, pipes);
        }
        return conveyors;
    }

    /**
     * Decides which partitions each local downstream processor of the edge handles.
     *
     * <ul>
     * <li>Non-partitioned edge: an array of {@code null} rows.</li>
     * <li>Local edge, or edge distributed to this member: the arrangement's assignment
     *     with the flag {@code false}.</li>
     * <li>Edge distributed to all members: the arrangement's assignment with the flag
     *     {@code true}.</li>
     * <li>Edge distributed to some other single member: every row is the same empty array.</li>
     * </ul>
     *
     * @param edge                  the edge being wired
     * @param downstreamParallelism local parallelism of the destination vertex
     * @return one row per downstream processor
     */
    private int[][] getLocalPartitionDistribution(EdgeDef edge, int downstreamParallelism) {
        if (edge.routingPolicy().equals(Edge.RoutingPolicy.PARTITIONED)) {
            if (edge.isLocal() || nodeEngine.getThisAddress().equals(edge.getDistributedTo())) {
                return ptionArrgmt.assignPartitionsToProcessors(downstreamParallelism, false);
            }
            if (edge.getDistributedTo().equals(Edge.DISTRIBUTE_TO_ALL)) {
                return ptionArrgmt.assignPartitionsToProcessors(downstreamParallelism, true);
            }
            int[][] noPartitions = new int[downstreamParallelism][];
            int[] empty = new int[0];
            for (int row = 0; row < noPartitions.length; row++) {
                noPartitions[row] = empty;
            }
            return noPartitions;
        }
        return new int[downstreamParallelism][];
    }

    /**
     * Ensures that receiver tasklets exist for the edge's (destination vertex id,
     * destination ordinal) pair, one per remote member. They are created on the first
     * call only and stored in {@code receiverMap}.
     *
     * <p>The edge's local conveyors must already be in {@code localConveyorMap};
     * {@code createOutboundCollector} builds the local collector before it reaches this method.
     *
     * @param edge                    the distributed edge
     * @param jobPrefix               prefix passed to each receiver tasklet
     * @param ptionsPerProcessor      partitions per local downstream processor
     * @param totalPtionCount         partition count passed to the composite collector
     * @param jobSerializationService serialization service handed to the receiver tasklets
     */
    private void createIfAbsentReceiverTasklet(EdgeDef edge, String jobPrefix, int[][] ptionsPerProcessor, int totalPtionCount, InternalSerializationService jobSerializationService) {
        ConcurrentConveyor[] localConveyors = this.localConveyorMap.get(edge.edgeId());
        this.receiverMap.computeIfAbsent(edge.destVertex().vertexId(), x -> new HashMap()).computeIfAbsent(edge.destOrdinal(), x -> {
            HashMap<Address, ReceiverTasklet> addrToTasklet = new HashMap<Address, ReceiverTasklet>();
            int offset = 0;
            for (Address addr : this.ptionArrgmt.getRemotePartitionAssignment().keySet()) {
                OutboundCollector[] collectors = new OutboundCollector[ptionsPerProcessor.length];
                // Queues are handed out from the end of each conveyor: the first remote member gets
                // index queueCount() - 1, the next queueCount() - 2, and so on.
                int queueOffset = --offset;
                Arrays.setAll(collectors, n -> new ConveyorCollector(localConveyors[n], localConveyors[n].queueCount() + queueOffset, ptionsPerProcessor[n]));
                OutboundCollector collector = OutboundCollector.compositeCollector(collectors, edge, totalPtionCount, true);
                ReceiverTasklet receiverTasklet = new ReceiverTasklet(collector, jobSerializationService, edge.getConfig().getReceiveWindowMultiplier(), this.getConfig().getInstanceConfig().getFlowControlPeriodMs(), this.nodeEngine.getLoggingService(), addr, edge.destOrdinal(), edge.destVertex().name(), this.memberConnections.get(addr), jobPrefix);
                addrToTasklet.put(addr, receiverTasklet);
            }
            return addrToTasklet;
        });
    }

    /**
     * Returns the Jet configuration of the local instance, looked up through the
     * {@code hz:impl:jetService} service of the node engine.
     */
    private JetConfig getConfig() {
        JetService jetService = (JetService) nodeEngine.getService("hz:impl:jetService");
        JetInstance localInstance = jetService.getJetInstance();
        return localInstance.getConfig();
    }

    /**
     * Creates one inbound edge stream per inbound edge of the given vertex, each reading
     * from the conveyor that {@code localConveyorMap} holds for that edge at index
     * {@code localProcessorIdx}. The conveyors must already have been created.
     *
     * @param srcVertex          the vertex whose inbound edges are wired (the receiving
     *                           vertex, despite the parameter name)
     * @param localProcessorIdx  local index of the receiving processor
     * @param jobPrefix          log/debug prefix for the vertex
     * @param globalProcessorIdx global index of the receiving processor, used in the debug name
     * @return the inbound streams, in the order of the vertex's inbound edges
     */
    private List<InboundEdgeStream> createInboundEdgeStreams(VertexDef srcVertex, int localProcessorIdx,
                                                             String jobPrefix, int globalProcessorIdx) {
        List<InboundEdgeStream> streams = new ArrayList<>();
        String debugName = jobPrefix + "#" + globalProcessorIdx;
        for (EdgeDef edge : srcVertex.inboundEdges()) {
            ConcurrentConveyor<Object>[] edgeConveyors = localConveyorMap.get(edge.edgeId());
            ConcurrentConveyor<Object> ownConveyor = edgeConveyors[localProcessorIdx];
            streams.add(newEdgeStream(edge, ownConveyor, debugName, edge.getOrderComparator()));
        }
        return streams;
    }

    /**
     * Creates an inbound edge stream over the conveyor, using the edge's destination
     * ordinal and priority. The boolean passed to the factory is {@code true} only when
     * the job's processing guarantee is {@code EXACTLY_ONCE}.
     *
     * @param inEdge     edge supplying ordinal and priority
     * @param conveyor   conveyor the stream drains
     * @param debugName  name used to identify the stream
     * @param comparator ordering comparator, may be {@code null}
     * @return the new inbound edge stream
     */
    private InboundEdgeStream newEdgeStream(EdgeDef inEdge, ConcurrentConveyor<Object> conveyor, String debugName,
                                            ComparatorEx<?> comparator) {
        int ordinal = inEdge.destOrdinal();
        int priority = inEdge.priority();
        boolean exactlyOnce = jobConfig.getProcessingGuarantee() == ProcessingGuarantee.EXACTLY_ONCE;
        return ConcurrentInboundEdgeStream.create(conveyor, ordinal, priority, exactlyOnce, debugName, comparator);
    }

    /**
     * Returns the internal list of processors created by {@code initialize}. No copy is made.
     */
    public List<Processor> getProcessors() {
        return processors;
    }

    /**
     * Returns the snapshot id this plan was created or deserialized with.
     */
    public long lastSnapshotId() {
        return lastSnapshotId;
    }

    /**
     * Counts the tasklets in this plan that are {@link StoreSnapshotTasklet} instances.
     */
    public int getStoreSnapshotTaskletCount() {
        long matches = 0;
        for (Tasklet tasklet : tasklets) {
            if (tasklet instanceof StoreSnapshotTasklet) {
                matches++;
            }
        }
        return (int) matches;
    }

    /**
     * Counts the tasklets in this plan that are {@link ProcessorTasklet} instances.
     */
    public int getProcessorTaskletCount() {
        long matches = 0;
        for (Tasklet tasklet : tasklets) {
            if (tasklet instanceof ProcessorTasklet) {
                matches++;
            }
        }
        return (int) matches;
    }

    /**
     * Returns the size of the set that {@code VertexDef.getHigherPriorityVertices}
     * computes for this plan's vertices.
     */
    public int getHigherPriorityVertexCount() {
        Set<Integer> higherPriority = VertexDef.getHigherPriorityVertices(vertices);
        return higherPriority.size();
    }

    /**
     * Returns the internal list of vertex definitions. No copy is made.
     */
    List<VertexDef> getVertices() {
        return vertices;
    }
}

