/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.sink;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.sink.AbstractMessageProcessingPipelineSink;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

/**
 * Sink that emits the messages of a batch one after another: the execution of a
 * message is only started once the future of the previous message has completed.
 *
 * <p>If any execution completes exceptionally, the remaining messages of the batch
 * are not executed. For each skipped message the back pressure release callback of
 * the {@link MessageProcessingContext} is run and the failure is passed further down
 * the chain.
 *
 * @param <T> the payload type of the messages handled by this sink
 */
public class OrderedMessageSink<T> extends AbstractMessageProcessingPipelineSink<T> {
    private static final Logger logger = LoggerFactory.getLogger(OrderedMessageSink.class);

    /**
     * Chains the execution of the given messages in the iteration order of the collection.
     *
     * @param messages the messages to execute; an empty collection yields an already
     *                 completed future
     * @param context the processing context handed to every execution, also used to run
     *                the back pressure release callback for skipped messages
     * @return a future that completes once the whole chain is done; a failure of the
     *         chain is not propagated to the caller, the future then completes
     *         normally with {@code null}
     */
    @Override
    protected CompletableFuture<Void> doEmit(Collection<Message<T>> messages, MessageProcessingContext<T> context) {
        logger.trace("Emitting messages {}", MessageHeaderUtils.getId(messages));
        // An explicit loop (rather than Stream.reduce with a dummy combiner) makes it
        // obvious that every stage depends on the outcome of the previous one.
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Message<T> msg : messages) {
            chain = CompletableFutures.handleCompose(chain, (v, t) -> {
                if (t == null) {
                    // Previous stage succeeded: run this message and log its own failure, if any.
                    return execute(msg, context).whenComplete(logIfError(msg));
                }
                // A previous stage failed: this message is skipped, so release its back
                // pressure and keep propagating the original failure.
                context.runBackPressureReleaseCallback();
                return CompletableFutures.failedFuture(t);
            });
        }
        // The failure has already been logged by the stage that produced it;
        // callers always get a normally completed future.
        return chain.exceptionally(t -> null);
    }

    /**
     * Creates a completion callback that reports a failed execution of the given message
     * through {@code logError} and does nothing on success.
     *
     * @param msg the message whose execution outcome is observed
     * @return a callback suitable for {@link CompletableFuture#whenComplete(BiConsumer)}
     */
    private BiConsumer<Void, Throwable> logIfError(Message<T> msg) {
        return (v, t) -> {
            if (t != null) {
                logError(t, msg);
            }
        };
    }
}

