/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.pipeline.test;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.pipeline.test.GeneratorFunction;
import com.hazelcast.logging.ILogger;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

/**
 * Processor that emits items produced by a list of {@link GeneratorFunction}s,
 * paced against {@link System#nanoTime()} so that each generator is invoked
 * once per period ({@code 1 second / eventsPerSecondPerGenerator}).
 *
 * <p>Generators are split among processor instances by list index: the
 * generator at index {@code i} is handled by the instance whose global
 * processor index equals {@code i % totalParallelism}. Every generated item
 * is passed through an {@link EventTimeMapper} together with a wall-clock
 * timestamp derived from the item's sequence number.
 *
 * @param <T> type of the generated items
 */
public class ParallelStreamP<T> extends AbstractProcessor {
    private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1L);

    /**
     * {@code nanoTime-in-millis - currentTimeMillis}, sampled at construction.
     * Adding it to a wall-clock millisecond value yields the matching
     * nanoTime-based value; subtracting it converts back.
     */
    private final long nanoTimeMillisToCurrentTimeMillis = determineTimeOffset();
    /** Nanoseconds between two consecutive sequence numbers. */
    private final long periodNanos;
    private final EventTimeMapper<? super T> eventTimeMapper;
    /** Wall-clock time (ms) at construction; turned into {@link #startNanoTime} in {@link #init}. */
    private final long creationTimeMillis;
    private final List<? extends GeneratorFunction<T>> generators;

    private int globalProcessorIndex;
    private int totalParallelism;
    /** nanoTime-based instant at which sequence 0 is due for this processor instance. */
    private long startNanoTime;
    /** Value of {@link System#nanoTime()} captured at the start of the current {@link #complete()} call. */
    private long nowNanoTime;
    /** Next sequence number to generate. */
    private long sequence;
    private Traverser<Object> traverser = new AppendableTraverser<Object>(2);
    private List<GeneratorFunction<T>> assignedGenerators;
    private ILogger logger;

    /**
     * @param eventsPerSecondPerGenerator how many items each generator should produce per second;
     *                                    must be in the range {@code 1..1_000_000_000}
     * @param eventTimePolicy             policy handed to the {@link EventTimeMapper}
     * @param generators                  all generators; each processor instance later picks its share
     * @throws IllegalArgumentException if {@code eventsPerSecondPerGenerator} is outside the valid range
     */
    public ParallelStreamP(long eventsPerSecondPerGenerator, EventTimePolicy<? super T> eventTimePolicy, List<? extends GeneratorFunction<T>> generators) {
        // A rate of 0 would divide by zero right here, a rate above one per nanosecond would
        // truncate the period to 0 (dividing by zero later in emitEvents), and a negative rate
        // would yield a negative period. Reject all of them up front with a clear message.
        if (eventsPerSecondPerGenerator <= 0 || eventsPerSecondPerGenerator > NANOS_PER_SECOND) {
            throw new IllegalArgumentException("eventsPerSecondPerGenerator must be between 1 and "
                    + NANOS_PER_SECOND + ", but was " + eventsPerSecondPerGenerator);
        }
        this.creationTimeMillis = System.currentTimeMillis();
        this.periodNanos = NANOS_PER_SECOND / eventsPerSecondPerGenerator;
        this.eventTimeMapper = new EventTimeMapper<T>(eventTimePolicy);
        this.eventTimeMapper.addPartitions(generators.size());
        this.generators = generators;
    }

    /**
     * Reads the parallelism layout from the context, computes this instance's
     * start time and selects the generators this instance is responsible for.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        totalParallelism = context.totalParallelism();
        globalProcessorIndex = context.globalProcessorIndex();
        // Convert the construction wall-clock time to the nanoTime scale and shift it by one
        // period per processor index, so each instance gets a different start instant.
        startNanoTime = TimeUnit.MILLISECONDS.toNanos(creationTimeMillis + nanoTimeMillisToCurrentTimeMillis)
                + (long) globalProcessorIndex * periodNanos;
        assignedGenerators = IntStream.range(0, generators.size())
                .filter(i -> i % totalParallelism == globalProcessorIndex)
                .mapToObj(generators::get)
                .collect(Collectors.toList());
        logger = context.logger();
    }

    /**
     * Emits all items that are due at the current time.
     *
     * <p>An exception raised while emitting is logged at severe level and not
     * rethrown. If a generator throws, {@code sequence} has not been advanced
     * yet, so the same sequence number is attempted again on the next call.
     *
     * @return always {@code false}; this method never reports completion
     */
    @Override
    public boolean complete() {
        nowNanoTime = System.nanoTime();
        try {
            emitEvents();
        } catch (Exception e) {
            logger.severe(e);
        }
        return false;
    }

    /**
     * Generates and emits items for every sequence number whose scheduled time
     * has already passed, stopping early when {@code emitFromTraverser} returns
     * {@code false} for the pending traverser.
     */
    private void emitEvents() throws Exception {
        // Number of whole periods elapsed since the start instant.
        long emitUpTo = (nowNanoTime - startNanoTime) / periodNanos;
        // The pending traverser is emitted first; a new sequence is generated only when
        // emitFromTraverser returned true and there is still a due sequence number.
        while (emitFromTraverser(traverser) && sequence < emitUpTo) {
            long timestampNanoTime = startNanoTime + sequence * periodNanos;
            // Back from the nanoTime scale to wall-clock milliseconds.
            long timestamp = TimeUnit.NANOSECONDS.toMillis(timestampNanoTime) - nanoTimeMillisToCurrentTimeMillis;
            for (GeneratorFunction<T> generator : assignedGenerators) {
                T item = generator.generate(timestamp, sequence);
                // NOTE(review): the traverser is reassigned on every iteration without being
                // emitted in between. With more than one assigned generator this relies on
                // flatMapEvent tolerating repeated calls before the previous result is drained
                // (and on the earlier items still being reachable) - confirm against EventTimeMapper.
                traverser = eventTimeMapper.flatMapEvent(nowNanoTime, item, globalProcessorIndex, timestamp);
            }
            ++sequence;
        }
    }

    /**
     * @return the difference between {@link System#nanoTime()} expressed in
     *         milliseconds and {@link System#currentTimeMillis()}
     */
    private static long determineTimeOffset() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - System.currentTimeMillis();
    }
}

