/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.sink.adapter;

import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.sink.adapter.AbstractDelegatingMessageListeningSinkAdapter;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class MessageGroupingSinkAdapter<T>
extends AbstractDelegatingMessageListeningSinkAdapter<T> {
    private static final Logger logger = LoggerFactory.getLogger(MessageGroupingSinkAdapter.class);
    private final Function<Message<T>, String> groupingFunction;

    public MessageGroupingSinkAdapter(MessageSink<T> delegate, Function<Message<T>, String> groupingFunction) {
        super(delegate);
        Assert.notNull(groupingFunction, (String)"groupingFunction cannot be null.");
        this.groupingFunction = groupingFunction;
    }

    @Override
    public CompletableFuture<Void> emit(Collection<Message<T>> messages, MessageProcessingContext<T> context) {
        logger.trace("Emitting messages {}", (Object)MessageHeaderUtils.getId(messages));
        return CompletableFuture.allOf((CompletableFuture[])messages.stream().collect(Collectors.groupingBy(this.groupingFunction)).values().stream().map(messageBatch -> this.getDelegate().emit(messageBatch, context)).toArray(CompletableFuture[]::new));
    }
}

