/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Partitioner;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.function.KeyedWindowResultFunction;
import com.hazelcast.jet.impl.JetEvent;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.SessionWindowDef;
import com.hazelcast.jet.pipeline.SlidingWindowDef;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import javax.annotation.Nonnull;

/**
 * Pipeline transform that plans a keyed (grouped) windowed aggregation.
 *
 * <p>When added to the DAG it picks one of three layouts:
 * <ul>
 *   <li>a single session-window vertex, when the window kind is {@code SESSION};</li>
 *   <li>a single sliding-window vertex, when the aggregate operation has no
 *       {@code combineFn} or the transform's optimization is {@code MEMORY};</li>
 *   <li>otherwise two chained sliding-window vertices: accumulate-by-frame
 *       followed by combine-to-sliding-window.</li>
 * </ul>
 *
 * @param <K>   type of the grouping key produced by the key functions
 * @param <R>   type of the result produced by the aggregate operation
 * @param <OUT> type of the item produced by {@code mapToOutputFn}
 */
public class WindowGroupTransform<K, R, OUT> extends AbstractTransform {
    /** Window definition; its kind selects which planning method is used. */
    @Nonnull
    private final WindowDefinition wDef;
    /** Key-extracting functions, looked up by the ordinal of the inbound edge. */
    @Nonnull
    private final List<DistributedFunction<?, ? extends K>> keyFns;
    /** Aggregate operation handed to the window processors. */
    @Nonnull
    private final AggregateOperation<?, ? extends R> aggrOp;
    /** Function handed to the window processors to build the output item. */
    @Nonnull
    private final KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn;

    /**
     * Creates the transform. Its name is derived from the kind of {@code wDef}
     * (see {@link #createName(WindowDefinition)}).
     *
     * @param upstream      upstream transforms, passed to the superclass
     * @param wDef          window definition
     * @param keyFns        key-extracting functions; indexed by inbound edge ordinal
     *                      when the edges are configured
     * @param aggrOp        aggregate operation to apply
     * @param mapToOutputFn function passed to the window processors to produce output items
     */
    public WindowGroupTransform(
            @Nonnull List<Transform> upstream,
            @Nonnull WindowDefinition wDef,
            @Nonnull List<DistributedFunction<?, ? extends K>> keyFns,
            @Nonnull AggregateOperation<?, ? extends R> aggrOp,
            @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn
    ) {
        super(WindowGroupTransform.createName(wDef), upstream);
        this.wDef = wDef;
        this.keyFns = keyFns;
        this.aggrOp = aggrOp;
        this.mapToOutputFn = mapToOutputFn;
    }

    /**
     * Builds the transform name as the lower-cased window kind followed by
     * {@code "-window"}; for example kind {@code SESSION} gives
     * {@code "session-window"}.
     *
     * @param wDef window definition whose kind is used
     * @return the transform name
     */
    private static String createName(WindowDefinition wDef) {
        // Locale.ROOT keeps the name independent of the JVM default locale.
        // With the no-arg toLowerCase(), a Turkish/Azeri default locale maps
        // 'I' to dotless 'ı', which would corrupt any kind name containing 'I'.
        return wDef.kind().name().toLowerCase(Locale.ROOT) + "-window";
    }

    /**
     * Returns the watermark frame size reported by the window definition.
     */
    @Override
    public long watermarkFrameSize() {
        return this.wDef.watermarkFrameSize();
    }

    /**
     * Adds this transform's vertices and edges to the planner's DAG, choosing
     * the layout from the window kind, the presence of {@code combineFn} on the
     * aggregate operation, and the transform's optimization setting.
     *
     * @param p planner that owns the DAG being built
     */
    @Override
    public void addToDag(Planner p) {
        if (this.wDef.kind() == WindowDefinition.WindowKind.SESSION) {
            this.addSessionWindow(p, (SessionWindowDef)this.wDef.downcast());
        } else if (this.aggrOp.combineFn() == null || this.getOptimization() == AbstractTransform.Optimization.MEMORY) {
            // Two-stage planning needs combineFn (it uses a combine processor),
            // so fall back to one stage when it is absent or when MEMORY
            // optimization is requested.
            this.addSlidingWindowSingleStage(p, (SlidingWindowDef)this.wDef.downcast());
        } else {
            this.addSlidingWindowTwoStage(p, (SlidingWindowDef)this.wDef.downcast());
        }
    }

    /**
     * Plans the sliding window as one vertex running
     * {@code Processors.aggregateToSlidingWindowP}. Every inbound edge is made
     * distributed and partitioned by the key function at the edge's ordinal.
     *
     * @param p    planner that owns the DAG being built
     * @param wDef sliding window definition (shadows the field of the same name)
     */
    private void addSlidingWindowSingleStage(Planner p, SlidingWindowDef wDef) {
        Planner.PlannerVertex pv = p.addVertex(
                (Transform)this,
                p.uniqueVertexName(this.name(), ""),
                this.localParallelism(),
                Processors.aggregateToSlidingWindowP(
                        this.keyFns,
                        // one timestamp function per key function, all reading JetEvent::timestamp
                        Collections.nCopies(this.keyFns.size(), JetEvent::timestamp),
                        TimestampKind.EVENT,
                        wDef.toSlidingWindowPolicy(),
                        this.aggrOp,
                        this.mapToOutputFn));
        p.addEdges((Transform)this, pv.v, (e, ord) -> e.distributed().partitioned(this.keyFns.get((int)ord)));
    }

    /**
     * Plans the sliding window as two vertices named {@code <prefix>1} and
     * {@code <prefix>2}: step 1 runs {@code Processors.accumulateByFrameP} and
     * step 2 runs {@code Processors.combineToSlidingWindowP}.
     *
     * @param p    planner that owns the DAG being built
     * @param wDef sliding window definition (shadows the field of the same name)
     */
    private void addSlidingWindowTwoStage(Planner p, SlidingWindowDef wDef) {
        String namePrefix = p.uniqueVertexName(this.name(), "-step");
        SlidingWindowPolicy winPolicy = wDef.toSlidingWindowPolicy();
        // Step 1 is created directly on the DAG rather than through
        // p.addVertex(); only step 2 goes through the planner.
        // NOTE(review): presumably so the planner treats step 2 as this
        // transform's output vertex - confirm against Planner.
        Vertex v1 = p.dag.newVertex(
                namePrefix + '1',
                Processors.accumulateByFrameP(
                        this.keyFns,
                        // one timestamp function per key function, all reading JetEvent::timestamp
                        Collections.nCopies(this.keyFns.size(), JetEvent::timestamp),
                        TimestampKind.EVENT,
                        winPolicy,
                        this.aggrOp));
        v1.localParallelism(this.localParallelism());
        Planner.PlannerVertex pv2 = p.addVertex(
                (Transform)this,
                namePrefix + '2',
                this.localParallelism(),
                Processors.combineToSlidingWindowP(winPolicy, this.aggrOp, this.mapToOutputFn));
        // Inbound edges into step 1: partitioned by key with HASH_CODE, and
        // distributed() is not called on them.
        p.addEdges((Transform)this, v1, (e, ord) -> e.partitioned(this.keyFns.get((int)ord), Partitioner.HASH_CODE));
        // Step 1 -> step 2: distributed, partitioned by the entry key.
        p.dag.edge(Edge.between(v1, pv2.v).distributed().partitioned(DistributedFunctions.entryKey()));
    }

    /**
     * Plans the session window as one vertex running
     * {@code Processors.aggregateToSessionWindowP}. Every inbound edge is made
     * distributed and partitioned by the key function at the edge's ordinal.
     *
     * @param p    planner that owns the DAG being built
     * @param wDef session window definition (shadows the field of the same name)
     */
    private void addSessionWindow(Planner p, SessionWindowDef wDef) {
        Planner.PlannerVertex pv = p.addVertex(
                (Transform)this,
                p.uniqueVertexName(this.name(), ""),
                this.localParallelism(),
                Processors.aggregateToSessionWindowP(
                        wDef.sessionTimeout(),
                        // one timestamp function per key function, all reading JetEvent::timestamp
                        Collections.nCopies(this.keyFns.size(), JetEvent::timestamp),
                        this.keyFns,
                        this.aggrOp,
                        this.mapToOutputFn));
        p.addEdges((Transform)this, pv.v, (e, ord) -> e.distributed().partitioned(this.keyFns.get((int)ord)));
    }
}

