/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.reactive.client.internal.api;

import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.reactive.client.api.MessageGroupingFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

/**
 * Static helpers that split a stream of Pulsar messages into processing groups and
 * run a message handler over those groups while handling the messages of each
 * individual group one after another.
 *
 * <p>This class only holds static methods and cannot be instantiated.
 */
public final class GroupOrderedMessageProcessors {

    private GroupOrderedMessageProcessors() {
        // utility class, no instances
    }

    /**
     * Splits the given message flux into grouped fluxes keyed by processing group.
     *
     * <p>The key of every message is the value returned by
     * {@link MessageGroupingFunction#resolveProcessingGroup} for that message and
     * {@code numberOfGroups}. The second argument handed to {@code Flux.groupBy} (its
     * prefetch) is the larger of {@code Queues.XS_BUFFER_SIZE} and {@code numberOfGroups}.
     *
     * @param <T> the message payload type
     * @param messageFlux the messages to group
     * @param groupingFunction resolves the processing group of a message
     * @param numberOfGroups the number of groups passed on to the grouping function
     * @return a flux of grouped fluxes keyed by the resolved processing group
     */
    public static <T> Flux<GroupedFlux<Integer, Message<T>>> groupByProcessingGroup(Flux<Message<T>> messageFlux, MessageGroupingFunction groupingFunction, int numberOfGroups) {
        Function<Message<T>, Integer> groupKeyResolver =
                msg -> groupingFunction.resolveProcessingGroup(msg, numberOfGroups);
        int groupPrefetch = Math.max(Queues.XS_BUFFER_SIZE, numberOfGroups);
        return messageFlux.groupBy(groupKeyResolver, groupPrefetch);
    }

    /**
     * Groups the messages with {@link #groupByProcessingGroup} using {@code concurrency}
     * as the number of groups, then merges the groups with {@code flatMap} limited to
     * {@code concurrency}. Inside each group the messages are published on
     * {@code scheduler} and passed to {@code messageHandler} through {@code concatMap}.
     *
     * @param <T> the message payload type
     * @param <R> the element type emitted by the message handler
     * @param messageFlux the messages to process
     * @param groupingFunction resolves the processing group of a message
     * @param messageHandler maps a message to the publisher of its results
     * @param scheduler the scheduler each group is published on
     * @param concurrency the number of groups and the {@code flatMap} concurrency
     * @return a flux of the handler results
     */
    public static <T, R> Flux<R> processGroupsInOrderConcurrently(Flux<Message<T>> messageFlux, MessageGroupingFunction groupingFunction, Function<? super Message<T>, ? extends Publisher<? extends R>> messageHandler, Scheduler scheduler, int concurrency) {
        Flux<GroupedFlux<Integer, Message<T>>> groups =
                groupByProcessingGroup(messageFlux, groupingFunction, concurrency);
        return groups.flatMap(
                group -> group.publishOn(scheduler).concatMap(messageHandler),
                concurrency);
    }

    /**
     * Groups the messages with {@link #groupByProcessingGroup} using {@code parallelism}
     * as the number of groups, distributes the groups over {@code parallelism} rails
     * that run on {@code scheduler}, passes the messages of each group to
     * {@code messageHandler} through {@code concatMap}, and merges the rails back into
     * a single flux with {@code sequential()}.
     *
     * @param <T> the message payload type
     * @param <R> the element type emitted by the message handler
     * @param messageFlux the messages to process
     * @param groupingFunction resolves the processing group of a message
     * @param messageHandler maps a message to the publisher of its results
     * @param scheduler the scheduler the parallel rails run on
     * @param parallelism the number of groups and the number of parallel rails
     * @return a flux of the handler results
     */
    public static <T, R> Flux<R> processGroupsInOrderInParallel(Flux<Message<T>> messageFlux, MessageGroupingFunction groupingFunction, Function<? super Message<T>, ? extends Publisher<? extends R>> messageHandler, Scheduler scheduler, int parallelism) {
        Flux<GroupedFlux<Integer, Message<T>>> groups =
                groupByProcessingGroup(messageFlux, groupingFunction, parallelism);
        return groups
                .parallel(parallelism)
                .runOn(scheduler)
                .flatMap(group -> group.concatMap(messageHandler))
                .sequential();
    }
}

