/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core.processor;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceOrderedP;
import com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceUnorderedP;
import com.hazelcast.jet.impl.processor.GroupP;
import com.hazelcast.jet.impl.processor.InsertWatermarksP;
import com.hazelcast.jet.impl.processor.SessionWindowP;
import com.hazelcast.jet.impl.processor.SlidingWindowP;
import com.hazelcast.jet.impl.processor.TransformP;
import com.hazelcast.jet.impl.processor.TransformStatefulP;
import com.hazelcast.jet.impl.processor.TransformUsingServiceP;
import com.hazelcast.jet.pipeline.ServiceFactory;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class Processors {
    private Processors() {
    }

    @Nonnull
    public static <A, R> SupplierEx<Processor> aggregateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return (SupplierEx & Serializable)() -> new GroupP(Collections.nCopies(aggrOp.arity(), (FunctionEx & Serializable)t -> "ALL"), aggrOp, (k, r) -> r);
    }

    @Nonnull
    public static <A, R> SupplierEx<Processor> accumulateP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return (SupplierEx & Serializable)() -> new GroupP(Collections.nCopies(aggrOp.arity(), (FunctionEx & Serializable)t -> "ALL"), aggrOp.withIdentityFinish(), (k, r) -> r);
    }

    @Nonnull
    public static <A, R> SupplierEx<Processor> combineP(@Nonnull AggregateOperation<A, R> aggrOp) {
        return (SupplierEx & Serializable)() -> new GroupP((FunctionEx & Serializable)t -> "ALL", aggrOp.withCombiningAccumulateFn(FunctionEx.identity()), (k, r) -> r);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn) {
        return (SupplierEx & Serializable)() -> new GroupP(keyFns, aggrOp, mapToOutputFn);
    }

    @Nonnull
    public static <K, A> SupplierEx<Processor> accumulateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> getKeyFns, @Nonnull AggregateOperation<A, ?> aggrOp) {
        return (SupplierEx & Serializable)() -> new GroupP(getKeyFns, aggrOp.withIdentityFinish(), Util::entry);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> combineByKeyP(@Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn) {
        return (SupplierEx & Serializable)() -> new GroupP(Map.Entry::getKey, aggrOp.withCombiningAccumulateFn(Map.Entry::getValue), mapToOutputFn);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateToSlidingWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        return Processors.aggregateByKeyAndWindowP(keyFns, timestampFns, timestampKind, winPolicy, earlyResultsPeriod, aggrOp, mapToOutputFn, true);
    }

    @Nonnull
    public static <K, A> SupplierEx<Processor> accumulateByFrameP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ?> aggrOp) {
        return Processors.aggregateByKeyAndWindowP(keyFns, timestampFns, timestampKind, winPolicy.toTumblingByFrame(), 0L, aggrOp.withIdentityFinish(), KeyedWindowResult::new, false);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> combineToSlidingWindowP(@Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        FunctionEx & Serializable keyFn = KeyedWindowResult::key;
        ToLongFunctionEx & Serializable timestampFn = WindowResult::end;
        return Processors.aggregateByKeyAndWindowP(Collections.singletonList(keyFn), Collections.singletonList(timestampFn), TimestampKind.FRAME, winPolicy, 0L, aggrOp.withCombiningAccumulateFn(WindowResult::result), mapToOutputFn, true);
    }

    @Nonnull
    private static <K, A, R, OUT> SupplierEx<Processor> aggregateByKeyAndWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, boolean isLastStage) {
        return (SupplierEx & Serializable)() -> new SlidingWindowP(keyFns, com.hazelcast.jet.impl.util.Util.toList(timestampFns, f -> Processors.toFrameTimestampFn(f, timestampKind, winPolicy)), winPolicy, earlyResultsPeriod, aggrOp, mapToOutputFn, isLastStage);
    }

    private static ToLongFunctionEx<Object> toFrameTimestampFn(@Nonnull ToLongFunctionEx<?> timestampFnX, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy) {
        ToLongFunctionEx<?> timestampFn = timestampFnX;
        return timestampKind == TimestampKind.EVENT ? (ToLongFunctionEx & Serializable)item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item)) : (ToLongFunctionEx & Serializable)item -> winPolicy.higherFrameTs(timestampFn.applyAsLong(item) - 1L);
    }

    @Nonnull
    public static <K, A, R, OUT> SupplierEx<Processor> aggregateToSessionWindowP(long sessionTimeout, long earlyResultsPeriod, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn) {
        return (SupplierEx & Serializable)() -> new SessionWindowP(sessionTimeout, earlyResultsPeriod, timestampFns, keyFns, aggrOp, mapToOutputFn);
    }

    @Nonnull
    public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        return (SupplierEx & Serializable)() -> new InsertWatermarksP(eventTimePolicy);
    }

    @Nonnull
    public static <T, R> SupplierEx<Processor> mapP(@Nonnull FunctionEx<? super T, ? extends R> mapFn) {
        return (SupplierEx & Serializable)() -> {
            ResettableSingletonTraverser trav = new ResettableSingletonTraverser();
            return new TransformP((FunctionEx & Serializable)item -> {
                trav.accept(mapFn.apply(item));
                return trav;
            });
        };
    }

    @Nonnull
    public static <T> SupplierEx<Processor> filterP(@Nonnull PredicateEx<? super T> filterFn) {
        return Processors.mapP((FunctionEx & Serializable)t -> filterFn.test(t) ? t : null);
    }

    @Nonnull
    public static <T, R> SupplierEx<Processor> flatMapP(@Nonnull FunctionEx<? super T, ? extends Traverser<? extends R>> flatMapFn) {
        return (SupplierEx & Serializable)() -> new TransformP(flatMapFn);
    }

    @Nonnull
    public static <T, K, S, R> SupplierEx<Processor> mapStatefulP(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull ToLongFunctionEx<? super T> timestampFn, @Nonnull Supplier<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> statefulMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends R> onEvictFn) {
        return (SupplierEx & Serializable)() -> {
            ResettableSingletonTraverser mainTrav = new ResettableSingletonTraverser();
            ResettableSingletonTraverser evictTrav = new ResettableSingletonTraverser();
            TriFunction onEvictFnCopy = onEvictFn;
            return new TransformStatefulP(ttl, (Function<Object, Object>)keyFn, (ToLongFunction<Object>)timestampFn, createFn, (state, key, item) -> {
                mainTrav.accept(statefulMapFn.apply((Object)state, (Object)key, (Object)item));
                return mainTrav;
            }, onEvictFnCopy != null ? (s, k, wm) -> {
                evictTrav.accept(onEvictFnCopy.apply(s, k, wm));
                return evictTrav;
            } : null);
        };
    }

    @Nonnull
    public static <T, K, S, R> SupplierEx<Processor> flatMapStatefulP(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull ToLongFunctionEx<? super T> timestampFn, @Nonnull Supplier<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> statefulFlatMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn) {
        return (SupplierEx & Serializable)() -> new TransformStatefulP(ttl, keyFn, timestampFn, createFn, statefulFlatMapFn, onEvictFn);
    }

    @Nonnull
    public static <C, S, T, R> ProcessorSupplier mapUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn) {
        return TransformUsingServiceP.supplier(serviceFactory, (singletonTraverser, context, item) -> {
            singletonTraverser.accept(mapFn.apply(context, item));
            return singletonTraverser;
        });
    }

    @Nonnull
    public static <C, S, T, K, R> ProcessorSupplier mapUsingServiceAsyncP(@Nonnull ServiceFactory<C, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull FunctionEx<T, K> extractKeyFn, @Nonnull BiFunctionEx<? super S, ? super T, CompletableFuture<R>> mapAsyncFn) {
        BiFunctionEx & Serializable flatMapAsyncFn = (BiFunctionEx & Serializable)(s, t) -> ((CompletableFuture)mapAsyncFn.apply(s, t)).thenApply(Traversers::singleton);
        return preserveOrder ? AsyncTransformUsingServiceOrderedP.supplier(serviceFactory, maxConcurrentOps, flatMapAsyncFn) : AsyncTransformUsingServiceUnorderedP.supplier(serviceFactory, maxConcurrentOps, flatMapAsyncFn, extractKeyFn);
    }

    @Nonnull
    public static <C, S, T> ProcessorSupplier filterUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiPredicateEx<? super S, ? super T> filterFn) {
        return TransformUsingServiceP.supplier(serviceFactory, (singletonTraverser, context, item) -> {
            singletonTraverser.accept(filterFn.test(context, item) ? item : null);
            return singletonTraverser;
        });
    }

    @Nonnull
    public static <C, S, T, R> ProcessorSupplier flatMapUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn) {
        return TransformUsingServiceP.supplier(serviceFactory, (singletonTraverser, service, item) -> (Traverser)flatMapFn.apply(service, item));
    }

    @Nonnull
    public static SupplierEx<Processor> noopP() {
        return (SupplierEx & Serializable)() -> new NoopP();
    }

    private static class NoopP
    implements Processor {
        private Outbox outbox;

        private NoopP() {
        }

        @Override
        public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws Exception {
            this.outbox = outbox;
        }

        @Override
        public void process(int ordinal, @Nonnull Inbox inbox) {
            inbox.drain(ConsumerEx.noop());
        }

        @Override
        public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
            return this.outbox.offer(watermark);
        }

        @Override
        public void restoreFromSnapshot(@Nonnull Inbox inbox) {
            inbox.drain(ConsumerEx.noop());
        }
    }
}

