/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.processor.AggregateP;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceOrderedP;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceUnorderedP;
import com.hazelcast.jet.impl.processor.GroupP;
import com.hazelcast.jet.impl.processor.InsertWatermarksP;
import com.hazelcast.jet.impl.processor.NoopP;
import com.hazelcast.jet.impl.processor.ProcessorSuppliers;
import com.hazelcast.jet.impl.processor.SessionWindowP;
import com.hazelcast.jet.impl.processor.SlidingWindowP;
import com.hazelcast.jet.impl.processor.SortP;
import com.hazelcast.jet.impl.processor.TransformP;
import com.hazelcast.jet.impl.processor.TransformStatefulP;
import com.hazelcast.jet.impl.processor.TransformUsingServiceP;
import com.hazelcast.jet.pipeline.ServiceFactory;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class Processors {
    /**
     * Private constructor: every member visible in this class is a static
     * factory method, so no instance is ever needed.
     */
    private Processors() {
    }

    /**
     * Returns a supplier that builds a new {@link AggregateP} around
     * {@code aggrOp} each time it is asked for a processor.
     *
     * @param aggrOp the aggregate operation handed to every created processor
     * @return a supplier of {@code AggregateP} instances
     */
    @Nonnull
    public static <A, R> SupplierEx<Processor> aggregateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return () -> {
            Processor aggregator = new AggregateP(aggrOp);
            return aggregator;
        };
    }

    /**
     * Returns an {@code AggregatePSupplier} built from {@code aggrOp} with its
     * finishing step replaced by the identity (see
     * {@code AggregateOperation.withIdentityFinish()}).
     * <p>
     * The identity-finish variant is derived once, when this method is called,
     * not each time the returned supplier is invoked.
     *
     * @param aggrOp the aggregate operation to derive the supplier from
     * @return the supplier wrapping the identity-finish operation
     */
    @Nonnull
    public static <A, R> SupplierEx<Processor> accumulateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        AggregateOperation<A, A> identityFinishOp = aggrOp.withIdentityFinish();
        return new ProcessorSuppliers.AggregatePSupplier<A, A>(identityFinishOp);
    }

    /**
     * Returns an {@code AggregatePSupplier} built from {@code aggrOp} after its
     * accumulate function has been replaced through
     * {@code withCombiningAccumulateFn(FunctionEx.identity())}.
     * <p>
     * The derived operation is created once, when this method is called.
     *
     * @param aggrOp the aggregate operation to derive the supplier from
     * @return the supplier wrapping the derived operation
     */
    @Nonnull
    public static <A, R> SupplierEx<Processor> combineP(@Nonnull AggregateOperation<A, R> aggrOp) {
        AggregateOperation<A, R> combiningOp = aggrOp.withCombiningAccumulateFn(FunctionEx.identity());
        return new ProcessorSuppliers.AggregatePSupplier(combiningOp);
    }

    /**
     * Returns a supplier that builds a new {@link GroupP} from the given key
     * functions, aggregate operation and output-mapping function each time it
     * is invoked.
     *
     * @param keyFns        key-extracting functions passed to {@code GroupP}
     * @param aggrOp        aggregate operation passed to {@code GroupP}
     * @param mapToOutputFn function of (key, result) passed to {@code GroupP}
     * @return a supplier of {@code GroupP} instances
     */
    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn) {
        return () -> {
            Processor grouper = new GroupP(keyFns, aggrOp, mapToOutputFn);
            return grouper;
        };
    }

    /**
     * Returns a supplier that builds a new {@link GroupP} which uses the
     * identity-finish variant of {@code aggrOp} and {@code Util::entry} as the
     * output-mapping function.
     * <p>
     * {@code aggrOp.withIdentityFinish()} is evaluated on every invocation of
     * the returned supplier, not when this method is called.
     *
     * @param getKeyFns key-extracting functions passed to {@code GroupP}
     * @param aggrOp    aggregate operation whose finish step is replaced
     * @return a supplier of {@code GroupP} instances
     */
    @Nonnull
    public static <K, A> SupplierEx<Processor> accumulateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> getKeyFns, @Nonnull AggregateOperation<A, ?> aggrOp) {
        return () -> {
            AggregateOperation<A, A> identityFinishOp = aggrOp.withIdentityFinish();
            Processor grouper = new GroupP(getKeyFns, identityFinishOp, Util::entry);
            return grouper;
        };
    }

    /**
     * Returns a supplier that builds a new {@link GroupP} keyed by
     * {@code Map.Entry::getKey}, using {@code aggrOp} with its accumulate
     * function replaced via
     * {@code withCombiningAccumulateFn(Map.Entry::getValue)}, and emitting
     * through {@code mapToOutputFn}.
     * <p>
     * The derived aggregate operation is created on every invocation of the
     * returned supplier.
     *
     * @param aggrOp        aggregate operation to derive the combining
     *                      operation from
     * @param mapToOutputFn function of (key, result) passed to {@code GroupP}
     * @return a supplier of {@code GroupP} instances
     */
    // NOTE(review): this is decompiler output; the bare Map.Entry method
    // references against a raw GroupP constructor presumably carried explicit
    // type arguments in the original source - confirm before relying on this
    // compiling as written.
    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> combineByKeyP(@Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn) {
        return () -> new GroupP(Map.Entry::getKey, aggrOp.withCombiningAccumulateFn(Map.Entry::getValue), mapToOutputFn);
    }

    /**
     * Delegates to the private {@code aggregateByKeyAndWindowP} overload that
     * takes no watermark key, forwarding all arguments unchanged and passing
     * {@code true} for {@code isLastStage}.
     *
     * @param keyFns             key-extracting functions
     * @param timestampFns       timestamp-extracting functions
     * @param timestampKind      kind of timestamp the functions return
     * @param winPolicy          sliding window policy
     * @param earlyResultsPeriod forwarded unchanged
     * @param aggrOp             aggregate operation
     * @param mapToOutputFn      function producing the output item
     * @return the supplier produced by the delegate
     */
    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateToSlidingWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        final boolean lastStage = true;
        return aggregateByKeyAndWindowP(
                keyFns, timestampFns, timestampKind, winPolicy, earlyResultsPeriod, aggrOp, mapToOutputFn, lastStage);
    }

    /**
     * Delegates to the private {@code aggregateByKeyAndWindowP} overload that
     * takes a watermark key, forwarding all arguments unchanged and passing
     * {@code true} for {@code isLastStage}.
     *
     * @param keyFns             key-extracting functions
     * @param timestampFns       timestamp-extracting functions
     * @param timestampKind      kind of timestamp the functions return
     * @param winPolicy          sliding window policy
     * @param earlyResultsPeriod forwarded unchanged
     * @param aggrOp             aggregate operation
     * @param mapToOutputFn      function producing the output item
     * @param windowWatermarkKey watermark key forwarded to the processor
     * @return the supplier produced by the delegate
     */
    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateToSlidingWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, byte windowWatermarkKey) {
        final boolean lastStage = true;
        return aggregateByKeyAndWindowP(
                keyFns, timestampFns, timestampKind, winPolicy, earlyResultsPeriod, aggrOp, mapToOutputFn,
                lastStage, windowWatermarkKey);
    }

    /**
     * Delegates to the private {@code aggregateByKeyAndWindowP} overload
     * without a watermark key, using:
     * <ul>
     *   <li>{@code winPolicy.toTumblingByFrame()} as the window policy,</li>
     *   <li>{@code 0} as the early-results period,</li>
     *   <li>the identity-finish variant of {@code aggrOp},</li>
     *   <li>{@code KeyedWindowResult::new} as the output function, and</li>
     *   <li>{@code false} for {@code isLastStage}.</li>
     * </ul>
     *
     * @param keyFns        key-extracting functions
     * @param timestampFns  timestamp-extracting functions
     * @param timestampKind kind of timestamp the functions return
     * @param winPolicy     window policy, converted with {@code toTumblingByFrame()}
     * @param aggrOp        aggregate operation whose finish step is replaced
     * @return the supplier produced by the delegate
     */
    @Nonnull
    public static <K, A> SupplierEx<Processor> accumulateByFrameP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ?> aggrOp) {
        SlidingWindowPolicy tumblingPolicy = winPolicy.toTumblingByFrame();
        AggregateOperation<A, A> identityFinishOp = aggrOp.withIdentityFinish();
        final long noEarlyResults = 0L;
        final boolean lastStage = false;
        return aggregateByKeyAndWindowP(
                keyFns, timestampFns, timestampKind, tumblingPolicy, noEarlyResults, identityFinishOp,
                KeyedWindowResult::new, lastStage);
    }

    /**
     * Delegates to the private {@code aggregateByKeyAndWindowP} overload with
     * a watermark key, using:
     * <ul>
     *   <li>{@code winPolicy.toTumblingByFrame()} as the window policy,</li>
     *   <li>{@code 0} as the early-results period,</li>
     *   <li>the identity-finish variant of {@code aggrOp},</li>
     *   <li>{@code KeyedWindowResult::new} as the output function, and</li>
     *   <li>{@code false} for {@code isLastStage}.</li>
     * </ul>
     *
     * @param keyFns        key-extracting functions
     * @param timestampFns  timestamp-extracting functions
     * @param timestampKind kind of timestamp the functions return
     * @param winPolicy     window policy, converted with {@code toTumblingByFrame()}
     * @param aggrOp        aggregate operation whose finish step is replaced
     * @param watermarkKey  watermark key forwarded to the processor
     * @return the supplier produced by the delegate
     */
    @Nonnull
    public static <K, A> SupplierEx<Processor> accumulateByFrameP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ?> aggrOp, byte watermarkKey) {
        SlidingWindowPolicy tumblingPolicy = winPolicy.toTumblingByFrame();
        AggregateOperation<A, A> identityFinishOp = aggrOp.withIdentityFinish();
        final long noEarlyResults = 0L;
        final boolean lastStage = false;
        return aggregateByKeyAndWindowP(
                keyFns, timestampFns, timestampKind, tumblingPolicy, noEarlyResults, identityFinishOp,
                KeyedWindowResult::new, lastStage, watermarkKey);
    }

    /**
     * Convenience overload: calls the four-argument
     * {@code combineToSlidingWindowP} with a watermark key of {@code 0}.
     *
     * @param winPolicy     sliding window policy
     * @param aggrOp        aggregate operation
     * @param mapToOutputFn function producing the output item
     * @return the supplier produced by the four-argument overload
     */
    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> combineToSlidingWindowP(@Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        final byte zeroWatermarkKey = 0;
        return combineToSlidingWindowP(winPolicy, aggrOp, mapToOutputFn, zeroWatermarkKey);
    }

    /**
     * Delegates to the private {@code aggregateByKeyAndWindowP} overload with
     * a watermark key, treating the input as {@link KeyedWindowResult} items:
     * the key is read with {@code KeyedWindowResult::key}, the timestamp with
     * {@code WindowResult::end}, and the accumulate function of
     * {@code aggrOp} is replaced via
     * {@code withCombiningAccumulateFn(WindowResult::result)}.
     * <p>
     * The timestamp kind is fixed to {@link TimestampKind#FRAME}, the
     * early-results period to {@code 0} and {@code isLastStage} to
     * {@code true}.
     *
     * @param winPolicy          sliding window policy, forwarded unchanged
     * @param aggrOp             aggregate operation to derive the combining
     *                           operation from
     * @param mapToOutputFn      function producing the output item
     * @param windowWatermarkKey watermark key forwarded to the processor
     * @return the supplier produced by the delegate
     */
    // NOTE(review): decompiler output - the raw KeyedWindowResult types below
    // presumably carried type arguments in the original source; confirm.
    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> combineToSlidingWindowP(@Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, byte windowWatermarkKey) {
        // Single key function: the key stored in the incoming window result.
        FunctionEx<KeyedWindowResult, Object> keyFn = KeyedWindowResult::key;
        // Single timestamp function: the end of the incoming window result.
        ToLongFunctionEx<KeyedWindowResult> timestampFn = WindowResult::end;
        return Processors.aggregateByKeyAndWindowP(Collections.singletonList(keyFn), Collections.singletonList(timestampFn), TimestampKind.FRAME, winPolicy, 0L, aggrOp.withCombiningAccumulateFn(WindowResult::result), mapToOutputFn, true, windowWatermarkKey);
    }

    /**
     * Overload without a watermark key: forwards every argument to the
     * nine-argument {@code aggregateByKeyAndWindowP} and supplies {@code 0}
     * as the watermark key.
     *
     * @param keyFns             key-extracting functions
     * @param timestampFns       timestamp-extracting functions
     * @param timestampKind      kind of timestamp the functions return
     * @param winPolicy          sliding window policy
     * @param earlyResultsPeriod forwarded unchanged
     * @param aggrOp             aggregate operation
     * @param mapToOutputFn      function producing the output item
     * @param isLastStage        forwarded unchanged
     * @return the supplier produced by the nine-argument overload
     */
    @Nonnull
    private static <K, A, R, OUT> SupplierEx<Processor> aggregateByKeyAndWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, boolean isLastStage) {
        final byte zeroWatermarkKey = 0;
        return aggregateByKeyAndWindowP(
                keyFns, timestampFns, timestampKind, winPolicy, earlyResultsPeriod, aggrOp, mapToOutputFn,
                isLastStage, zeroWatermarkKey);
    }

    /**
     * Returns a supplier that builds a new {@link SlidingWindowP} on every
     * invocation.
     * <p>
     * Before being handed to the processor, each function in
     * {@code timestampFns} is converted with
     * {@code toFrameTimestampFn(fn, timestampKind, winPolicy)}; that
     * conversion runs each time the supplier is invoked. All other arguments
     * are passed to the processor unchanged.
     *
     * @param keyFns             key-extracting functions
     * @param timestampFns       timestamp-extracting functions to convert
     * @param timestampKind      kind of timestamp the functions return
     * @param winPolicy          sliding window policy
     * @param earlyResultsPeriod forwarded unchanged
     * @param aggrOp             aggregate operation
     * @param mapToOutputFn      function producing the output item
     * @param isLastStage        forwarded unchanged
     * @param windowWatermarkKey watermark key forwarded to the processor
     * @return a supplier of {@code SlidingWindowP} instances
     */
    @Nonnull
    private static <K, A, R, OUT> SupplierEx<Processor> aggregateByKeyAndWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, boolean isLastStage, byte windowWatermarkKey) {
        return () -> {
            Processor windowProcessor = new SlidingWindowP(
                    keyFns,
                    com.hazelcast.jet.impl.util.Util.toList(
                            timestampFns, tsFn -> toFrameTimestampFn(tsFn, timestampKind, winPolicy)),
                    winPolicy,
                    earlyResultsPeriod,
                    aggrOp,
                    mapToOutputFn,
                    isLastStage,
                    windowWatermarkKey);
            return windowProcessor;
        };
    }

    /**
     * Wraps a timestamp-extracting function so that it returns a frame
     * timestamp computed by {@code winPolicy.higherFrameTs(...)}.
     * <ul>
     *   <li>For {@link TimestampKind#EVENT} the extracted timestamp is passed
     *       to {@code higherFrameTs} as is.</li>
     *   <li>For any other kind the extracted timestamp is decremented by one
     *       before being passed to {@code higherFrameTs}.</li>
     * </ul>
     *
     * @param timestampFnX  the function extracting the raw timestamp
     * @param timestampKind decides which of the two conversions is used
     * @param winPolicy     window policy providing {@code higherFrameTs}
     * @return a function from an item to its frame timestamp
     */
    @SuppressWarnings("unchecked")
    private static ToLongFunctionEx<Object> toFrameTimestampFn(@Nonnull ToLongFunctionEx<?> timestampFnX, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy) {
        // A ToLongFunctionEx<?> cannot be applied to an Object (the wildcard is
        // captured as an unknown type), so view it as ToLongFunctionEx<Object>.
        // The cast is unchecked and erased, so it adds no runtime check.
        ToLongFunctionEx<Object> timestampFn = (ToLongFunctionEx<Object>) timestampFnX;
        return timestampKind == TimestampKind.EVENT
                ? item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item))
                : item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item) - 1L);
    }

    /**
     * Returns a supplier that builds a new {@link SessionWindowP} on every
     * invocation, passing all arguments through unchanged and {@code 0} as
     * the constructor's last argument.
     *
     * @param sessionTimeout     forwarded unchanged
     * @param earlyResultsPeriod forwarded unchanged
     * @param timestampFns       timestamp-extracting functions
     * @param keyFns             key-extracting functions
     * @param aggrOp             aggregate operation
     * @param mapToOutputFn      function producing the output item
     * @return a supplier of {@code SessionWindowP} instances
     */
    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateToSessionWindowP(long sessionTimeout, long earlyResultsPeriod, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        return () -> {
            Processor sessionProcessor = new SessionWindowP(
                    sessionTimeout, earlyResultsPeriod, timestampFns, keyFns, aggrOp, mapToOutputFn, 0);
            return sessionProcessor;
        };
    }

    /**
     * Convenience overload: wraps {@code eventTimePolicy} in a provider
     * function that ignores its {@code ProcessorSupplier.Context} argument
     * and always returns that same policy, then delegates to the
     * provider-based {@code insertWatermarksP}.
     *
     * @param eventTimePolicy the policy every created processor will receive
     * @return the supplier produced by the provider-based overload
     */
    @Nonnull
    public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        FunctionEx<ProcessorSupplier.Context, EventTimePolicy<? super T>> constantProvider =
                (ProcessorSupplier.Context ctx) -> eventTimePolicy;
        return insertWatermarksP(constantProvider);
    }

    /**
     * Returns a supplier that builds a new {@link InsertWatermarksP} from
     * {@code eventTimePolicyProvider} each time it is invoked.
     *
     * @param eventTimePolicyProvider function from a
     *        {@code ProcessorSupplier.Context} to the event time policy;
     *        handed to every created processor
     * @return a supplier of {@code InsertWatermarksP} instances
     */
    @Nonnull
    public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull FunctionEx<ProcessorSupplier.Context, EventTimePolicy<? super T>> eventTimePolicyProvider) {
        return () -> {
            Processor inserter = new InsertWatermarksP(eventTimePolicyProvider);
            return inserter;
        };
    }

    /**
     * Returns a {@code ProcessorMapPSupplier} wrapping {@code mapFn}.
     *
     * @param mapFn the mapping function handed to the supplier
     * @return the supplier wrapping {@code mapFn}
     */
    @Nonnull
    public static <T, R> SupplierEx<Processor> mapP(@Nonnull FunctionEx<? super T, ? extends R> mapFn) {
        SupplierEx<Processor> mapSupplier = new ProcessorSuppliers.ProcessorMapPSupplier<T, R>(mapFn);
        return mapSupplier;
    }

    /**
     * Returns a mapping-processor supplier (see {@code mapP}) whose mapping
     * function returns the item itself when {@code filterFn} accepts it and
     * {@code null} otherwise.
     * <p>
     * NOTE(review): presumably the mapping processor drops {@code null}
     * results, which is what turns this into a filter - confirm against the
     * processor created by {@code mapP}.
     *
     * @param filterFn predicate deciding whether an item is kept
     * @return the supplier produced by {@code mapP}
     */
    @Nonnull
    public static <T> SupplierEx<Processor> filterP(@Nonnull PredicateEx<? super T> filterFn) {
        // The explicit <T, T> witness gives the lambda parameter the type T.
        // Without it nothing constrains mapP's type variables, the parameter
        // is inferred as Object, and filterFn.test(...) - which accepts only
        // the captured "? super T" - does not type-check.
        return Processors.<T, T>mapP(item -> filterFn.test(item) ? item : null);
    }

    /**
     * Returns a supplier that builds a new {@link TransformP} around
     * {@code flatMapFn} each time it is invoked.
     *
     * @param flatMapFn function from an item to a traverser over the output
     *                  items; handed to every created processor
     * @return a supplier of {@code TransformP} instances
     */
    @Nonnull
    public static <T, R> SupplierEx<Processor> flatMapP(@Nonnull FunctionEx<? super T, ? extends Traverser<? extends R>> flatMapFn) {
        return () -> {
            Processor transformer = new TransformP(flatMapFn);
            return transformer;
        };
    }

    /**
     * Returns a supplier of {@link TransformStatefulP} processors that adapts
     * the single-result {@code statefulMapFn} (and, when present,
     * {@code onEvictFn}) into traverser-returning functions.
     * <p>
     * Every processor created by the supplier gets its own pair of
     * {@link ResettableSingletonTraverser}s: one is reused for each mapping
     * result, the other for each eviction result. When {@code onEvictFn} is
     * {@code null}, {@code null} is passed to the processor as the eviction
     * function.
     *
     * @param ttl           forwarded to {@code TransformStatefulP} unchanged
     * @param keyFn         key-extracting function, forwarded unchanged
     * @param timestampFn   timestamp-extracting function, forwarded unchanged
     * @param createFn      supplier of the state object, forwarded unchanged
     * @param statefulMapFn function of (state, key, item) whose result is
     *                      placed into the main singleton traverser
     * @param onEvictFn     optional function of (state, key, long value)
     *                      whose result is placed into the eviction
     *                      singleton traverser; may be {@code null}
     * @return a supplier of {@code TransformStatefulP} instances
     */
    @Nonnull
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T, K, S, R> SupplierEx<Processor> mapStatefulP(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull ToLongFunctionEx<? super T> timestampFn, @Nonnull Supplier<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> statefulMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends R> onEvictFn) {
        return () -> {
            ResettableSingletonTraverser mainTrav = new ResettableSingletonTraverser();
            ResettableSingletonTraverser evictTrav = new ResettableSingletonTraverser();
            // Raw view of onEvictFn so it can be applied to the untyped
            // arguments of the raw eviction lambda below.
            TriFunction onEvictFnCopy = onEvictFn;
            return new TransformStatefulP(ttl, keyFn, timestampFn, createFn, (state, key, item) -> {
                // Cast to the declared type variables, not to Object: the
                // function's parameters are "? super S/K/T", which accept
                // S/K/T but not Object. The casts are erased at runtime.
                mainTrav.accept(statefulMapFn.apply((S) state, (K) key, (T) item));
                return mainTrav;
            }, onEvictFnCopy != null ? (s2, k, wm) -> {
                evictTrav.accept(onEvictFnCopy.apply(s2, k, wm));
                return evictTrav;
            } : null);
        };
    }

    /**
     * Returns a supplier that builds a new {@link TransformStatefulP} on
     * every invocation, passing all arguments through unchanged.
     *
     * @param ttl               forwarded unchanged
     * @param keyFn             key-extracting function
     * @param timestampFn       timestamp-extracting function
     * @param createFn          supplier of the state object
     * @param statefulFlatMapFn function of (state, key, item) returning a
     *                          traverser
     * @param onEvictFn         optional function of (state, key, long value)
     *                          returning a traverser; may be {@code null}
     * @return a supplier of {@code TransformStatefulP} instances
     */
    @Nonnull
    public static <T, K, S, R> SupplierEx<Processor> flatMapStatefulP(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull ToLongFunctionEx<? super T> timestampFn, @Nonnull Supplier<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> statefulFlatMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn) {
        return () -> {
            Processor statefulTransformer = new TransformStatefulP(
                    ttl, keyFn, timestampFn, createFn, statefulFlatMapFn, onEvictFn);
            return statefulTransformer;
        };
    }

    /**
     * Returns the processor supplier created by
     * {@code TransformUsingServiceP.supplier} for a function that applies
     * {@code mapFn} to the service object and the item, stores the result in
     * the singleton traverser it is given, and returns that traverser.
     *
     * @param serviceFactory factory of the service object, forwarded unchanged
     * @param mapFn          function of (service, item) producing the mapped
     *                       value
     * @return the supplier produced by {@code TransformUsingServiceP.supplier}
     */
    @Nonnull
    @SuppressWarnings("unchecked")
    public static <C, S, T, R> ProcessorSupplier mapUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn) {
        return TransformUsingServiceP.supplier(serviceFactory, (singletonTraverser, context, item) -> {
            // Cast to S/T rather than Object: mapFn's parameters are
            // "? super S" / "? super T", which accept S and T but not Object.
            // The casts are erased at runtime.
            singletonTraverser.accept(mapFn.apply((S) context, (T) item));
            return singletonTraverser;
        });
    }

    /**
     * Returns a processor supplier for asynchronous mapping with a service.
     * <p>
     * {@code mapAsyncFn} is adapted into a function whose future completes
     * with {@code Traversers.singleton(result)}. Depending on
     * {@code preserveOrder}, that function is handed to
     * {@code AsyncTransformUsingServiceOrderedP.supplier} or, together with
     * {@code extractKeyFn}, to
     * {@code AsyncTransformUsingServiceUnorderedP.supplier}.
     *
     * @param serviceFactory   factory of the service object, forwarded
     *                         unchanged
     * @param maxConcurrentOps forwarded unchanged
     * @param preserveOrder    {@code true} selects the ordered processor,
     *                         {@code false} the unordered one
     * @param extractKeyFn     key-extracting function; only used by the
     *                         unordered processor
     * @param mapAsyncFn       function of (service, item) returning a future
     *                         of the mapped value
     * @return the supplier produced by the selected processor class
     */
    // NOTE(review): decompiler output - the raw CompletableFuture typing of
    // flatMapAsyncFn presumably had full type arguments in the original
    // source; confirm against the supplier(...) signatures.
    @Nonnull
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <C, S, T, K, R> ProcessorSupplier mapUsingServiceAsyncP(@Nonnull ServiceFactory<C, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull FunctionEx<T, K> extractKeyFn, @Nonnull BiFunctionEx<? super S, ? super T, CompletableFuture<R>> mapAsyncFn) {
        // Cast to S/T rather than Object: mapAsyncFn's parameters are
        // "? super S" / "? super T", which accept S and T but not Object.
        // The casts are erased at runtime.
        BiFunctionEx<Object, Object, CompletableFuture> flatMapAsyncFn = (s2, t2) -> ((CompletableFuture)mapAsyncFn.apply((S) s2, (T) t2)).thenApply(Traversers::singleton);
        return preserveOrder ? AsyncTransformUsingServiceOrderedP.supplier(serviceFactory, maxConcurrentOps, flatMapAsyncFn) : AsyncTransformUsingServiceUnorderedP.supplier(serviceFactory, maxConcurrentOps, flatMapAsyncFn, extractKeyFn);
    }

    /**
     * Returns the processor supplier created by
     * {@code TransformUsingServiceP.supplier} for a function that evaluates
     * {@code filterFn} on the service object and the item, stores the item in
     * the given singleton traverser when the predicate holds (or
     * {@code null} when it does not), and returns that traverser.
     *
     * @param serviceFactory factory of the service object, forwarded unchanged
     * @param filterFn       predicate of (service, item) deciding whether the
     *                       item is kept
     * @return the supplier produced by {@code TransformUsingServiceP.supplier}
     */
    @Nonnull
    @SuppressWarnings("unchecked")
    public static <C, S, T> ProcessorSupplier filterUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiPredicateEx<? super S, ? super T> filterFn) {
        return TransformUsingServiceP.supplier(serviceFactory, (singletonTraverser, context, item) -> {
            // Cast to S/T rather than Object: filterFn's parameters are
            // "? super S" / "? super T", which accept S and T but not Object.
            // The casts are erased at runtime.
            singletonTraverser.accept(filterFn.test((S) context, (T) item) ? item : null);
            return singletonTraverser;
        });
    }

    /**
     * Returns the processor supplier created by
     * {@code TransformUsingServiceP.supplier} for a function that ignores the
     * singleton traverser it is given and returns the traverser produced by
     * {@code flatMapFn} for the service object and the item.
     *
     * @param serviceFactory factory of the service object, forwarded unchanged
     * @param flatMapFn      function of (service, item) returning a traverser
     *                       over the output items
     * @return the supplier produced by {@code TransformUsingServiceP.supplier}
     */
    @Nonnull
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <C, S, T, R> ProcessorSupplier flatMapUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn) {
        // Cast to S/T rather than Object: flatMapFn's parameters are
        // "? super S" / "? super T", which accept S and T but not Object.
        // The casts are erased at runtime.
        return TransformUsingServiceP.supplier(serviceFactory,
                (singletonTraverser, service, item) -> (Traverser) flatMapFn.apply((S) service, (T) item));
    }

    /**
     * Returns a supplier that builds a new {@link SortP} around
     * {@code comparator} each time it is invoked.
     *
     * @param comparator the comparator handed to every created processor
     * @return a supplier of {@code SortP} instances
     */
    @Nonnull
    public static <T> SupplierEx<Processor> sortP(Comparator<T> comparator) {
        return () -> {
            Processor sorter = new SortP(comparator);
            return sorter;
        };
    }

    /**
     * Returns a new {@code NoopP.NoopPSupplier}.
     *
     * @return a freshly created no-op processor supplier
     */
    @Nonnull
    public static SupplierEx<Processor> noopP() {
        SupplierEx<Processor> noopSupplier = new NoopP.NoopPSupplier();
        return noopSupplier;
    }
}

