/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.processor;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.AggregateOperation;
import com.hazelcast.jet.Inbox;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.ResettableSingletonTraverser;
import com.hazelcast.jet.TimestampKind;
import com.hazelcast.jet.TimestampedEntry;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.WatermarkEmissionPolicy;
import com.hazelcast.jet.WatermarkPolicy;
import com.hazelcast.jet.WindowDefinition;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.impl.processor.AggregateP;
import com.hazelcast.jet.impl.processor.GroupByKeyP;
import com.hazelcast.jet.impl.processor.InsertWatermarksP;
import com.hazelcast.jet.impl.processor.SessionWindowP;
import com.hazelcast.jet.impl.processor.SlidingWindowP;
import com.hazelcast.jet.impl.processor.TransformP;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nonnull;

/**
 * Static factory methods that build processor suppliers. Each factory
 * captures its arguments in a lambda; the actual processor object is only
 * constructed when the returned supplier is invoked.
 */
public final class Processors {
    /** Utility class; not instantiable. */
    private Processors() {
    }

    /**
     * Returns a supplier of {@link GroupByKeyP} processors constructed with the
     * given key extractor and aggregate operation.
     *
     * @param getKeyF            function that extracts the grouping key from an item
     * @param aggregateOperation the aggregate operation handed to the processor
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, K, A, R> DistributedSupplier<Processor> aggregateByKey(@Nonnull DistributedFunction<? super T, K> getKeyF, @Nonnull AggregateOperation<? super T, A, R> aggregateOperation) {
        return () -> {
            return new GroupByKeyP(getKeyF, aggregateOperation);
        };
    }

    /**
     * Like {@link #aggregateByKey}, except that the aggregate operation's finish
     * function is replaced with {@code DistributedFunction.identity()} (through
     * {@code withFinish}) each time the supplier is invoked.
     *
     * @param getKeyF            function that extracts the grouping key from an item
     * @param aggregateOperation the aggregate operation whose finish step is replaced
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, K, A> DistributedSupplier<Processor> accumulateByKey(@Nonnull DistributedFunction<? super T, K> getKeyF, @Nonnull AggregateOperation<? super T, A, ?> aggregateOperation) {
        return () -> {
            return new GroupByKeyP(getKeyF, aggregateOperation.withFinish(DistributedFunction.identity()));
        };
    }

    /**
     * Returns a supplier of {@link GroupByKeyP} processors that key on
     * {@code Map.Entry::getKey} and feed {@code Map.Entry::getValue} into the
     * operation's combine function in place of its accumulate function
     * (see {@link #withCombiningAccumulate}).
     *
     * @param aggregateOperation the aggregate operation to adapt
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <A, R> DistributedSupplier<Processor> combineByKey(@Nonnull AggregateOperation<?, A, R> aggregateOperation) {
        return () -> {
            return new GroupByKeyP(Map.Entry::getKey, withCombiningAccumulate(Map.Entry::getValue, aggregateOperation));
        };
    }

    /**
     * Returns a supplier of {@link AggregateP} processors constructed with the
     * given aggregate operation.
     *
     * @param aggregateOperation the aggregate operation handed to the processor
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, A, R> DistributedSupplier<Processor> aggregate(@Nonnull AggregateOperation<T, A, R> aggregateOperation) {
        return () -> {
            return new AggregateP(aggregateOperation);
        };
    }

    /**
     * Like {@link #aggregate}, except that the aggregate operation's finish
     * function is replaced with {@code DistributedFunction.identity()} (through
     * {@code withFinish}) each time the supplier is invoked.
     *
     * @param aggregateOperation the aggregate operation whose finish step is replaced
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, A, R> DistributedSupplier<Processor> accumulate(@Nonnull AggregateOperation<T, A, R> aggregateOperation) {
        return () -> {
            return new AggregateP(aggregateOperation.withFinish(DistributedFunction.identity()));
        };
    }

    /**
     * Returns a supplier of {@link AggregateP} processors whose accumulate step
     * passes each incoming item, unchanged, to the operation's combine function
     * (see {@link #withCombiningAccumulate}).
     *
     * @param aggregateOperation the aggregate operation to adapt
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, A, R> DistributedSupplier<Processor> combine(@Nonnull AggregateOperation<T, A, R> aggregateOperation) {
        return () -> {
            return new AggregateP(withCombiningAccumulate(DistributedFunction.identity(), aggregateOperation));
        };
    }

    /**
     * Public entry point that forwards all arguments, unchanged, to
     * {@link #aggregateByKeyAndWindow}.
     *
     * @param getKeyF            function that extracts the grouping key from an item
     * @param getTimestampF      function that extracts the timestamp from an item
     * @param timestampKind      how the extracted timestamp is to be interpreted
     * @param windowDef          the window definition
     * @param aggregateOperation the aggregate operation handed to the processor
     * @return supplier that creates a new {@link SlidingWindowP} on every call
     */
    @Nonnull
    public static <T, K, A, R> DistributedSupplier<Processor> aggregateToSlidingWindow(@Nonnull DistributedFunction<? super T, K> getKeyF, @Nonnull DistributedToLongFunction<? super T> getTimestampF, @Nonnull TimestampKind timestampKind, @Nonnull WindowDefinition windowDef, @Nonnull AggregateOperation<? super T, A, R> aggregateOperation) {
        return aggregateByKeyAndWindow(getKeyF, getTimestampF, timestampKind, windowDef, aggregateOperation);
    }

    /**
     * Forwards to {@link #aggregateByKeyAndWindow} after two adjustments made
     * eagerly, at the time this method is called: the window definition is
     * converted with {@code toTumblingByFrame()} and the aggregate operation's
     * finish function is replaced with {@code DistributedFunction.identity()}.
     *
     * @param getKeyF            function that extracts the grouping key from an item
     * @param getTimestampF      function that extracts the timestamp from an item
     * @param timestampKind      how the extracted timestamp is to be interpreted
     * @param windowDef          the window definition to convert
     * @param aggregateOperation the aggregate operation whose finish step is replaced
     * @return supplier that creates a new {@link SlidingWindowP} on every call
     */
    @Nonnull
    public static <T, K, A> DistributedSupplier<Processor> accumulateByFrame(@Nonnull DistributedFunction<? super T, K> getKeyF, @Nonnull DistributedToLongFunction<? super T> getTimestampF, @Nonnull TimestampKind timestampKind, @Nonnull WindowDefinition windowDef, @Nonnull AggregateOperation<? super T, A, ?> aggregateOperation) {
        // Converted before the finish function is swapped, and outside any lambda.
        WindowDefinition frameWindowDef = windowDef.toTumblingByFrame();
        return aggregateByKeyAndWindow(getKeyF, getTimestampF, timestampKind, frameWindowDef, aggregateOperation.withFinish(DistributedFunction.identity()));
    }

    /**
     * Forwards to {@link #aggregateByKeyAndWindow} using the key, timestamp and
     * value getters of {@link TimestampedEntry}, {@link TimestampKind#FRAME}, and
     * an aggregate operation whose accumulate step feeds the entry's value into
     * the operation's combine function (see {@link #withCombiningAccumulate}).
     *
     * @param windowDef          the window definition
     * @param aggregateOperation the aggregate operation to adapt
     * @return supplier that creates a new {@link SlidingWindowP} on every call
     */
    @Nonnull
    public static <K, A, R> DistributedSupplier<Processor> combineToSlidingWindow(@Nonnull WindowDefinition windowDef, @Nonnull AggregateOperation<?, A, R> aggregateOperation) {
        return aggregateByKeyAndWindow(TimestampedEntry::getKey, TimestampedEntry::getTimestamp, TimestampKind.FRAME, windowDef, withCombiningAccumulate(TimestampedEntry::getValue, aggregateOperation));
    }

    /**
     * Shared implementation behind the windowing factories. When
     * {@code timestampKind} is {@link TimestampKind#EVENT}, the timestamp function
     * is wrapped so that every extracted timestamp is passed through
     * {@code windowDef.higherFrameTs(...)}; for any other kind the timestamp
     * function is used as supplied.
     *
     * @param getKeyF            function that extracts the grouping key from an item
     * @param getTimestampF      function that extracts the timestamp from an item
     * @param timestampKind      selects whether timestamps are mapped through {@code higherFrameTs}
     * @param windowDef          the window definition
     * @param aggregateOperation the aggregate operation handed to the processor
     * @return supplier that creates a new {@link SlidingWindowP} on every call
     */
    @Nonnull
    private static <T, K, A, R> DistributedSupplier<Processor> aggregateByKeyAndWindow(@Nonnull DistributedFunction<? super T, K> getKeyF, @Nonnull DistributedToLongFunction<? super T> getTimestampF, @Nonnull TimestampKind timestampKind, @Nonnull WindowDefinition windowDef, @Nonnull AggregateOperation<? super T, A, R> aggregateOperation) {
        return () -> {
            DistributedToLongFunction<? super T> frameTimestampF;
            if (timestampKind == TimestampKind.EVENT) {
                frameTimestampF = item -> windowDef.higherFrameTs(getTimestampF.applyAsLong(item));
            } else {
                frameTimestampF = getTimestampF;
            }
            return new SlidingWindowP(getKeyF, frameTimestampF, windowDef, aggregateOperation);
        };
    }

    /**
     * Returns a supplier of {@link SessionWindowP} processors constructed with
     * the given arguments.
     *
     * @param sessionTimeout     session timeout value passed to the processor
     *                           (units are not visible here; confirm against {@code SessionWindowP})
     * @param getTimestampF      function that extracts the timestamp from an item
     * @param getKeyF            function that extracts the grouping key from an item
     * @param aggregateOperation the aggregate operation handed to the processor
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, K, A, R> DistributedSupplier<Processor> aggregateToSessionWindow(long sessionTimeout, @Nonnull DistributedToLongFunction<? super T> getTimestampF, @Nonnull DistributedFunction<? super T, K> getKeyF, @Nonnull AggregateOperation<? super T, A, R> aggregateOperation) {
        return () -> {
            return new SessionWindowP(sessionTimeout, getTimestampF, getKeyF, aggregateOperation);
        };
    }

    /**
     * Returns a supplier of {@link InsertWatermarksP} processors. Each invocation
     * of the supplier obtains a fresh {@link WatermarkPolicy} from
     * {@code newWmPolicyF}; the emission policy instance is shared.
     *
     * @param getTimestampF function that extracts the timestamp from an item
     * @param newWmPolicyF  factory invoked once per created processor
     * @param wmEmitPolicy  the watermark emission policy handed to the processor
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T> DistributedSupplier<Processor> insertWatermarks(@Nonnull DistributedToLongFunction<T> getTimestampF, @Nonnull DistributedSupplier<WatermarkPolicy> newWmPolicyF, @Nonnull WatermarkEmissionPolicy wmEmitPolicy) {
        return () -> {
            WatermarkPolicy wmPolicy = (WatermarkPolicy)newWmPolicyF.get();
            return new InsertWatermarksP(getTimestampF, wmPolicy, wmEmitPolicy);
        };
    }

    /**
     * Returns a supplier of {@link TransformP} processors that apply
     * {@code mapper} to each item. Every created processor owns one
     * {@link ResettableSingletonTraverser}, which is loaded with the mapping
     * result and returned for each item.
     *
     * @param mapper the mapping function
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, R> DistributedSupplier<Processor> map(@Nonnull DistributedFunction<T, R> mapper) {
        return () -> {
            // One traverser per processor instance, reused for every item.
            ResettableSingletonTraverser singleton = new ResettableSingletonTraverser();
            return new TransformP(item -> {
                singleton.accept(mapper.apply(item));
                return singleton;
            });
        };
    }

    /**
     * Returns a supplier of {@link TransformP} processors that test each item
     * with {@code predicate}. A passing item is loaded into the processor's
     * {@link ResettableSingletonTraverser}; a failing item loads {@code null}
     * instead (presumably leaving the traverser empty; confirm against
     * {@code ResettableSingletonTraverser}).
     *
     * @param predicate the predicate applied to each item
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T> DistributedSupplier<Processor> filter(@Nonnull DistributedPredicate<T> predicate) {
        return () -> {
            // One traverser per processor instance, reused for every item.
            ResettableSingletonTraverser singleton = new ResettableSingletonTraverser();
            return new TransformP(item -> {
                if (predicate.test(item)) {
                    singleton.accept(item);
                } else {
                    singleton.accept(null);
                }
                return singleton;
            });
        };
    }

    /**
     * Returns a supplier of {@link TransformP} processors constructed directly
     * with {@code mapper}, which produces a {@link Traverser} for each item.
     *
     * @param mapper function from an item to a traverser over the output items
     * @return supplier that creates a new processor on every call
     */
    @Nonnull
    public static <T, R> DistributedSupplier<Processor> flatMap(@Nonnull DistributedFunction<T, ? extends Traverser<? extends R>> mapper) {
        return () -> {
            return new TransformP(mapper);
        };
    }

    /**
     * Returns a supplier of processors that drain their inbox into a no-op
     * consumer.
     *
     * @return supplier that creates a new {@code NoopP} on every call
     */
    @Nonnull
    public static DistributedSupplier<Processor> noop() {
        return () -> {
            return new NoopP();
        };
    }

    /**
     * Decorates a {@link ProcessorSupplier} so that every processor it returns
     * has {@code setCooperative(false)} called on it. Each processor is cast to
     * {@link AbstractProcessor}, so a processor of any other type results in a
     * {@link ClassCastException} when the returned supplier is invoked.
     *
     * @param wrapped the supplier to decorate
     * @return supplier that returns the same collection the wrapped supplier produced
     */
    @Nonnull
    public static ProcessorSupplier nonCooperative(@Nonnull ProcessorSupplier wrapped) {
        return count -> {
            Collection<? extends Processor> processors = wrapped.get(count);
            for (Processor processor : processors) {
                ((AbstractProcessor)processor).setCooperative(false);
            }
            return processors;
        };
    }

    /**
     * Decorates a processor supplier so that the processor it returns has
     * {@code setCooperative(false)} called on it. The processor is cast to
     * {@link AbstractProcessor}, so a processor of any other type results in a
     * {@link ClassCastException} when the returned supplier is invoked.
     *
     * @param wrapped the supplier to decorate
     * @return supplier that returns the processor produced by the wrapped supplier
     */
    @Nonnull
    public static DistributedSupplier<Processor> nonCooperative(@Nonnull DistributedSupplier<Processor> wrapped) {
        return () -> {
            Processor processor = (Processor)wrapped.get();
            ((AbstractProcessor)processor).setCooperative(false);
            return processor;
        };
    }

    /**
     * Derives an aggregate operation whose accumulate step applies
     * {@code mapper} to the incoming item and passes the result, together with
     * the current accumulator, to the original operation's
     * {@code combineAccumulatorsF()}. The combine function is looked up on every
     * accumulate call.
     *
     * @param mapper             extracts the value to combine from an incoming item
     * @param aggregateOperation the operation whose combine function is reused
     * @return the operation returned by {@code withAccumulate}
     */
    private static <T, A, R> AggregateOperation<T, A, R> withCombiningAccumulate(@Nonnull DistributedFunction<T, A> mapper, @Nonnull AggregateOperation<?, A, R> aggregateOperation) {
        return aggregateOperation.withAccumulate((acc, item) -> {
            aggregateOperation.combineAccumulatorsF().accept(acc, mapper.apply(item));
        });
    }

    /**
     * Processor that drains its inbox into a no-op consumer.
     */
    private static class NoopP
    implements Processor {
        /** Created only through {@link Processors#noop()}. */
        private NoopP() {
        }

        /**
         * Drains every item from the inbox into a no-op consumer; the ordinal
         * is not used.
         */
        @Override
        public void process(int ordinal, @Nonnull Inbox inbox) {
            inbox.drain(DistributedFunctions.noopConsumer());
        }
    }
}

