/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.datamodel.TimestampedEntry;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedBiPredicate;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.function.DistributedTriFunction;
import com.hazelcast.jet.function.KeyedWindowResultFunction;
import com.hazelcast.jet.impl.processor.GroupP;
import com.hazelcast.jet.impl.processor.InsertWatermarksP;
import com.hazelcast.jet.impl.processor.RollingAggregateP;
import com.hazelcast.jet.impl.processor.SessionWindowP;
import com.hazelcast.jet.impl.processor.SlidingWindowP;
import com.hazelcast.jet.impl.processor.TransformP;
import com.hazelcast.jet.impl.processor.TransformUsingContextP;
import com.hazelcast.jet.pipeline.ContextFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * Static factory methods that build suppliers for the processor
 * implementations imported from {@code com.hazelcast.jet.impl.processor}
 * ({@code GroupP}, {@code SlidingWindowP}, {@code SessionWindowP},
 * {@code InsertWatermarksP}, {@code TransformP},
 * {@code TransformUsingContextP}, {@code RollingAggregateP}) and for a
 * private no-op processor.
 * <p>
 * The class cannot be instantiated; every member is static.
 * <p>
 * NOTE(review): this source was produced by a decompiler (see the file
 * header). Several constructor calls use raw types and some lambdas carry
 * {@code (Object)} casts, which look like erased generic type arguments.
 * Restoring them needs the generic signatures of the implementation classes,
 * which are not visible in this file - confirm against the original sources
 * before tightening.
 */
public final class Processors {
    private Processors() {
    }

    /**
     * Returns a supplier of {@code GroupP} processors that use the key
     * function from {@link DistributedFunctions#constantKey()} for each of the
     * {@code aggrOp.arity()} inputs and emit only the aggregation result (the
     * key is dropped by the output function).
     *
     * @param aggrOp the aggregate operation to perform
     * @return a supplier that creates a new {@code GroupP} on every call
     */
    @Nonnull
    public static <A, R> DistributedSupplier<Processor> aggregateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return () -> new GroupP(Collections.nCopies(aggrOp.arity(), DistributedFunctions.constantKey()), aggrOp, (k, r) -> r);
    }

    /**
     * Same as {@link #aggregateP}, except that the aggregate operation is
     * first converted with {@code withIdentityFinish()}.
     *
     * @param aggrOp the aggregate operation whose accumulation step is used
     * @return a supplier that creates a new {@code GroupP} on every call
     */
    @Nonnull
    public static <A, R> DistributedSupplier<Processor> accumulateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return () -> new GroupP(Collections.nCopies(aggrOp.arity(), DistributedFunctions.constantKey()), aggrOp.withIdentityFinish(), (k, r) -> r);
    }

    /**
     * Returns a supplier of {@code GroupP} processors with a single key
     * function ({@link DistributedFunctions#constantKey()}) and an aggregate
     * operation derived via
     * {@code aggrOp.withCombiningAccumulateFn(DistributedFunction.identity())}.
     * Only the aggregation result is emitted.
     *
     * @param aggrOp the aggregate operation to derive the combining operation from
     * @return a supplier that creates a new {@code GroupP} on every call
     */
    @Nonnull
    public static <A, R> DistributedSupplier<Processor> combineP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return () -> new GroupP(DistributedFunctions.constantKey(), aggrOp.withCombiningAccumulateFn(DistributedFunction.identity()), (k, r) -> r);
    }

    /**
     * Returns a supplier of {@code GroupP} processors configured with the
     * given key functions, aggregate operation and output function.
     *
     * @param keyFns        key-extracting functions passed to {@code GroupP}
     * @param aggrOp        the aggregate operation to perform
     * @param mapToOutputFn function that turns a key and its result into the output item
     * @return a supplier that creates a new {@code GroupP} on every call
     */
    @Nonnull
    public static <K, A, R, OUT> DistributedSupplier<Processor> aggregateByKeyP(
            @Nonnull List<DistributedFunction<?, ? extends K>> keyFns,
            @Nonnull AggregateOperation<A, R> aggrOp,
            @Nonnull DistributedBiFunction<? super K, ? super R, OUT> mapToOutputFn) {
        return () -> new GroupP(keyFns, aggrOp, mapToOutputFn);
    }

    /**
     * Returns a supplier of {@code GroupP} processors that use the given key
     * functions, the aggregate operation converted with
     * {@code withIdentityFinish()}, and {@link Util#entry} as the output
     * function.
     *
     * @param getKeyFns key-extracting functions passed to {@code GroupP}
     * @param aggrOp    the aggregate operation whose accumulation step is used
     * @return a supplier that creates a new {@code GroupP} on every call
     */
    @Nonnull
    public static <K, A> DistributedSupplier<Processor> accumulateByKeyP(
            @Nonnull List<DistributedFunction<?, ? extends K>> getKeyFns,
            @Nonnull AggregateOperation<A, ?> aggrOp) {
        return () -> new GroupP(getKeyFns, aggrOp.withIdentityFinish(), Util::entry);
    }

    /**
     * Returns a supplier of {@code GroupP} processors that key items by
     * {@link Map.Entry#getKey()} and use an aggregate operation derived via
     * {@code aggrOp.withCombiningAccumulateFn(Map.Entry::getValue)}.
     *
     * @param aggrOp        the aggregate operation to derive the combining operation from
     * @param mapToOutputFn function that turns a key and its result into the output item
     * @return a supplier that creates a new {@code GroupP} on every call
     */
    @Nonnull
    public static <K, A, R, OUT> DistributedSupplier<Processor> combineByKeyP(
            @Nonnull AggregateOperation<A, R> aggrOp,
            @Nonnull DistributedBiFunction<? super K, ? super R, OUT> mapToOutputFn) {
        return () -> new GroupP(Map.Entry::getKey, aggrOp.withCombiningAccumulateFn(Map.Entry::getValue), mapToOutputFn);
    }

    /**
     * Returns a supplier of {@code SlidingWindowP} processors for the given
     * keys, timestamps, window policy and aggregate operation. Delegates to
     * {@code aggregateByKeyAndWindowP} with {@code isLastStage = true}.
     *
     * @param keyFns        key-extracting functions
     * @param timestampFns  timestamp-extracting functions
     * @param timestampKind how the extracted timestamps are to be interpreted
     * @param winPolicy     the sliding window policy
     * @param aggrOp        the aggregate operation to perform
     * @param mapToOutputFn function that creates the output item from a window result
     * @return a supplier that creates a new {@code SlidingWindowP} on every call
     */
    @Nonnull
    public static <K, A, R, OUT> DistributedSupplier<Processor> aggregateToSlidingWindowP(
            @Nonnull List<DistributedFunction<?, ? extends K>> keyFns,
            @Nonnull List<DistributedToLongFunction<?>> timestampFns,
            @Nonnull TimestampKind timestampKind,
            @Nonnull SlidingWindowPolicy winPolicy,
            @Nonnull AggregateOperation<A, R> aggrOp,
            @Nonnull KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn) {
        return Processors.aggregateByKeyAndWindowP(keyFns, timestampFns, timestampKind, winPolicy, aggrOp, mapToOutputFn, true);
    }

    /**
     * Returns a supplier of {@code SlidingWindowP} processors that work on
     * {@code winPolicy.toTumblingByFrame()}, use the aggregate operation
     * converted with {@code withIdentityFinish()}, and create output items
     * with {@code TimestampedEntry::fromWindowResult}. Delegates to
     * {@code aggregateByKeyAndWindowP} with {@code isLastStage = false}.
     *
     * @param keyFns        key-extracting functions
     * @param timestampFns  timestamp-extracting functions
     * @param timestampKind how the extracted timestamps are to be interpreted
     * @param winPolicy     the sliding window policy the frame-sized policy is derived from
     * @param aggrOp        the aggregate operation whose accumulation step is used
     * @return a supplier that creates a new {@code SlidingWindowP} on every call
     */
    @Nonnull
    public static <K, A> DistributedSupplier<Processor> accumulateByFrameP(
            @Nonnull List<DistributedFunction<?, ? extends K>> keyFns,
            @Nonnull List<DistributedToLongFunction<?>> timestampFns,
            @Nonnull TimestampKind timestampKind,
            @Nonnull SlidingWindowPolicy winPolicy,
            @Nonnull AggregateOperation<A, ?> aggrOp) {
        return Processors.aggregateByKeyAndWindowP(keyFns, timestampFns, timestampKind, winPolicy.toTumblingByFrame(), aggrOp.withIdentityFinish(), TimestampedEntry::fromWindowResult, false);
    }

    /**
     * Returns a supplier of {@code SlidingWindowP} processors whose input
     * items are read as {@link TimestampedEntry}: the key comes from
     * {@code getKey()}, the timestamp from {@code getTimestamp()}
     * (interpreted as {@link TimestampKind#FRAME}), and the aggregate
     * operation is derived via
     * {@code aggrOp.withCombiningAccumulateFn(TimestampedEntry::getValue)}.
     * Delegates to {@code aggregateByKeyAndWindowP} with
     * {@code isLastStage = true}.
     *
     * @param winPolicy     the sliding window policy
     * @param aggrOp        the aggregate operation to derive the combining operation from
     * @param mapToOutputFn function that creates the output item from a window result
     * @return a supplier that creates a new {@code SlidingWindowP} on every call
     */
    @Nonnull
    public static <K, A, R, OUT> DistributedSupplier<Processor> combineToSlidingWindowP(
            @Nonnull SlidingWindowPolicy winPolicy,
            @Nonnull AggregateOperation<A, R> aggrOp,
            @Nonnull KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn) {
        DistributedFunction<TimestampedEntry, Object> keyFn = TimestampedEntry::getKey;
        DistributedToLongFunction<TimestampedEntry> timestampFn = TimestampedEntry::getTimestamp;
        return Processors.aggregateByKeyAndWindowP(Collections.singletonList(keyFn), Collections.singletonList(timestampFn), TimestampKind.FRAME, winPolicy, aggrOp.withCombiningAccumulateFn(TimestampedEntry::getValue), mapToOutputFn, true);
    }

    /**
     * Shared implementation behind the sliding-window factories. Every
     * timestamp function is wrapped by {@code toFrameTimestampFn} each time
     * the returned supplier is invoked, and the resulting list is handed to
     * the {@code SlidingWindowP} constructor together with the remaining
     * arguments.
     *
     * @param keyFns        key-extracting functions
     * @param timestampFns  timestamp-extracting functions (wrapped before use)
     * @param timestampKind how the extracted timestamps are to be interpreted
     * @param winPolicy     the sliding window policy
     * @param aggrOp        the aggregate operation to perform
     * @param mapToOutputFn function that creates the output item from a window result
     * @param isLastStage   flag passed through to {@code SlidingWindowP}
     * @return a supplier that creates a new {@code SlidingWindowP} on every call
     */
    @Nonnull
    private static <K, A, R, OUT> DistributedSupplier<Processor> aggregateByKeyAndWindowP(
            @Nonnull List<DistributedFunction<?, ? extends K>> keyFns,
            @Nonnull List<DistributedToLongFunction<?>> timestampFns,
            @Nonnull TimestampKind timestampKind,
            @Nonnull SlidingWindowPolicy winPolicy,
            @Nonnull AggregateOperation<A, R> aggrOp,
            @Nonnull KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn,
            boolean isLastStage) {
        return () -> new SlidingWindowP(keyFns, timestampFns.stream().map(f -> Processors.toFrameTimestampFn(f, timestampKind, winPolicy)).collect(Collectors.toList()), winPolicy, aggrOp, mapToOutputFn, isLastStage);
    }

    /**
     * Wraps a timestamp function so that its result is mapped through
     * {@code winPolicy.higherFrameTs(...)}.
     * <p>
     * For {@link TimestampKind#EVENT} the extracted timestamp is passed
     * unchanged; for any other kind one is subtracted first (presumably
     * because such a timestamp already denotes a frame boundary - confirm
     * against {@code SlidingWindowPolicy}).
     *
     * @param timestampFnX  the timestamp-extracting function of unknown item type
     * @param timestampKind how the extracted timestamp is to be interpreted
     * @param winPolicy     the window policy used to compute the frame timestamp
     * @return a function from an arbitrary item to its frame timestamp
     */
    private static DistributedToLongFunction<Object> toFrameTimestampFn(
            @Nonnull DistributedToLongFunction<?> timestampFnX,
            @Nonnull TimestampKind timestampKind,
            @Nonnull SlidingWindowPolicy winPolicy) {
        // The item type of the incoming function is a wildcard, but the
        // returned function must apply it to plain Object items. A
        // wildcard-typed reference cannot be invoked with an Object argument,
        // so the reference is rebound through an unchecked cast; a mismatching
        // item type surfaces as a ClassCastException when the function runs.
        @SuppressWarnings("unchecked")
        DistributedToLongFunction<Object> timestampFn = (DistributedToLongFunction<Object>) timestampFnX;
        return timestampKind == TimestampKind.EVENT
                ? item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item))
                : item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item) - 1L);
    }

    /**
     * Returns a supplier of {@code SessionWindowP} processors built from the
     * given arguments, which are passed to the constructor unchanged.
     *
     * @param sessionTimeout the session timeout value handed to {@code SessionWindowP}
     * @param timestampFns   timestamp-extracting functions
     * @param keyFns         key-extracting functions
     * @param aggrOp         the aggregate operation to perform
     * @param mapToOutputFn  function that creates the output item from a window result
     * @return a supplier that creates a new {@code SessionWindowP} on every call
     */
    @Nonnull
    public static <K, A, R, OUT> DistributedSupplier<Processor> aggregateToSessionWindowP(
            long sessionTimeout,
            @Nonnull List<DistributedToLongFunction<?>> timestampFns,
            @Nonnull List<DistributedFunction<?, ? extends K>> keyFns,
            @Nonnull AggregateOperation<A, R> aggrOp,
            @Nonnull KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn) {
        return () -> new SessionWindowP(sessionTimeout, timestampFns, keyFns, aggrOp, mapToOutputFn);
    }

    /**
     * Returns a supplier of {@code InsertWatermarksP} processors created from
     * the given watermark generation parameters.
     *
     * @param wmGenParams the watermark generation parameters
     * @return a supplier that creates a new {@code InsertWatermarksP} on every call
     */
    @Nonnull
    public static <T> DistributedSupplier<Processor> insertWatermarksP(@Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
        return () -> new InsertWatermarksP(wmGenParams);
    }

    /**
     * Returns a supplier of {@code TransformP} processors that apply
     * {@code mapFn} to each item. Each created processor owns one
     * {@link ResettableSingletonTraverser} that is refilled with the mapping
     * result and returned for every item.
     *
     * @param mapFn the mapping function
     * @return a supplier that creates a new {@code TransformP} on every call
     */
    @Nonnull
    public static <T, R> DistributedSupplier<Processor> mapP(@Nonnull DistributedFunction<T, R> mapFn) {
        return () -> {
            // One traverser per processor instance, reused across items.
            ResettableSingletonTraverser trav = new ResettableSingletonTraverser();
            return new TransformP(item -> {
                trav.accept(mapFn.apply(item));
                return trav;
            });
        };
    }

    /**
     * Returns a supplier of {@code TransformP} processors that hand the item
     * to a per-processor {@link ResettableSingletonTraverser} when
     * {@code filterFn} accepts it, and {@code null} otherwise (presumably
     * leaving the traverser empty - confirm against
     * {@code ResettableSingletonTraverser}).
     *
     * @param filterFn the predicate deciding which items pass
     * @return a supplier that creates a new {@code TransformP} on every call
     */
    @Nonnull
    public static <T> DistributedSupplier<Processor> filterP(@Nonnull DistributedPredicate<T> filterFn) {
        return () -> {
            // One traverser per processor instance, reused across items.
            ResettableSingletonTraverser trav = new ResettableSingletonTraverser();
            return new TransformP(item -> {
                trav.accept(filterFn.test(item) ? item : null);
                return trav;
            });
        };
    }

    /**
     * Returns a supplier of {@code TransformP} processors constructed directly
     * from {@code flatMapFn}.
     *
     * @param flatMapFn function mapping an item to a traverser over its output items
     * @return a supplier that creates a new {@code TransformP} on every call
     */
    @Nonnull
    public static <T, R> DistributedSupplier<Processor> flatMapP(@Nonnull DistributedFunction<T, ? extends Traverser<? extends R>> flatMapFn) {
        return () -> new TransformP(flatMapFn);
    }

    /**
     * Returns the processor supplier obtained from
     * {@code TransformUsingContextP.supplier} for a function that stores
     * {@code mapFn.apply(context, item)} in the supplied singleton traverser
     * and returns that traverser.
     *
     * @param contextFactory the factory of the context object
     * @param mapFn          the mapping function taking the context and the item
     * @return the processor supplier
     */
    @Nonnull
    public static <C, T, R> ProcessorSupplier mapUsingContextP(
            @Nonnull ContextFactory<C> contextFactory,
            @Nonnull DistributedBiFunction<? super C, ? super T, ? extends R> mapFn) {
        return TransformUsingContextP.supplier(contextFactory, (singletonTraverser, context, item) -> {
            singletonTraverser.accept(mapFn.apply((Object)context, (Object)item));
            return singletonTraverser;
        });
    }

    /**
     * Returns the processor supplier obtained from
     * {@code TransformUsingContextP.supplier} for a function that stores the
     * item in the supplied singleton traverser when
     * {@code filterFn.test(context, item)} is true, stores {@code null}
     * otherwise, and returns that traverser.
     *
     * @param contextFactory the factory of the context object
     * @param filterFn       the predicate taking the context and the item
     * @return the processor supplier
     */
    @Nonnull
    public static <C, T> ProcessorSupplier filterUsingContextP(
            @Nonnull ContextFactory<C> contextFactory,
            @Nonnull DistributedBiPredicate<? super C, ? super T> filterFn) {
        return TransformUsingContextP.supplier(contextFactory, (singletonTraverser, context, item) -> {
            singletonTraverser.accept(filterFn.test((Object)context, (Object)item) ? item : null);
            return singletonTraverser;
        });
    }

    /**
     * Returns the processor supplier obtained from
     * {@code TransformUsingContextP.supplier} for a function that ignores the
     * supplied singleton traverser and returns
     * {@code flatMapFn.apply(context, item)}.
     *
     * @param contextFactory the factory of the context object
     * @param flatMapFn      function mapping the context and the item to a traverser
     * @return the processor supplier
     */
    @Nonnull
    public static <C, T, R> ProcessorSupplier flatMapUsingContextP(
            @Nonnull ContextFactory<C> contextFactory,
            @Nonnull DistributedBiFunction<? super C, ? super T, ? extends Traverser<? extends R>> flatMapFn) {
        return TransformUsingContextP.supplier(contextFactory, (singletonTraverser, context, item) -> (Traverser)flatMapFn.apply((Object)context, (Object)item));
    }

    /**
     * Returns a supplier of {@code RollingAggregateP} processors built from
     * the given arguments, which are passed to the constructor unchanged.
     *
     * @param keyFn         the key-extracting function
     * @param aggrOp        the aggregate operation to perform
     * @param mapToOutputFn function that creates the output item from the
     *                      input item, its key and the current result
     * @return a supplier that creates a new {@code RollingAggregateP} on every call
     */
    @Nonnull
    public static <T, K, A, R, OUT> DistributedSupplier<Processor> rollingAggregateP(
            @Nonnull DistributedFunction<? super T, ? extends K> keyFn,
            @Nonnull AggregateOperation1<? super T, A, ? extends R> aggrOp,
            @Nonnull DistributedTriFunction<? super T, ? super K, ? super R, ? extends OUT> mapToOutputFn) {
        return () -> new RollingAggregateP(keyFn, aggrOp, mapToOutputFn);
    }

    /**
     * Returns a supplier of processors that drain their inbox and do nothing
     * with the drained items.
     *
     * @return a supplier that creates a new no-op processor on every call
     */
    @Nonnull
    public static DistributedSupplier<Processor> noopP() {
        return () -> new NoopP();
    }

    /**
     * Processor that drains every inbox it is given into
     * {@code DistributedConsumer.noop()}, both for regular input and when
     * restoring from a snapshot.
     */
    private static class NoopP
    implements Processor {
        private NoopP() {
        }

        /** Drains {@code inbox} into a no-op consumer; {@code ordinal} is not used. */
        @Override
        public void process(int ordinal, @Nonnull Inbox inbox) {
            inbox.drain(DistributedConsumer.noop());
        }

        /** Drains the snapshot {@code inbox} into a no-op consumer. */
        @Override
        public void restoreFromSnapshot(@Nonnull Inbox inbox) {
            inbox.drain(DistributedConsumer.noop());
        }
    }
}

