/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.core.Member;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.ProcessorMetaSupplier;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.TopologyChangedException;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.execution.init.EdgeDef;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.VertexDef;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.partition.IPartitionService;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class ExecutionPlanBuilder {
    private ExecutionPlanBuilder() {
    }

    public static Map<Member, ExecutionPlan> createExecutionPlans(NodeEngine nodeEngine, DAG dag, int defaultParallelism) {
        JetInstance instance = Util.getJetInstance(nodeEngine);
        HashSet<Member> members = new HashSet<Member>(nodeEngine.getClusterService().getSize());
        Address[] partitionOwners = new Address[nodeEngine.getPartitionService().getPartitionCount()];
        ExecutionPlanBuilder.initPartitionOwnersAndMembers(nodeEngine, members, partitionOwners);
        List<Address> addresses = members.stream().map(Member::getAddress).collect(Collectors.toList());
        int clusterSize = members.size();
        boolean isJobDistributed = clusterSize > 1;
        EdgeConfig defaultEdgeConfig = instance.getConfig().getDefaultEdgeConfig();
        Map<Member, ExecutionPlan> plans = members.stream().collect(Collectors.toMap(m -> m, m -> new ExecutionPlan(partitionOwners)));
        Map<String, Integer> vertexIdMap = ExecutionPlanBuilder.assignVertexIds(dag);
        for (Map.Entry<String, Integer> entry : vertexIdMap.entrySet()) {
            Vertex vertex = dag.getVertex(entry.getKey());
            int vertexId = entry.getValue();
            int localParallelism = vertex.getLocalParallelism() != -1 ? vertex.getLocalParallelism() : defaultParallelism;
            int totalParallelism = localParallelism * clusterSize;
            List<EdgeDef> inbound = ExecutionPlanBuilder.toEdgeDefs(dag.getInboundEdges(vertex.getName()), defaultEdgeConfig, e -> (Integer)vertexIdMap.get(e.getSourceName()), isJobDistributed);
            List<EdgeDef> outbound = ExecutionPlanBuilder.toEdgeDefs(dag.getOutboundEdges(vertex.getName()), defaultEdgeConfig, e -> (Integer)vertexIdMap.get(e.getDestName()), isJobDistributed);
            ProcessorMetaSupplier metaSupplier = vertex.getSupplier();
            metaSupplier.init(new Contexts.MetaSupplierCtx(instance, totalParallelism, localParallelism));
            Function<Address, ProcessorSupplier> procSupplierFn = metaSupplier.get(addresses);
            int procIdxOffset = 0;
            for (Map.Entry<Member, ExecutionPlan> e2 : plans.entrySet()) {
                ProcessorSupplier processorSupplier = procSupplierFn.apply(e2.getKey().getAddress());
                Util.checkSerializable(processorSupplier, "ProcessorSupplier in vertex " + vertex.getName());
                VertexDef vertexDef = new VertexDef(vertexId, vertex.getName(), processorSupplier, procIdxOffset, localParallelism);
                vertexDef.addInboundEdges(inbound);
                vertexDef.addOutboundEdges(outbound);
                e2.getValue().addVertex(vertexDef);
                procIdxOffset += localParallelism;
            }
        }
        return plans;
    }

    private static Map<String, Integer> assignVertexIds(DAG dag) {
        LinkedHashMap<String, Integer> vertexIdMap = new LinkedHashMap<String, Integer>();
        int[] vertexId = new int[]{0};
        dag.forEach(v -> {
            int n = vertexId[0];
            vertexId[0] = n + 1;
            vertexIdMap.put(v.getName(), n);
        });
        return vertexIdMap;
    }

    private static List<EdgeDef> toEdgeDefs(List<Edge> edges, EdgeConfig defaultEdgeConfig, Function<Edge, Integer> oppositeVtxId, boolean isJobDistributed) {
        return edges.stream().map(edge -> new EdgeDef((Edge)edge, edge.getConfig() == null ? defaultEdgeConfig : edge.getConfig(), (Integer)oppositeVtxId.apply((Edge)edge), isJobDistributed)).collect(Collectors.toList());
    }

    private static void initPartitionOwnersAndMembers(NodeEngine nodeEngine, Collection<Member> members, Address[] partitionOwners) {
        ClusterService clusterService = nodeEngine.getClusterService();
        IPartitionService partitionService = nodeEngine.getPartitionService();
        for (int partitionId = 0; partitionId < partitionOwners.length; ++partitionId) {
            Address address = partitionService.getPartitionOwnerOrWait(partitionId);
            MemberImpl member = clusterService.getMember(address);
            if (member == null) {
                throw new TopologyChangedException("Topology changed! " + address + " is not member anymore!");
            }
            members.add(member);
            partitionOwners[partitionId] = address;
        }
    }
}

