/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.WatermarkEmissionPolicy;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.TopologicalSorter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.Util;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * Builds a {@link DAG} from a {@link PipelineImpl}.
 *
 * <p>{@link #createDag()} validates the pipeline's adjacency map, assigns a
 * common watermark emission policy to the stream-source and timestamp
 * transforms, and then asks every transform, in topological order, to add
 * itself to the DAG through the {@code addVertex} / {@code addEdges} helpers
 * of this class.
 */
public class Planner {
    /** The DAG under construction; it is the object returned by {@link #createDag()}. */
    public final DAG dag = new DAG();

    /** Maps each transform to the vertex registered for it by {@code addVertex}. */
    public final Map<Transform, PlannerVertex> xform2vertex = new HashMap<>();

    private final PipelineImpl pipeline;

    /** Names already handed out by {@link #uniqueVertexName(String, String)}. */
    private final Set<String> vertexNames = new HashSet<>();

    Planner(PipelineImpl pipeline) {
        this.pipeline = pipeline;
    }

    /**
     * Populates and returns {@link #dag}.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>reject the pipeline if a non-sink transform has nothing attached to it;</li>
     * <li>compute the GCD of all positive watermark frame sizes and derive the
     *     watermark emission policy from it (emit by frame when the GCD is
     *     positive, no throttling otherwise);</li>
     * <li>push that policy into every {@link StreamSourceTransform} that has
     *     watermark params and into every {@link TimestampTransform};</li>
     * <li>call {@code addToDag(this)} on each transform in topological order.</li>
     * </ol>
     *
     * @return the populated DAG
     * @throws IllegalArgumentException if a non-sink transform has nothing attached to it
     */
    DAG createDag() {
        Map<Transform, List<Transform>> adjacencyMap = pipeline.adjacencyMap();
        validateNoLeakage(adjacencyMap);

        // Only positive frame sizes take part in the GCD.
        long frameSizeGcd = Util.gcd(adjacencyMap.keySet().stream()
                .mapToLong(Transform::watermarkFrameSize)
                .filter(frameSize -> frameSize > 0L)
                .toArray());
        WatermarkEmissionPolicy emitPolicy = frameSizeGcd > 0L
                ? WatermarkEmissionPolicy.emitByFrame(SlidingWindowPolicy.tumblingWinPolicy(frameSizeGcd))
                : WatermarkEmissionPolicy.noThrottling();

        for (Transform transform : adjacencyMap.keySet()) {
            if (transform instanceof StreamSourceTransform) {
                StreamSourceTransform source = (StreamSourceTransform) transform;
                // A source without watermark params is left untouched.
                if (source.getWmParams() != null) {
                    source.setWmGenerationParams(source.getWmParams().withEmitPolicy(emitPolicy));
                }
            } else if (transform instanceof TimestampTransform) {
                TimestampTransform timestamp = (TimestampTransform) transform;
                timestamp.setWmGenerationParams(timestamp.getWmGenParams().withEmitPolicy(emitPolicy));
            }
        }

        // Topological order means a transform's upstream vertices are already
        // registered in xform2vertex by the time it adds its own edges.
        Iterable<Transform> sorted = TopologicalSorter.topologicalSort(adjacencyMap, Object::toString);
        for (Transform transform : sorted) {
            transform.addToDag(this);
        }
        return dag;
    }

    /**
     * Ensures that every transform other than a {@link SinkTransform} has a
     * non-empty list in the adjacency map.
     *
     * @param adjacencyMap transform-to-list map obtained from the pipeline
     *                     (presumably each transform's downstream transforms)
     * @throws IllegalArgumentException listing the offending transforms, if any
     */
    private static void validateNoLeakage(Map<Transform, List<Transform>> adjacencyMap) {
        List<Transform> leakages = adjacencyMap.entrySet().stream()
                .filter(e -> !(e.getKey() instanceof SinkTransform))
                .filter(e -> e.getValue().isEmpty())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        if (!leakages.isEmpty()) {
            throw new IllegalArgumentException("These transforms have nothing attached to them: " + leakages);
        }
    }

    /**
     * Convenience overload that wraps {@code procSupplier} with
     * {@link ProcessorMetaSupplier#of} and delegates to
     * {@link #addVertex(Transform, String, int, ProcessorMetaSupplier)}.
     */
    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, DistributedSupplier<Processor> procSupplier) {
        return addVertex(transform, name, localParallelism, ProcessorMetaSupplier.of(procSupplier));
    }

    /**
     * Convenience overload that wraps {@code procSupplier} with
     * {@link ProcessorMetaSupplier#of} and delegates to
     * {@link #addVertex(Transform, String, int, ProcessorMetaSupplier)}.
     */
    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, ProcessorSupplier procSupplier) {
        return addVertex(transform, name, localParallelism, ProcessorMetaSupplier.of(procSupplier));
    }

    /**
     * Creates a new vertex in {@link #dag}, sets its local parallelism and
     * records it in {@link #xform2vertex} as the vertex of {@code transform}.
     *
     * <p>The name is passed to the DAG as given; this method does not consult
     * the names reserved through {@link #uniqueVertexName(String, String)}.
     *
     * @param transform        the transform the vertex is registered for
     * @param name             the vertex name
     * @param localParallelism value passed to {@code Vertex.localParallelism}
     * @param metaSupplier     the meta-supplier the vertex is created with
     * @return the wrapper around the newly created vertex
     */
    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, ProcessorMetaSupplier metaSupplier) {
        PlannerVertex pv = new PlannerVertex(dag.newVertex(name, metaSupplier));
        pv.v.localParallelism(localParallelism);
        xform2vertex.put(transform, pv);
        return pv;
    }

    /**
     * Adds one edge to {@link #dag} for each upstream transform of
     * {@code transform}, in the order returned by {@code transform.upstream()}.
     *
     * <p>Each edge starts at the upstream vertex's next available ordinal and
     * ends at {@code toVertex} with destination ordinals 0, 1, 2, ... The edge
     * is added to the DAG first and then handed to {@code configureEdgeFn}
     * together with its destination ordinal.
     *
     * @param transform       the transform whose upstream transforms are connected
     * @param toVertex        the destination vertex of every created edge
     * @param configureEdgeFn callback receiving each edge and its destination ordinal
     * @throws NullPointerException if an upstream transform has no vertex in
     *                              {@link #xform2vertex}
     */
    public void addEdges(Transform transform, Vertex toVertex, BiConsumer<Edge, Integer> configureEdgeFn) {
        int destOrdinal = 0;
        for (Transform fromTransform : transform.upstream()) {
            PlannerVertex fromPv = xform2vertex.get(fromTransform);
            if (fromPv == null) {
                // Same exception type as the bare dereference would raise, but
                // with a message that names the missing upstream transform.
                throw new NullPointerException("No vertex registered for upstream transform "
                        + fromTransform + " of " + transform);
            }
            Edge edge = Edge.from(fromPv.v, fromPv.nextAvailableOrdinal()).to(toVertex, destOrdinal);
            dag.edge(edge);
            configureEdgeFn.accept(edge, destOrdinal);
            destOrdinal++;
        }
    }

    /**
     * Same as {@link #addEdges(Transform, Vertex, BiConsumer)}, for callbacks
     * that do not need the destination ordinal.
     */
    public void addEdges(Transform transform, Vertex toVertex, Consumer<Edge> configureEdgeFn) {
        addEdges(transform, toVertex, (e, ord) -> configureEdgeFn.accept(e));
    }

    /**
     * Same as {@link #addEdges(Transform, Vertex, BiConsumer)}, without any
     * additional edge configuration.
     */
    public void addEdges(Transform transform, Vertex toVertex) {
        addEdges(transform, toVertex, e -> { });
    }

    /**
     * Returns a vertex name not previously returned by this planner and
     * reserves it. See {@link #uniqueName(Set, String, String)} for the
     * naming scheme.
     */
    @Nonnull
    public String uniqueVertexName(@Nonnull String proposedName, @Nonnull String proposedNameSuffix) {
        return uniqueName(vertexNames, proposedName, proposedNameSuffix);
    }

    /**
     * Finds a name that is not yet in {@code knownNames}, adds it to the set
     * and returns it.
     *
     * <p>Candidates are tried in this order: {@code proposedName + suffix},
     * {@code proposedName + "-2" + suffix}, {@code proposedName + "-3" + suffix},
     * and so on.
     *
     * @param knownNames         set of taken names; the returned name is added to it
     * @param proposedName       base of the name
     * @param proposedNameSuffix text appended after the base and the optional index
     * @return the first candidate that was not already in {@code knownNames}
     */
    @Nonnull
    static String uniqueName(@Nonnull Set<String> knownNames, @Nonnull String proposedName, @Nonnull String proposedNameSuffix) {
        // The first candidate carries no index; numbering starts at 2.
        String candidate = proposedName + proposedNameSuffix;
        for (int index = 2; !knownNames.add(candidate); index++) {
            candidate = proposedName + "-" + index + proposedNameSuffix;
        }
        return candidate;
    }

    /**
     * Returns a {@link List#subList} view of {@code list} without its first
     * element.
     *
     * <p>The list must not be empty: for an empty list the requested range
     * {@code (1, 0)} is rejected by {@code subList} with a runtime exception.
     */
    public static <E> List<E> tailList(List<E> list) {
        return list.subList(1, list.size());
    }

    /**
     * A {@link Vertex} paired with a counter that hands out its ordinals
     * sequentially, starting from 0.
     */
    public static class PlannerVertex {
        public final Vertex v;
        private int availableOrdinal;

        PlannerVertex(Vertex v) {
            this.v = v;
        }

        @Override
        public String toString() {
            return v.toString();
        }

        /** Returns the current ordinal and advances the counter by one. */
        public int nextAvailableOrdinal() {
            return availableOrdinal++;
        }
    }
}

