/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.sessionwindow;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.examples.sessionwindow.GenerateEventsP;
import com.hazelcast.jet.examples.sessionwindow.ProductEvent;
import com.hazelcast.jet.examples.sessionwindow.ProductEventType;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Set;
import javax.annotation.Nonnull;

public class SessionWindow {
    private static final long JOB_DURATION_MS = 60000L;
    private static final int SESSION_TIMEOUT = 5000;

    private static Pipeline buildPipeline() {
        AggregateOperation1 aggrOp = AggregateOperations.allOf((AggregateOperation1)AggregateOperations.summingLong((ToLongFunctionEx & Serializable)e -> e.getProductEventType() == ProductEventType.VIEW_LISTING ? 1L : 0L), (AggregateOperation1)AggregateOperations.mapping((FunctionEx & Serializable)e -> e.getProductEventType() == ProductEventType.PURCHASE ? e.getProductId() : null, (AggregateOperation1)AggregateOperations.toSet()));
        Pipeline p = Pipeline.create();
        p.readFrom(SessionWindow.eventsSource()).withTimestamps(ProductEvent::getTimestamp, 0L).groupingKey(ProductEvent::getUserId).window((WindowDefinition)WindowDefinition.session((long)5000L)).aggregate(aggrOp).writeTo(Sinks.logger(SessionWindow::sessionToString));
        return p;
    }

    private static StreamSource<ProductEvent> eventsSource() {
        return Sources.streamFromProcessor((String)"generator", (ProcessorMetaSupplier)ProcessorMetaSupplier.preferLocalParallelismOne(GenerateEventsP::new));
    }

    @Nonnull
    private static String sessionToString(KeyedWindowResult<String, Tuple2<Long, Set<String>>> wr) {
        return String.format("Session{userId=%s, start=%s, duration=%2ds, value={viewed=%2d, purchases=%s}", wr.key(), Instant.ofEpochMilli(wr.start()).atZone(ZoneId.systemDefault()).toLocalTime(), Duration.ofMillis(wr.end() - wr.start()).getSeconds(), ((Tuple2)wr.result()).f0(), ((Tuple2)wr.result()).f1());
    }

    public static void main(String[] args) throws Exception {
        JetInstance jet = Jet.bootstrappedInstance();
        try {
            jet.newJob(SessionWindow.buildPipeline());
            Thread.sleep(60000L);
        }
        finally {
            Jet.shutdownAll();
        }
    }
}

