/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.core.Member;
import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.OneToOneConcurrentArrayQueue;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorMetaSupplier;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.execution.ConcurrentInboundEdgeStream;
import com.hazelcast.jet.impl.execution.ConveyorCollector;
import com.hazelcast.jet.impl.execution.ConveyorCollectorWithPartition;
import com.hazelcast.jet.impl.execution.ConveyorEmitter;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.InboundEmitter;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.ProcessorTasklet;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.execution.init.EdgeDef;
import com.hazelcast.jet.impl.execution.init.JetImplDataSerializerHook;
import com.hazelcast.jet.impl.execution.init.PartitionArrangement;
import com.hazelcast.jet.impl.execution.init.VertexDef;
import com.hazelcast.jet.impl.util.DoneItem;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.partition.IPartitionService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Per-member plan of a job execution: the list of {@link VertexDef}s that
 * the member has to run, plus the runtime structures (tasklets, conveyors,
 * sender/receiver lookup maps) that are built from them by
 * {@link #initialize(NodeEngine, long)}.
 * <p>
 * Only the vertex definitions are serialized (see {@link #writeData} and
 * {@link #readData}); every other field is rebuilt locally in
 * {@code initialize()}.
 */
public class ExecutionPlan implements IdentifiedDataSerializable {
    private static final ILogger LOGGER = Logger.getLogger(ExecutionPlan.class);

    /** Name under which the Jet service is looked up on the node engine. */
    private static final String JET_SERVICE_NAME = "hz:impl:jetService";

    /** All tasklets created by {@code initialize()}: processors, senders and receivers. */
    private final List<Tasklet> tasklets = new ArrayList<>();

    /** dest vertex id --> dest ordinal --> receiver tasklet. */
    private final Map<Integer, Map<Integer, ReceiverTasklet>> receiverMap = new HashMap<>();

    /** dest vertex id --> dest ordinal --> dest address --> sender tasklet. */
    private final Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> senderMap = new HashMap<>();

    /** The serialized part of the plan. Not final because {@code readData()} replaces it. */
    private List<VertexDef> vertices = new ArrayList<>();

    /** edge id --> one conveyor per processor of the edge's destination vertex. */
    private final Map<String, ConcurrentConveyor<Object>[]> localConveyorMap = new HashMap<>();

    /** edge id --> remote member address --> conveyor drained by that member's sender tasklet. */
    private final Map<String, Map<Address, ConcurrentConveyor<Object>>> edgeSenderConveyorMap = new HashMap<>();

    private PartitionArrangement ptionArrgmt;
    private NodeEngine nodeEngine;
    private long executionId;

    /** Package-private no-arg constructor; instances are filled either by the factory method or by {@code readData()}. */
    ExecutionPlan() {
    }

    /**
     * Creates one execution plan per current cluster member for the given DAG.
     * <p>
     * For every vertex the meta-supplier is initialized, asked for a
     * per-address {@link ProcessorSupplier} function, and the resulting
     * supplier is stored in a {@link VertexDef} added to the plan of each member.
     *
     * @param nodeEngine         node engine used to obtain the member list and the Jet instance
     * @param dag                the DAG to plan
     * @param defaultParallelism local parallelism used for vertices whose own
     *                           local parallelism is {@code -1}
     * @return a map from member to the plan that member should execute
     */
    public static Map<Member, ExecutionPlan> createExecutionPlans(NodeEngine nodeEngine, DAG dag, int defaultParallelism) {
        JetInstance instance = getJetInstance(nodeEngine);
        // Typed list: with a raw list the collected map below would not be assignable to Map<Member, ExecutionPlan>.
        List<Member> members = new ArrayList<>(nodeEngine.getClusterService().getMembers());
        int clusterSize = members.size();
        boolean isJobDistributed = clusterSize > 1;
        EdgeConfig defaultEdgeConfig = instance.getConfig().getDefaultEdgeConfig();
        Map<Member, ExecutionPlan> plans = members.stream()
                .collect(Collectors.toMap(Function.identity(), m -> new ExecutionPlan()));
        Map<String, Integer> vertexIdMap = assignVertexIds(dag);

        for (Map.Entry<String, Integer> entry : vertexIdMap.entrySet()) {
            Vertex vertex = dag.getVertex(entry.getKey());
            int vertexId = entry.getValue();
            // -1 means "not set on the vertex": fall back to the supplied default
            int localParallelism = vertex.getLocalParallelism() != -1
                    ? vertex.getLocalParallelism()
                    : defaultParallelism;
            int totalParallelism = localParallelism * clusterSize;

            // For an inbound edge the "opposite" vertex is its source, for an outbound edge its destination.
            List<EdgeDef> inbound = toEdgeDefs(dag.getInboundEdges(vertex.getName()), defaultEdgeConfig,
                    edge -> vertexIdMap.get(edge.getSourceName()), isJobDistributed);
            List<EdgeDef> outbound = toEdgeDefs(dag.getOutboundEdges(vertex.getName()), defaultEdgeConfig,
                    edge -> vertexIdMap.get(edge.getDestName()), isJobDistributed);

            ProcessorMetaSupplier metaSupplier = vertex.getSupplier();
            metaSupplier.init(new Contexts.MetaSupplierCtx(instance, totalParallelism, localParallelism));
            List<Address> addresses = plans.keySet().stream()
                    .map(Member::getAddress)
                    .collect(Collectors.toList());
            Function<Address, ProcessorSupplier> procSupplierFn = metaSupplier.get(addresses);

            for (Map.Entry<Member, ExecutionPlan> planEntry : plans.entrySet()) {
                ProcessorSupplier processorSupplier = procSupplierFn.apply(planEntry.getKey().getAddress());
                VertexDef vertexDef = new VertexDef(vertexId, vertex.getName(), processorSupplier, localParallelism);
                vertexDef.addInboundEdges(inbound);
                vertexDef.addOutboundEdges(outbound);
                planEntry.getValue().vertices.add(vertexDef);
            }
        }
        return plans;
    }

    /** Looks up the Jet service on the given node engine and returns its {@link JetInstance}. */
    private static JetInstance getJetInstance(NodeEngine nodeEngine) {
        return ((JetService) nodeEngine.getService(JET_SERVICE_NAME)).getJetInstance();
    }

    /**
     * Builds all runtime structures of this plan on the local member.
     * <p>
     * Initializes the processor suppliers and the transient fields of the
     * edges, then creates one {@link ProcessorTasklet} per processor of every
     * vertex together with its inbound and outbound edge streams. Sender
     * tasklets are registered as a side effect of creating outbound streams
     * of distributed edges; receiver tasklets are appended at the very end.
     *
     * @param nodeEngine  the local node engine
     * @param executionId id of the execution, passed on to the sender tasklets
     */
    public void initialize(NodeEngine nodeEngine, long executionId) {
        this.nodeEngine = nodeEngine;
        this.executionId = executionId;
        initProcSuppliers();
        initDag();
        this.ptionArrgmt = new PartitionArrangement(nodeEngine);
        JetInstance instance = getJetInstance(nodeEngine);

        for (VertexDef srcVertex : vertices) {
            int processorIdx = 0;
            for (Processor processor : createProcessors(srcVertex, srcVertex.parallelism())) {
                // Outbound streams first: this is what populates localConveyorMap for the vertex's outbound edges.
                List<OutboundEdgeStream> outboundStreams = createOutboundEdgeStreams(srcVertex, processorIdx);
                List<InboundEdgeStream> inboundStreams = createInboundEdgeStreams(srcVertex, processorIdx);
                ILogger logger = nodeEngine.getLogger(
                        srcVertex.name() + '(' + processor.getClass().getSimpleName() + ")#" + processorIdx);
                Contexts.ProcCtx context = new Contexts.ProcCtx(instance, logger, srcVertex.name(), processorIdx);
                tasklets.add(new ProcessorTasklet(srcVertex.name(), context, processor, inboundStreams, outboundStreams));
                processorIdx++;
            }
        }

        // Receiver tasklets were only registered in receiverMap so far; add them to the tasklet list too.
        for (Map<Integer, ReceiverTasklet> receiversByOrdinal : receiverMap.values()) {
            tasklets.addAll(receiversByOrdinal.values());
        }
    }

    /** Returns the processor suppliers of all vertices in this plan, in vertex order. */
    public List<ProcessorSupplier> getProcessorSuppliers() {
        return vertices.stream().map(VertexDef::processorSupplier).collect(Collectors.toList());
    }

    /** Returns the live (not copied) map: dest vertex id --> dest ordinal --> receiver tasklet. */
    public Map<Integer, Map<Integer, ReceiverTasklet>> getReceiverMap() {
        return receiverMap;
    }

    /** Returns the live (not copied) map: dest vertex id --> dest ordinal --> dest address --> sender tasklet. */
    public Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> getSenderMap() {
        return senderMap;
    }

    /** Returns the live (not copied) list of tasklets created by {@link #initialize(NodeEngine, long)}. */
    public List<Tasklet> getTasklets() {
        return tasklets;
    }

    @Override
    public int getFactoryId() {
        return JetImplDataSerializerHook.FACTORY_ID;
    }

    @Override
    public int getId() {
        // NOTE(review): the literal 0 presumably mirrors a type-id constant that the compiler inlined — confirm.
        return 0;
    }

    /** Serializes the vertex definitions only; everything else is rebuilt by {@code initialize()}. */
    @Override
    public void writeData(ObjectDataOutput out) throws IOException {
        Util.writeList(out, vertices);
    }

    /** Restores the vertex definitions written by {@link #writeData(ObjectDataOutput)}. */
    @Override
    public void readData(ObjectDataInput in) throws IOException {
        vertices = Util.readList(in);
    }

    /**
     * Assigns consecutive integer ids, starting at 0, to the DAG's vertices in
     * the order in which {@code dag.forEach} visits them.
     *
     * @return an insertion-ordered map from vertex name to vertex id
     */
    private static Map<String, Integer> assignVertexIds(DAG dag) {
        Map<String, Integer> vertexIdMap = new LinkedHashMap<>();
        // single-element array: a mutable counter that the lambda is allowed to capture
        int[] nextId = {0};
        dag.forEach(v -> vertexIdMap.put(v.getName(), nextId[0]++));
        return vertexIdMap;
    }

    /**
     * Converts DAG edges to {@link EdgeDef}s.
     *
     * @param edges             the edges to convert
     * @param defaultEdgeConfig configuration used for edges whose own config is {@code null}
     * @param oppositeVtxId     resolves the id of the vertex on the other end of the edge
     * @param isJobDistributed  passed through to every created {@code EdgeDef}
     */
    private static List<EdgeDef> toEdgeDefs(List<Edge> edges, EdgeConfig defaultEdgeConfig,
                                            Function<Edge, Integer> oppositeVtxId, boolean isJobDistributed) {
        List<EdgeDef> edgeDefs = new ArrayList<>(edges.size());
        for (Edge edge : edges) {
            EdgeConfig config = edge.getConfig() == null ? defaultEdgeConfig : edge.getConfig();
            edgeDefs.add(new EdgeDef(edge, config, oppositeVtxId.apply(edge), isJobDistributed));
        }
        return edgeDefs;
    }

    /** Initializes the processor supplier of every vertex with a context carrying the vertex's parallelism. */
    private void initProcSuppliers() {
        JetInstance instance = getJetInstance(nodeEngine);
        for (VertexDef vertex : vertices) {
            vertex.processorSupplier().init(new Contexts.ProcSupplierCtx(instance, vertex.parallelism()));
        }
    }

    /**
     * Initializes the transient fields of all edges and then initializes the
     * partitioner of every outbound edge that has one.
     */
    private void initDag() {
        Map<Integer, VertexDef> vMap = vertices.stream()
                .collect(Collectors.toMap(VertexDef::vertexId, Function.identity()));
        for (VertexDef vertex : vertices) {
            // the boolean tells the edge whether 'vertex' is being registered through its outbound (true)
            // or inbound (false) edge list
            for (EdgeDef inEdge : vertex.inboundEdges()) {
                inEdge.initTransientFields(vMap, vertex, false);
            }
            for (EdgeDef outEdge : vertex.outboundEdges()) {
                outEdge.initTransientFields(vMap, vertex, true);
            }
        }

        IPartitionService partitionService = nodeEngine.getPartitionService();
        vertices.stream()
                .map(VertexDef::outboundEdges)
                .flatMap(Collection::stream)
                .map(EdgeDef::partitioner)
                .filter(Objects::nonNull)
                .forEach(partitioner -> partitioner.init(partitionService::getPartitionId));
    }

    /**
     * Asks the vertex's processor supplier for {@code parallelism} processors.
     *
     * @throws JetException if the supplier returns a different number of processors
     */
    private static Collection<? extends Processor> createProcessors(VertexDef vertexDef, int parallelism) {
        Collection<? extends Processor> processors = vertexDef.processorSupplier().get(parallelism);
        if (processors.size() == parallelism) {
            return processors;
        }
        throw new JetException("ProcessorSupplier failed to return the requested number of processors. Requested: "
                + parallelism + ", returned: " + processors.size());
    }

    /**
     * Creates the outbound edge streams of one processor of {@code srcVertex},
     * one per outbound edge. The local conveyors of each edge (and, for
     * distributed edges, the sender conveyors and tasklets) are created on
     * first use and shared by all processors of the vertex.
     */
    private List<OutboundEdgeStream> createOutboundEdgeStreams(VertexDef srcVertex, int processorIdx) {
        List<OutboundEdgeStream> outboundStreams = new ArrayList<>();
        for (EdgeDef edge : srcVertex.outboundEdges()) {
            // One conveyor per destination processor; each conveyor has one queue per source processor,
            // plus one extra queue for the receiver tasklet when the edge is distributed.
            localConveyorMap.computeIfAbsent(edge.edgeId(), edgeId -> createConveyorArray(
                    edge.destVertex().parallelism(),
                    srcVertex.parallelism() + (edge.isDistributed() ? 1 : 0),
                    edge.getConfig().getQueueSize()));
            Map<Address, ConcurrentConveyor<Object>> senderConveyors = edge.isDistributed()
                    ? memberToSenderConveyorMap(edgeSenderConveyorMap, edge)
                    : null;
            outboundStreams.add(createOutboundEdgeStream(edge, processorIdx, senderConveyors));
        }
        return outboundStreams;
    }

    /**
     * Returns, creating it on first call for the edge, the map from remote
     * member address to the conveyor feeding that member's sender tasklet.
     * Creating the map also creates the {@link SenderTasklet}s, registers them
     * in {@code senderMap} and adds them to {@code tasklets}.
     *
     * @param edgeSenderConveyorMap cache keyed by edge id
     * @param edge                  a distributed edge
     */
    private Map<Address, ConcurrentConveyor<Object>> memberToSenderConveyorMap(
            Map<String, Map<Address, ConcurrentConveyor<Object>>> edgeSenderConveyorMap, EdgeDef edge) {
        assert edge.isDistributed() : "Edge is not distributed";
        return edgeSenderConveyorMap.computeIfAbsent(edge.edgeId(), edgeId -> {
            Map<Address, ConcurrentConveyor<Object>> addrToConveyor = new HashMap<>();
            for (Address destAddr : Util.getRemoteMembers(nodeEngine)) {
                // a single conveyor per remote member, with one queue per local source processor
                ConcurrentConveyor<Object> conveyor = createConveyorArray(
                        1, edge.sourceVertex().parallelism(), edge.getConfig().getQueueSize())[0];
                ConcurrentInboundEdgeStream inboundEdgeStream =
                        createInboundEdgeStream(edge.destOrdinal(), edge.priority(), conveyor);
                int destVertexId = edge.destVertex().vertexId();
                SenderTasklet sender = new SenderTasklet(inboundEdgeStream, nodeEngine, destAddr, executionId,
                        destVertexId, edge.getConfig().getPacketSizeLimit());
                senderMap.computeIfAbsent(destVertexId, vertexId -> new HashMap<>())
                         .computeIfAbsent(edge.destOrdinal(), ordinal -> new HashMap<>())
                         .put(destAddr, sender);
                tasklets.add(sender);
                addrToConveyor.put(destAddr, conveyor);
            }
            return addrToConveyor;
        });
    }

    /**
     * Creates {@code count} conveyors, each backed by {@code queueCount}
     * one-to-one queues of capacity {@code queueSize}.
     */
    @SuppressWarnings("unchecked") // generic array creation; the arrays never leave this class with another element type
    private static ConcurrentConveyor<Object>[] createConveyorArray(int count, int queueCount, int queueSize) {
        ConcurrentConveyor<Object>[] conveyors = new ConcurrentConveyor[count];
        Arrays.setAll(conveyors, i -> {
            QueuedPipe<Object>[] queues = new QueuedPipe[queueCount];
            Arrays.setAll(queues, j -> new OneToOneConcurrentArrayQueue<Object>(queueSize));
            return ConcurrentConveyor.concurrentConveyor(DoneItem.DONE_ITEM, queues);
        });
        return conveyors;
    }

    /**
     * Creates the outbound stream of a single processor for a single edge.
     * <p>
     * A local edge gets one collector per destination processor. A
     * distributed edge gets a composite of those local collectors in slot 0,
     * followed by one collector per remote member writing into that member's
     * sender conveyor; the receiver tasklet of the edge is created if absent.
     *
     * @param edge              the outbound edge
     * @param processorIndex    index of the source processor; selects the queue it writes to in each conveyor
     * @param senderConveyorMap remote address --> sender conveyor; only read when the edge is distributed
     */
    private OutboundEdgeStream createOutboundEdgeStream(EdgeDef edge, int processorIndex,
                                                        Map<Address, ConcurrentConveyor<Object>> senderConveyorMap) {
        int totalPtionCount = nodeEngine.getPartitionService().getPartitionCount();
        int processorCount = edge.destVertex().parallelism();
        int[][] ptionsPerProcessor = ptionArrgmt.assignPartitionsToProcessors(processorCount, edge.isDistributed());
        ConcurrentConveyor<Object>[] localConveyors = localConveyorMap.get(edge.edgeId());

        OutboundCollector[] localCollectors = new OutboundCollector[processorCount];
        Arrays.setAll(localCollectors,
                n -> new ConveyorCollector(localConveyors[n], processorIndex, ptionsPerProcessor[n]));

        OutboundCollector[] allCollectors;
        if (edge.isDistributed()) {
            createIfAbsentReceiverTasklet(edge, ptionsPerProcessor, totalPtionCount);
            Map<Address, int[]> memberToPartitions = ptionArrgmt.remotePartitionAssignment.get();
            // slot 0 is reserved for the composite of all local collectors
            allCollectors = new OutboundCollector[memberToPartitions.size() + 1];
            allCollectors[0] = OutboundCollector.compositeCollector(localCollectors, edge, totalPtionCount);
            int index = 1;
            for (Map.Entry<Address, int[]> entry : memberToPartitions.entrySet()) {
                allCollectors[index++] = new ConveyorCollectorWithPartition(
                        senderConveyorMap.get(entry.getKey()), processorIndex, entry.getValue());
            }
        } else {
            allCollectors = localCollectors;
        }

        int highWaterMark = edge.isBuffered() ? Integer.MAX_VALUE : edge.getConfig().getHighWaterMark();
        return new OutboundEdgeStream(edge.sourceOrdinal(), highWaterMark,
                OutboundCollector.compositeCollector(allCollectors, edge, totalPtionCount));
    }

    /**
     * Registers a {@link ReceiverTasklet} for the edge's destination vertex
     * and ordinal unless one is already registered. The receiver writes into
     * the last queue of each local conveyor of the edge.
     */
    private void createIfAbsentReceiverTasklet(EdgeDef edge, int[][] ptionsPerProcessor, int totalPtionCount) {
        ConcurrentConveyor<Object>[] localConveyors = localConveyorMap.get(edge.edgeId());
        receiverMap.computeIfAbsent(edge.destVertex().vertexId(), vertexId -> new HashMap<>())
                   .computeIfAbsent(edge.destOrdinal(), ordinal -> {
                       OutboundCollector[] collectors = new OutboundCollector[ptionsPerProcessor.length];
                       // the last queue index is the extra queue added for distributed edges
                       Arrays.setAll(collectors, n -> new ConveyorCollector(
                               localConveyors[n], localConveyors[n].queueCount() - 1, ptionsPerProcessor[n]));
                       OutboundCollector collector =
                               OutboundCollector.compositeCollector(collectors, edge, totalPtionCount);
                       // every member except the local one sends to this receiver
                       int senderCount = nodeEngine.getClusterService().getSize() - 1;
                       return new ReceiverTasklet(collector, edge.getConfig().getReceiveWindowMultiplier(),
                               getConfig().getInstanceConfig().getFlowControlPeriodMs(), senderCount);
                   });
    }

    /** Returns the configuration of the local Jet instance. */
    private JetConfig getConfig() {
        return getJetInstance(nodeEngine).getConfig();
    }

    /**
     * Creates the inbound edge streams of one processor of the given vertex,
     * one per inbound edge.
     * <p>
     * The conveyors are looked up in {@code localConveyorMap}, which is only
     * populated while creating outbound streams; the edge's source vertex must
     * therefore have been processed before this vertex.
     */
    private List<InboundEdgeStream> createInboundEdgeStreams(VertexDef srcVertex, int processorIdx) {
        List<InboundEdgeStream> inboundStreams = new ArrayList<>();
        for (EdgeDef inEdge : srcVertex.inboundEdges()) {
            // each processor reads from its own conveyor of the edge
            ConcurrentConveyor<Object> conveyor = localConveyorMap.get(inEdge.edgeId())[processorIdx];
            inboundStreams.add(createInboundEdgeStream(inEdge.destOrdinal(), inEdge.priority(), conveyor));
        }
        return inboundStreams;
    }

    /** Creates an inbound edge stream with one emitter per queue of the given conveyor. */
    private static ConcurrentInboundEdgeStream createInboundEdgeStream(int ordinal, int priority,
                                                                       ConcurrentConveyor<Object> conveyor) {
        InboundEmitter[] emitters = new InboundEmitter[conveyor.queueCount()];
        Arrays.setAll(emitters, queueIndex -> new ConveyorEmitter(conveyor, queueIndex));
        return new ConcurrentInboundEdgeStream(emitters, ordinal, priority);
    }
}

