/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Distributed;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorMetaSupplier;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.impl.connector.ReadIListP;
import com.hazelcast.jet.impl.connector.ReadIMapP;
import com.hazelcast.jet.impl.connector.WriteIListP;
import com.hazelcast.jet.impl.connector.WriteIMapP;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnull;

public final class Processors {
    private Processors() {
    }

    @Nonnull
    public static ProcessorMetaSupplier readMap(@Nonnull String mapName) {
        return ReadIMapP.supplier(mapName);
    }

    @Nonnull
    public static ProcessorMetaSupplier readMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig) {
        return ReadIMapP.supplier(mapName, clientConfig);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeMap(@Nonnull String mapName) {
        return WriteIMapP.supplier(mapName);
    }

    @Nonnull
    public static ProcessorMetaSupplier writeMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig) {
        return WriteIMapP.supplier(mapName, clientConfig);
    }

    @Nonnull
    public static ProcessorMetaSupplier readList(@Nonnull String listName) {
        return ReadIListP.supplier(listName);
    }

    @Nonnull
    public static ProcessorMetaSupplier readList(@Nonnull String listName, @Nonnull ClientConfig clientConfig) {
        return ReadIListP.supplier(listName, clientConfig);
    }

    @Nonnull
    public static ProcessorSupplier writeList(@Nonnull String listName) {
        return WriteIListP.supplier(listName);
    }

    @Nonnull
    public static ProcessorSupplier writeList(@Nonnull String listName, @Nonnull ClientConfig clientConfig) {
        return WriteIListP.supplier(listName, clientConfig);
    }

    @Nonnull
    public static <T, R> ProcessorSupplier map(@Nonnull Distributed.Function<? super T, ? extends R> mapper) {
        return ProcessorSupplier.of(() -> {
            Traversers.ResettableSingletonTraverser trav = new Traversers.ResettableSingletonTraverser();
            return new TransformP(item -> {
                trav.item = mapper.apply(item);
                return trav;
            });
        });
    }

    @Nonnull
    public static <T> ProcessorSupplier filter(@Nonnull Distributed.Predicate<? super T> predicate) {
        return ProcessorSupplier.of(() -> {
            Traversers.ResettableSingletonTraverser trav = new Traversers.ResettableSingletonTraverser();
            return new TransformP(item -> {
                trav.item = predicate.test(item) ? item : null;
                return trav;
            });
        });
    }

    @Nonnull
    public static <T, R> ProcessorSupplier flatMap(@Nonnull Distributed.Function<? super T, ? extends Traverser<? extends R>> mapper) {
        return ProcessorSupplier.of(() -> new TransformP(mapper));
    }

    @Nonnull
    public static <T, K, A, R> ProcessorSupplier groupAndAccumulate(@Nonnull Distributed.Function<? super T, ? extends K> keyExtractor, @Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiFunction<? super A, ? super T, ? extends A> accumulator, @Nonnull Distributed.BiFunction<? super K, ? super A, ? extends R> finisher) {
        return ProcessorSupplier.of(() -> new GroupAndAccumulateP(keyExtractor, supplier, accumulator, finisher));
    }

    @Nonnull
    public static <T, A> ProcessorSupplier groupAndAccumulate(@Nonnull Distributed.Function<? super T, ?> keyExtractor, @Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiFunction<? super A, ? super T, ? extends A> accumulator) {
        return Processors.groupAndAccumulate(keyExtractor, supplier, accumulator, Util::entry);
    }

    @Nonnull
    public static <T, A> ProcessorSupplier groupAndAccumulate(@Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiFunction<? super A, ? super T, ? extends A> accumulator) {
        return Processors.groupAndAccumulate(Distributed.Function.identity(), supplier, accumulator);
    }

    @Nonnull
    public static <T, K, A, R> ProcessorSupplier groupAndCollect(@Nonnull Distributed.Function<? super T, ? extends K> keyExtractor, @Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiConsumer<? super A, ? super T> collector, @Nonnull Distributed.BiFunction<? super K, ? super A, ? extends R> finisher) {
        return ProcessorSupplier.of(() -> new GroupAndCollectP(keyExtractor, supplier, collector, finisher));
    }

    @Nonnull
    public static <T, A> ProcessorSupplier groupAndCollect(@Nonnull Distributed.Function<? super T, ?> keyExtractor, @Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiConsumer<? super A, ? super T> collector) {
        return Processors.groupAndCollect(keyExtractor, supplier, collector, Util::entry);
    }

    @Nonnull
    public static <T, A> ProcessorSupplier groupAndCollect(@Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiConsumer<? super A, ? super T> collector) {
        return Processors.groupAndCollect(Distributed.Function.identity(), supplier, collector);
    }

    @Nonnull
    public static <T, A, R> ProcessorSupplier accumulate(@Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiFunction<? super A, ? super T, ? extends A> accumulator, @Nonnull Distributed.Function<? super A, ? extends R> finisher) {
        return Processors.groupAndAccumulate(x -> true, supplier, accumulator, (dummyTrueBoolean, a) -> finisher.apply((Object)a));
    }

    @Nonnull
    public static <T, A> ProcessorSupplier accumulate(@Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiFunction<? super A, ? super T, ? extends A> accumulator) {
        return Processors.groupAndAccumulate(x -> true, supplier, accumulator, (dummyTrueBoolean, a) -> a);
    }

    @Nonnull
    public static <T, A, R> ProcessorSupplier collect(@Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiConsumer<? super A, ? super T> collector, @Nonnull Distributed.Function<? super A, ? extends R> finisher) {
        return Processors.groupAndCollect(x -> true, supplier, collector, (dummyTrueBoolean, a) -> finisher.apply((Object)a));
    }

    @Nonnull
    public static <T, A> ProcessorSupplier collect(@Nonnull Distributed.Supplier<? extends A> supplier, @Nonnull Distributed.BiConsumer<? super A, ? super T> collector) {
        return Processors.groupAndCollect(x -> true, supplier, collector, (dummyTrueBoolean, a) -> a);
    }

    @Nonnull
    public static <T, K> ProcessorSupplier countDistinct(@Nonnull Distributed.Function<T, K> keyExtractor) {
        return ProcessorSupplier.of(() -> new CountDistinctP(keyExtractor));
    }

    @Nonnull
    public static <T> ProcessorSupplier countDistinct() {
        return ProcessorSupplier.of(() -> new CountDistinctP<Object, Object>(x -> x));
    }

    private static class CountDistinctP<T, K>
    extends AbstractProcessor {
        private final Distributed.Function<T, K> extractKey;
        private final Set<K> seenItems = new HashSet<K>();

        CountDistinctP(@Nonnull Distributed.Function<T, K> extractKey) {
            this.extractKey = extractKey;
        }

        @Override
        protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception {
            assert (ordinal == 0);
            this.seenItems.add(this.extractKey.apply(item));
            return true;
        }

        @Override
        public boolean complete() {
            this.emit(this.seenItems.size());
            return true;
        }
    }

    private static class GroupAndCollectP<T, K, A, R>
    extends ReducingProcessorBase<T, K, A, R> {
        private final BiConsumer<? super A, ? super T> collector;

        GroupAndCollectP(@Nonnull Function<? super T, ? extends K> keyExtractor, @Nonnull Supplier<? extends A> supplier, @Nonnull BiConsumer<? super A, ? super T> collector, @Nonnull BiFunction<? super K, ? super A, ? extends R> finisher) {
            super(keyExtractor, supplier, finisher);
            this.collector = collector;
        }

        @Override
        protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception {
            Object acc = this.groups.computeIfAbsent(this.keyExtractor.apply(item), k -> this.supplier.get());
            this.collector.accept(acc, item);
            return true;
        }
    }

    private static class GroupAndAccumulateP<T, K, A, R>
    extends ReducingProcessorBase<T, K, A, R> {
        private final BiFunction<? super A, ? super T, ? extends A> accumulator;

        GroupAndAccumulateP(@Nonnull Function<? super T, ? extends K> keyExtractor, @Nonnull Supplier<? extends A> supplier, @Nonnull BiFunction<? super A, ? super T, ? extends A> accumulator, @Nonnull BiFunction<? super K, ? super A, ? extends R> finisher) {
            super(keyExtractor, supplier, finisher);
            this.accumulator = accumulator;
        }

        @Override
        protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception {
            this.groups.compute(this.keyExtractor.apply(item), (x, a) -> this.accumulator.apply(a != null ? a : this.supplier.get(), item));
            return true;
        }
    }

    private static abstract class ReducingProcessorBase<T, K, A, R>
    extends AbstractProcessor {
        final Function<? super T, ? extends K> keyExtractor;
        final Supplier<? extends A> supplier;
        final Map<K, A> groups = new HashMap<K, A>();
        final Traverser<R> resultTraverser;

        ReducingProcessorBase(@Nonnull Function<? super T, ? extends K> keyExtractor, @Nonnull Supplier<? extends A> supplier, @Nonnull BiFunction<? super K, ? super A, ? extends R> finisher) {
            this.keyExtractor = keyExtractor;
            this.supplier = supplier;
            this.resultTraverser = Traversers.lazy(() -> Traversers.traverseStream(this.groups.entrySet().stream().map(entry -> finisher.apply((Object)entry.getKey(), (Object)entry.getValue()))));
        }

        @Override
        public boolean complete() {
            return this.emitCooperatively(this.resultTraverser);
        }
    }

    private static class TransformP<T, R>
    extends AbstractProcessor {
        private final AbstractProcessor.FlatMapper<T, R> flatMapper;

        TransformP(@Nonnull Distributed.Function<? super T, ? extends Traverser<? extends R>> mapper) {
            this.flatMapper = this.flatMapper(mapper);
        }

        @Override
        protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception {
            return this.flatMapper.tryProcess(item);
        }
    }

    public static class NoopP
    implements Processor {
    }
}

