/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.impl.TopologicalSorter;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.FlatMapTransform;
import com.hazelcast.jet.impl.pipeline.transform.MapTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.StreamSourceTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.ObjIntConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class Planner {
    /** Logger used by {@code createDag()} to report the chosen watermark throttling frame size. */
    private static final ILogger LOGGER = Logger.getLogger(Planner.class);
    /**
     * Upper bound (1000) that {@code createDag()} applies to the watermark
     * throttling frame size. Unit is presumably milliseconds - TODO confirm.
     */
    private static final int MAXIMUM_WATERMARK_GAP = 1000;
    /** The DAG under construction; {@code addVertex}/{@code addEdges} add to it and {@code createDag()} returns it. */
    public final DAG dag = new DAG();
    /**
     * Maps a transform to the planner vertex registered for it by {@code addVertex};
     * {@code addEdges} looks up upstream transforms here to find the edge source.
     */
    public final Map<Transform, PlannerVertex> xform2vertex = new HashMap<Transform, PlannerVertex>();
    /** The pipeline whose transforms {@code createDag()} converts into {@link #dag}. */
    private final PipelineImpl pipeline;

    /**
     * Creates a planner bound to the given pipeline. Nothing is computed here;
     * the DAG is populated when {@code createDag()} is called.
     *
     * @param pipeline the pipeline to plan; stored as-is
     */
    Planner(PipelineImpl pipeline) {
        this.pipeline = pipeline;
    }

    /**
     * Converts the pipeline into a {@link DAG}.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>make transform names unique, validate that every non-sink transform
     *       has something attached and that the graph is topologically sorted;</li>
     *   <li>compute the watermark throttling frame size as the GCD of all positive
     *       preferred watermark strides, falling back to / capping at
     *       {@code MAXIMUM_WATERMARK_GAP}, and apply it to the event time policies
     *       of stream sources and timestamp transforms;</li>
     *   <li>fuse chains of map/flat-map transforms into single transforms;</li>
     *   <li>let every remaining transform add itself to the DAG.</li>
     * </ol>
     * Fusing temporarily rewrites the {@code upstream()} lists of transforms that
     * follow a fused chain. The original lists are restored before this method
     * returns, including when adding a transform to the DAG throws.
     *
     * @return the populated {@link #dag}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    DAG createDag() {
        pipeline.makeNamesUnique();
        Map<Transform, List<Transform>> adjacencyMap = pipeline.adjacencyMap();
        validateNoLeakage(adjacencyMap);
        TopologicalSorter.checkTopologicalSort(adjacencyMap.entrySet());

        // GCD of all strides that were actually requested (non-positive means "no preference").
        long frameSizeGcd = Util.gcd(adjacencyMap.keySet().stream()
                .mapToLong(Transform::preferredWatermarkStride)
                .filter(stride -> stride > 0L)
                .toArray());
        if (frameSizeGcd == 0L) {
            // no transform expressed a preference
            frameSizeGcd = MAXIMUM_WATERMARK_GAP;
        }
        if (frameSizeGcd > MAXIMUM_WATERMARK_GAP) {
            // keep the frame size a divisor of every stride while honoring the cap
            frameSizeGcd = Util.gcd(frameSizeGcd, MAXIMUM_WATERMARK_GAP);
        }
        LoggingUtil.logFine(LOGGER, "Watermarks in the pipeline will be throttled to %d", frameSizeGcd);

        for (Transform transform : adjacencyMap.keySet()) {
            if (transform instanceof StreamSourceTransform) {
                StreamSourceTransform source = (StreamSourceTransform) transform;
                EventTimePolicy policy = source.getEventTimePolicy();
                // a stream source may have no event time policy; leave it untouched then
                if (policy != null) {
                    source.setEventTimePolicy(withFrameSize(policy, frameSizeGcd));
                }
            } else if (transform instanceof TimestampTransform) {
                TimestampTransform timestamp = (TimestampTransform) transform;
                timestamp.setEventTimePolicy(withFrameSize(timestamp.getEventTimePolicy(), frameSizeGcd));
            }
        }

        // Snapshot of upstream lists taken BEFORE the first modification of each transform.
        Map<Transform, List<Transform>> originalParents = new HashMap<>();
        List<Transform> transforms = new ArrayList<>(adjacencyMap.keySet());
        try {
            for (int i = 0; i < transforms.size(); i++) {
                Transform transform = transforms.get(i);
                List<Transform> chain = findFusableChain(transform, adjacencyMap);
                if (chain == null) {
                    continue;
                }
                // the head of the chain is replaced by the fused transform, the rest is dropped
                transforms.removeAll(chain.subList(1, chain.size()));
                Transform fused = fuseFlatMapTransforms(chain);
                transforms.set(i, fused);
                Transform lastInChain = chain.get(chain.size() - 1);
                for (Transform downstream : adjacencyMap.get(lastInChain)) {
                    // computeIfAbsent, not put: a transform that follows several fused chains
                    // must keep its very first (unmodified) snapshot, otherwise the restore
                    // below would leave an earlier fused transform in its upstream list.
                    originalParents.computeIfAbsent(downstream, d -> new ArrayList<>(d.upstream()));
                    downstream.upstream().replaceAll(p -> p == lastInChain ? fused : p);
                }
            }
            for (Transform transform : transforms) {
                transform.addToDag(this);
            }
        } finally {
            // Undo the temporary rewiring so the pipeline is left as the user built it.
            for (Map.Entry<Transform, List<Transform>> entry : originalParents.entrySet()) {
                List<Transform> upstream = entry.getKey().upstream();
                List<Transform> saved = entry.getValue();
                for (int i = 0; i < upstream.size(); i++) {
                    upstream.set(i, saved.get(i));
                }
            }
        }
        return dag;
    }

    /**
     * Collects the longest run of map/flat-map transforms that starts at
     * {@code transform} and can be fused into one.
     * <p>
     * The run is extended to the next transform only while the current one has
     * exactly one downstream transform, that transform has the same local
     * parallelism, and its input 0 is not rebalanced.
     *
     * @param transform    the candidate head of the chain
     * @param adjacencyMap maps each transform to its downstream transforms
     * @return the chain if it contains at least two transforms, otherwise {@code null}
     */
    private static List<Transform> findFusableChain(@Nonnull Transform transform, @Nonnull Map<Transform, List<Transform>> adjacencyMap) {
        List<Transform> fusable = new ArrayList<>();
        Transform current = transform;
        for (;;) {
            boolean mapLike = current instanceof MapTransform || current instanceof FlatMapTransform;
            if (!mapLike) {
                break;
            }
            fusable.add(current);
            List<Transform> successors = adjacencyMap.get(current);
            if (successors.size() != 1) {
                // fan-out (or a dead end) terminates the chain
                break;
            }
            Transform next = successors.get(0);
            if (next.localParallelism() != current.localParallelism()) {
                break;
            }
            if (next.shouldRebalanceInput(0)) {
                break;
            }
            current = next;
        }
        if (fusable.size() > 1) {
            return fusable;
        }
        return null;
    }

    /**
     * Fuses a chain of map and flat-map transforms into a single transform.
     * <p>
     * The chain is processed left to right. Each run of map transforms before a
     * flat-map is merged (see {@code mergeMapFunctions}) and composed with that
     * flat-map; map transforms after the last flat-map are applied to the items
     * of the resulting traverser. If the chain contains no flat-map at all, the
     * result is a {@link MapTransform} with the merged map function, otherwise a
     * {@link FlatMapTransform}.
     * <p>
     * The fused transform is named {@code fused(name1, name2, ...)} and takes the
     * single upstream of the first transform in the chain.
     * <p>
     * NOTE(review): only name, upstream and function are passed to the new
     * transform; other settings of the chain members are not copied here -
     * confirm this is intended.
     *
     * @param chain at least two transforms, each a {@link MapTransform} or
     *              {@link FlatMapTransform}; the first one must have exactly one upstream
     * @return the fused transform
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Transform fuseFlatMapTransforms(List<Transform> chain) {
        assert chain.size() > 1 : "chain.size()=" + chain.size();
        assert chain.get(0).upstream().size() == 1;

        // index just past the most recently processed flat-map
        int lastFlatMap = 0;
        FunctionEx<Object, Traverser> flatMapFn = null;
        for (int i = 0; i < chain.size(); i++) {
            if (!(chain.get(i) instanceof FlatMapTransform)) {
                continue;
            }
            FunctionEx<Object, Traverser> function = ((FlatMapTransform) chain.get(i)).flatMapFn();
            // map functions sitting between the previous flat-map and this one (null if none)
            FunctionEx<Object, Object> inputMapFn = mergeMapFunctions(chain.subList(lastFlatMap, i));
            if (inputMapFn != null) {
                if (flatMapFn == null) {
                    flatMapFn = t -> {
                        Object mappedValue = inputMapFn.apply(t);
                        // a null map result means "filtered out": emit nothing
                        return mappedValue != null ? function.apply(mappedValue) : Traversers.empty();
                    };
                } else {
                    flatMapFn = flatMapFn.andThen(r -> r.map(inputMapFn).flatMap(function));
                }
            } else {
                if (flatMapFn == null) {
                    flatMapFn = t -> function.apply(t);
                } else {
                    flatMapFn = flatMapFn.andThen(r -> r.flatMap(function));
                }
            }
            lastFlatMap = i + 1;
        }

        FunctionEx<Object, Object> trailingMapFn = mergeMapFunctions(chain.subList(lastFlatMap, chain.size()));
        String name = chain.stream().map(Transform::name).collect(Collectors.joining(", ", "fused(", ")"));
        Transform upstream = chain.get(0).upstream().get(0);
        if (flatMapFn == null) {
            // the chain consisted of map transforms only
            return new MapTransform(name, upstream, trailingMapFn);
        }
        if (trailingMapFn != null) {
            flatMapFn = flatMapFn.andThen(t -> t.map(trailingMapFn));
        }
        return new FlatMapTransform(name, upstream, flatMapFn);
    }

    /**
     * Combines the map functions of the given transforms into one function
     * that applies them in list order.
     * <p>
     * The combined function stops as soon as an intermediate result is
     * {@code null} and returns {@code null} in that case.
     *
     * @param chain transforms that are all {@link MapTransform}s (each element is cast)
     * @return the combined function, or {@code null} if {@code chain} is empty
     */
    private static FunctionEx mergeMapFunctions(List<Transform> chain) {
        if (chain.isEmpty()) {
            return null;
        }
        List<FunctionEx> mapFns = Util.toList(chain, stage -> ((MapTransform) stage).mapFn());
        return (FunctionEx & Serializable) input -> {
            Object current = input;
            for (FunctionEx mapFn : mapFns) {
                if (current == null) {
                    break;
                }
                current = mapFn.apply(current);
            }
            return current;
        };
    }

    /**
     * Verifies that every transform other than a {@link SinkTransform} has at
     * least one downstream transform.
     *
     * @param adjacencyMap maps each transform to its downstream transforms
     * @throws IllegalArgumentException if any non-sink transform has an empty
     *                                  downstream list; the message lists them
     */
    private static void validateNoLeakage(Map<Transform, List<Transform>> adjacencyMap) {
        List<Transform> dangling = new ArrayList<>();
        for (Map.Entry<Transform, List<Transform>> entry : adjacencyMap.entrySet()) {
            Transform candidate = entry.getKey();
            boolean isSink = candidate instanceof SinkTransform;
            if (!isSink && entry.getValue().isEmpty()) {
                dangling.add(candidate);
            }
        }
        if (dangling.isEmpty()) {
            return;
        }
        throw new IllegalArgumentException("These transforms have nothing attached to them: " + dangling);
    }

    /**
     * Adds a vertex for {@code transform}, wrapping the processor supplier with
     * {@code ProcessorMetaSupplier.of} and delegating to the meta-supplier overload.
     *
     * @param transform        the transform the vertex is registered for
     * @param name             vertex name
     * @param localParallelism local parallelism to set on the vertex
     * @param procSupplier     supplier of processors
     * @return the planner vertex created for the transform
     */
    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, SupplierEx<Processor> procSupplier) {
        ProcessorMetaSupplier wrapped = ProcessorMetaSupplier.of(procSupplier);
        return addVertex(transform, name, localParallelism, wrapped);
    }

    /**
     * Adds a vertex for {@code transform}, wrapping the {@link ProcessorSupplier}
     * with {@code ProcessorMetaSupplier.of} and delegating to the meta-supplier overload.
     *
     * @param transform        the transform the vertex is registered for
     * @param name             vertex name
     * @param localParallelism local parallelism to set on the vertex
     * @param procSupplier     processor supplier to wrap
     * @return the planner vertex created for the transform
     */
    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, ProcessorSupplier procSupplier) {
        ProcessorMetaSupplier wrapped = ProcessorMetaSupplier.of(procSupplier);
        return addVertex(transform, name, localParallelism, wrapped);
    }

    /**
     * Creates a new vertex in the DAG, sets its local parallelism and records
     * it in {@link #xform2vertex} under {@code transform}.
     *
     * @param transform        the transform the vertex is registered for
     * @param name             vertex name
     * @param localParallelism local parallelism to set on the vertex
     * @param metaSupplier     meta-supplier passed to {@code dag.newVertex}
     * @return the planner vertex wrapping the new DAG vertex
     */
    public PlannerVertex addVertex(Transform transform, String name, int localParallelism, ProcessorMetaSupplier metaSupplier) {
        Vertex vertex = dag.newVertex(name, metaSupplier);
        PlannerVertex plannerVertex = new PlannerVertex(vertex);
        vertex.localParallelism(localParallelism);
        xform2vertex.put(transform, plannerVertex);
        return plannerVertex;
    }

    /**
     * Adds one edge to {@code toVertex} for every upstream transform of
     * {@code transform}.
     * <p>
     * The i-th upstream transform is connected to destination ordinal i; the
     * source ordinal is the next free ordinal of the upstream planner vertex.
     * Rebalancing is applied to the edge first, then the edge is added to the
     * DAG, and only then is {@code configureEdgeFn} invoked with it.
     *
     * @param transform       the transform whose upstream list determines the edges
     * @param toVertex        destination vertex of all edges
     * @param configureEdgeFn receives each edge together with its destination ordinal
     */
    public void addEdges(Transform transform, Vertex toVertex, ObjIntConsumer<Edge> configureEdgeFn) {
        int inboundOrdinal = 0;
        for (Transform parent : transform.upstream()) {
            // the parent must already have been registered through addVertex
            PlannerVertex source = xform2vertex.get(parent);
            Vertex sourceVertex = source.v;
            int outboundOrdinal = source.nextAvailableOrdinal();
            Edge edge = Edge.from(sourceVertex, outboundOrdinal).to(toVertex, inboundOrdinal);
            applyRebalancing(edge, transform);
            dag.edge(edge);
            configureEdgeFn.accept(edge, inboundOrdinal);
            inboundOrdinal++;
        }
    }

    /**
     * Configures {@code edge} for rebalancing if {@code toTransform} requests it
     * for the edge's destination ordinal.
     * <p>
     * When rebalancing is requested the edge is made distributed, and additionally
     * partitioned if the transform supplies a partition key function for that input.
     * Otherwise the edge is left unchanged.
     *
     * @param edge        the edge to configure
     * @param toTransform the transform at the destination end of the edge
     */
    public static void applyRebalancing(Edge edge, Transform toTransform) {
        int inputOrdinal = edge.getDestOrdinal();
        if (toTransform.shouldRebalanceInput(inputOrdinal)) {
            edge.distributed();
            FunctionEx<?, ?> partitionKeyFn = toTransform.partitionKeyFnForInput(inputOrdinal);
            if (partitionKeyFn != null) {
                edge.partitioned(partitionKeyFn);
            }
        }
    }

    /**
     * Variant of {@code addEdges} for callers that do not need the destination
     * ordinal: {@code configureEdgeFn} is invoked with each edge only.
     *
     * @param transform       the transform whose upstream list determines the edges
     * @param toVertex        destination vertex of all edges
     * @param configureEdgeFn receives each created edge
     */
    public void addEdges(Transform transform, Vertex toVertex, Consumer<Edge> configureEdgeFn) {
        ObjIntConsumer<Edge> ignoringOrdinal = (edge, ordinal) -> configureEdgeFn.accept(edge);
        addEdges(transform, toVertex, ignoringOrdinal);
    }

    /**
     * Variant of {@code addEdges} that applies no extra edge configuration.
     *
     * @param transform the transform whose upstream list determines the edges
     * @param toVertex  destination vertex of all edges
     */
    public void addEdges(Transform transform, Vertex toVertex) {
        Consumer<Edge> noOp = edge -> { };
        addEdges(transform, toVertex, noOp);
    }

    /**
     * Builds a copy of {@code original} that uses the given watermark throttling
     * frame size and a frame offset of 0. Timestamp function, wrap function,
     * watermark policy supplier and idle timeout are taken from {@code original}.
     *
     * @param original                     the policy to copy from
     * @param watermarkThrottlingFrameSize the frame size to use in the copy
     * @return the new event time policy
     */
    @Nonnull
    private static <T> EventTimePolicy<T> withFrameSize(EventTimePolicy<T> original, long watermarkThrottlingFrameSize) {
        final long frameOffset = 0L;
        return EventTimePolicy.eventTimePolicy(
                original.timestampFn(),
                original.wrapFn(),
                original.newWmPolicyFn(),
                watermarkThrottlingFrameSize,
                frameOffset,
                original.idleTimeoutMillis()
        );
    }

    /**
     * Returns a view of {@code list} without its first element, i.e.
     * {@code list.subList(1, list.size())}.
     *
     * @param list the source list; must contain at least one element, otherwise
     *             {@code subList} rejects the range
     * @return a sub-list view backed by {@code list}
     */
    public static <E> List<E> tailList(List<E> list) {
        final int endExclusive = list.size();
        return list.subList(1, endExclusive);
    }

    /**
     * Pairs a DAG {@link Vertex} with a counter that hands out its outbound
     * edge ordinals.
     */
    public static class PlannerVertex {
        /** The wrapped DAG vertex. */
        public final Vertex v;
        /** The ordinal the next call to {@link #nextAvailableOrdinal()} will return; starts at 0. */
        private int availableOrdinal;

        PlannerVertex(Vertex v) {
            this.v = v;
        }

        /** Returns the string form of the wrapped vertex. */
        @Override
        public String toString() {
            return v.toString();
        }

        /**
         * Returns the current ordinal and advances the counter by one, so
         * successive calls yield 0, 1, 2, ...
         */
        public int nextAvailableOrdinal() {
            int ordinal = availableOrdinal;
            availableOrdinal = ordinal + 1;
            return ordinal;
        }
    }
}

