/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.Alo;
import io.atleon.core.AloOps;
import io.atleon.core.Deduplication;
import io.atleon.core.DeduplicationConfig;
import io.atleon.util.Defaults;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class DeduplicatingTransformer<T>
implements Function<Publisher<T>, Publisher<T>> {
    private static final Scheduler DEFAULT_SCHEDULER = Schedulers.newBoundedElastic((int)Defaults.THREAD_CAP, (int)Integer.MAX_VALUE, (String)DeduplicatingTransformer.class.getSimpleName());
    private final DeduplicationConfig config;
    private final Deduplicator<T, ?> deduplicator;
    private final Scheduler sourceScheduler;

    private DeduplicatingTransformer(DeduplicationConfig config, Deduplicator<T, ?> deduplicator, Scheduler sourceScheduler) {
        this.config = config;
        this.deduplicator = deduplicator;
        this.sourceScheduler = sourceScheduler;
    }

    static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig config, Deduplication<T> deduplication) {
        return DeduplicatingTransformer.identity(config, deduplication, DEFAULT_SCHEDULER);
    }

    static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig config, Deduplication<T> deduplication, Scheduler sourceScheduler) {
        return new DeduplicatingTransformer<T>(config, Deduplicator.identity(deduplication), sourceScheduler);
    }

    static <T> DeduplicatingTransformer<Alo<T>> alo(DeduplicationConfig config, Deduplication<T> deduplication) {
        return DeduplicatingTransformer.alo(config, deduplication, DEFAULT_SCHEDULER);
    }

    static <T> DeduplicatingTransformer<Alo<T>> alo(DeduplicationConfig config, Deduplication<T> deduplication, Scheduler sourceScheduler) {
        return new DeduplicatingTransformer<Alo<T>>(config, Deduplicator.alo(deduplication), sourceScheduler);
    }

    @Override
    public Publisher<T> apply(Publisher<T> publisher) {
        return this.config.isEnabled() ? Flux.from(publisher).switchOnFirst((signal, flux) -> flux.transform(this::applyDeduplication)) : publisher;
    }

    private Flux<T> applyDeduplication(Publisher<T> publisher) {
        Scheduler scheduler = Schedulers.single((Scheduler)this.sourceScheduler);
        return Flux.from(publisher).publishOn(scheduler, this.config.getDeduplicationSourcePrefetch()).groupBy(this.deduplicator::extractKey).flatMap(groupedFlux -> this.deduplicateGroup((GroupedFlux<Object, T>)groupedFlux, scheduler), this.config.getDeduplicationConcurrency()).subscribeOn(scheduler);
    }

    private Mono<T> deduplicateGroup(GroupedFlux<Object, T> groupedFlux, Scheduler scheduler) {
        return groupedFlux.take(this.config.getDeduplicationDuration(), scheduler).take(this.config.getMaxDeduplicationSize()).collectList().map(this.deduplicator::deduplicate);
    }

    private static final class Deduplicator<T, R> {
        private final Function<T, R> dataExtractor;
        private final Function<R, Object> keyExtractor;
        private final Function<List<T>, T> reducer;

        private Deduplicator(Function<T, R> dataExtractor, Function<R, Object> keyExtractor, Function<List<T>, T> reducer) {
            this.dataExtractor = dataExtractor;
            this.keyExtractor = keyExtractor;
            this.reducer = reducer;
        }

        public static <T> Deduplicator<T, T> identity(Deduplication<T> deduplication) {
            Function<List<T>, T> reducer = Deduplicator.singleReducer(deduplication::reduceDuplicates);
            return new Deduplicator(Function.identity(), deduplication::extractKey, reducer);
        }

        public static <T> Deduplicator<Alo<T>, T> alo(Deduplication<T> deduplication) {
            Function reducer = Deduplicator.singleReducer(deduplication::reduceDuplicates);
            Function<List, Alo> aloReducer = it -> it.size() <= 1 ? (Alo)it.get(0) : AloOps.fanIn(it).map(reducer);
            return new Deduplicator<Alo, Object>(Alo::get, deduplication::extractKey, aloReducer);
        }

        public Object extractKey(T t) {
            return this.keyExtractor.apply(this.dataExtractor.apply(t));
        }

        public T deduplicate(List<T> list) {
            return this.reducer.apply(list);
        }

        private static <T> Function<List<T>, T> singleReducer(BinaryOperator<T> accumulator) {
            return list -> list.stream().reduce(accumulator).orElseThrow(Deduplicator::newEmptyDeduplicationGroupException);
        }

        private static IllegalStateException newEmptyDeduplicationGroupException() {
            return new IllegalStateException("Something bad has happened. Deduplication group was empty.");
        }
    }
}

