/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.acknowledgement;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementExecutor;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallbackException;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.ExecutingAcknowledgementProcessor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;

public abstract class AbstractOrderingAcknowledgementProcessor<T>
implements ExecutingAcknowledgementProcessor<T>,
AcknowledgementCallback<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractOrderingAcknowledgementProcessor.class);
    private final Object lifecycleMonitor = new Object();
    private static final String DEFAULT_MESSAGE_GROUP = "default";
    private final Lock orderedExecutionLock = new ReentrantLock(true);
    private int maxAcknowledgementsPerBatch;
    private AcknowledgementExecutor<T> acknowledgementExecutor;
    private AcknowledgementOrdering acknowledgementOrdering;
    private final Map<String, CompletableFuture<Void>> lastAcknowledgementFutureMap = new ConcurrentHashMap<String, CompletableFuture<Void>>();
    private AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback = new AsyncAcknowledgementResultCallback<T>(){};
    private boolean running;
    private String id;
    private Function<Message<T>, String> messageGroupingFunction;

    @Override
    public AcknowledgementCallback<T> getAcknowledgementCallback() {
        return this;
    }

    @Override
    public void configure(ContainerOptions<?, ?> containerOptions) {
        this.acknowledgementOrdering = containerOptions.getAcknowledgementOrdering();
        this.doConfigure(containerOptions);
    }

    protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
    }

    @Override
    public void setAcknowledgementExecutor(AcknowledgementExecutor<T> acknowledgementExecutor) {
        Assert.notNull(acknowledgementExecutor, (String)"acknowledgementExecutor cannot be null");
        this.acknowledgementExecutor = acknowledgementExecutor;
    }

    @Override
    public void setAcknowledgementResultCallback(AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback) {
        Assert.notNull(acknowledgementResultCallback, (String)"acknowledgementResultCallback cannot be null");
        this.acknowledgementResultCallback = acknowledgementResultCallback;
    }

    public void setMaxAcknowledgementsPerBatch(int maxAcknowledgementsPerBatch) {
        Assert.isTrue((maxAcknowledgementsPerBatch > 0 ? 1 : 0) != 0, (String)"maxAcknowledgementsPerBatch must be greater than zero");
        this.maxAcknowledgementsPerBatch = maxAcknowledgementsPerBatch;
    }

    public void setMessageGroupingFunction(Function<Message<T>, String> messageGroupingFunction) {
        Assert.notNull(messageGroupingFunction, (String)"messageGroupingFunction cannot be null");
        this.messageGroupingFunction = messageGroupingFunction;
    }

    @Override
    public void setId(String id) {
        Assert.notNull((Object)id, (String)"id cannot be null");
        this.id = id;
    }

    @Override
    public String getId() {
        return this.id;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            Assert.notNull(this.acknowledgementExecutor, (String)"acknowledgementExecutor not set");
            Assert.notNull((Object)((Object)this.acknowledgementOrdering), (String)"acknowledgementOrdering not set");
            Assert.notNull((Object)this.id, (String)"id not set");
            logger.debug("Starting {} with ordering {} and batch size {}", new Object[]{this.id, this.acknowledgementOrdering, this.maxAcknowledgementsPerBatch});
            this.running = true;
            this.validateAndInitializeMessageGrouping();
            this.doStart();
        }
    }

    private void validateAndInitializeMessageGrouping() {
        Assert.isTrue((this.isValidOrderedByGroup() || this.isValidNotOrderedByGroup() ? 1 : 0) != 0, (String)"Invalid configuration for acknowledgement ordering.");
        if (this.messageGroupingFunction == null) {
            this.messageGroupingFunction = msg -> DEFAULT_MESSAGE_GROUP;
        }
    }

    private boolean isValidOrderedByGroup() {
        return AcknowledgementOrdering.ORDERED_BY_GROUP.equals((Object)this.acknowledgementOrdering) && this.messageGroupingFunction != null;
    }

    private boolean isValidNotOrderedByGroup() {
        return !AcknowledgementOrdering.ORDERED_BY_GROUP.equals((Object)this.acknowledgementOrdering) && this.messageGroupingFunction == null;
    }

    protected void doStart() {
    }

    @Override
    public CompletableFuture<Void> onAcknowledge(Message<T> message) {
        if (!this.isRunning()) {
            logger.debug("{} not running, returning for message {}", (Object)this.id, (Object)MessageHeaderUtils.getId(message));
            return CompletableFuture.completedFuture(null);
        }
        logger.trace("Received message {} to acknowledge.", (Object)MessageHeaderUtils.getId(message));
        return this.doOnAcknowledge(message);
    }

    @Override
    public CompletableFuture<Void> onAcknowledge(Collection<Message<T>> messages) {
        logger.trace("Received messages {} to acknowledge.", (Object)MessageHeaderUtils.getId(messages));
        if (!this.isRunning()) {
            logger.debug("{} not running, returning for messages {}", (Object)this.id, (Object)MessageHeaderUtils.getId(messages));
            return CompletableFuture.completedFuture(null);
        }
        return this.doOnAcknowledge(messages);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        if (!this.isRunning()) {
            logger.debug("{} already stopped", (Object)this.id);
            return;
        }
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            logger.debug("Stopping {}", (Object)this.id);
            this.running = false;
            this.doStop();
        }
    }

    protected void doStop() {
    }

    public boolean isRunning() {
        return this.running;
    }

    protected Function<Message<T>, String> getMessageGroupingFunction() {
        return this.messageGroupingFunction;
    }

    protected CompletableFuture<Void> sendToExecutor(Collection<Message<T>> messagesToAck) {
        StopWatch watch = new StopWatch();
        watch.start();
        return CompletableFutures.exceptionallyCompose(this.sendToExecutorParallelOrOrdered(messagesToAck), t -> this.logAcknowledgementError(messagesToAck, (Throwable)t)).whenComplete((BiConsumer)this.logExecutionTime(messagesToAck, watch));
    }

    private BiConsumer<Void, Throwable> logExecutionTime(Collection<Message<T>> messagesToAck, StopWatch watch) {
        return (v, t) -> {
            watch.stop();
            logger.trace("Took {}ms to acknowledge messages {}", (Object)watch.getTotalTimeMillis(), (Object)MessageHeaderUtils.getId(messagesToAck));
        };
    }

    private CompletableFuture<Void> sendToExecutorParallelOrOrdered(Collection<Message<T>> messagesToAck) {
        return AcknowledgementOrdering.PARALLEL.equals((Object)this.acknowledgementOrdering) ? this.sendToExecutorParallel(messagesToAck) : this.sendToExecutorOrdered(messagesToAck);
    }

    private CompletableFuture<Void> sendToExecutorParallel(Collection<Message<T>> messagesToAck) {
        return CompletableFuture.allOf((CompletableFuture[])this.partitionMessages(messagesToAck).stream().map(this::doSendToExecutor).toArray(CompletableFuture[]::new));
    }

    private CompletableFuture<Void> sendToExecutorOrdered(Collection<Message<T>> messagesToAck) {
        this.orderedExecutionLock.lock();
        try {
            CompletableFuture<Void> completableFuture = CompletableFuture.allOf((CompletableFuture[])this.partitionMessages(messagesToAck).stream().map(this::doSendToExecutorOrdered).flatMap(Collection::stream).toArray(CompletableFuture[]::new));
            return completableFuture;
        }
        finally {
            this.orderedExecutionLock.unlock();
        }
    }

    private Collection<CompletableFuture<Void>> doSendToExecutorOrdered(Collection<Message<T>> messagesToAck) {
        return messagesToAck.stream().collect(Collectors.groupingBy(this.messageGroupingFunction)).entrySet().stream().filter(entry -> ((List)entry.getValue()).size() > 0).map(entry -> this.sendGroupToExecutor((String)entry.getKey(), (List)entry.getValue())).collect(Collectors.toList());
    }

    private CompletableFuture<Void> sendGroupToExecutor(String group, List<Message<T>> messages) {
        CompletionStage nextFuture = ((CompletableFuture)this.lastAcknowledgementFutureMap.computeIfAbsent(group, newGroup -> CompletableFuture.completedFuture(null)).exceptionally(t -> null)).thenCompose(theVoid -> this.doSendToExecutor(messages));
        this.lastAcknowledgementFutureMap.put(group, (CompletableFuture<Void>)nextFuture);
        this.removeCompletedFutures();
        return nextFuture;
    }

    private void removeCompletedFutures() {
        List<String> completedFutures = this.lastAcknowledgementFutureMap.entrySet().stream().filter(entry -> ((CompletableFuture)entry.getValue()).isDone()).map(Map.Entry::getKey).collect(Collectors.toList());
        logger.trace("Removing completed futures from groups {}", completedFutures);
        completedFutures.forEach(this.lastAcknowledgementFutureMap::remove);
    }

    private CompletableFuture<Void> doSendToExecutor(Collection<Message<T>> messagesToAck) {
        return CompletableFutures.handleCompose(this.acknowledgementExecutor.execute(messagesToAck), (v, t) -> t == null ? this.executeResultCallback(messagesToAck, null) : this.executeResultCallback(messagesToAck, (Throwable)t).thenCompose(theVoid -> CompletableFutures.failedFuture(t)));
    }

    private CompletableFuture<Void> executeResultCallback(Collection<Message<T>> messagesToAck, Throwable ackThrowable) {
        return CompletableFutures.exceptionallyCompose(this.doExecuteResultCallback(messagesToAck, ackThrowable), t -> CompletableFutures.failedFuture(new AcknowledgementResultCallbackException("Error executing acknowledgement result callback", (Throwable)t)));
    }

    private CompletableFuture<Void> doExecuteResultCallback(Collection<Message<T>> messagesToAck, Throwable t) {
        logger.trace("Executing result callback for {} in {}", (Object)MessageHeaderUtils.getId(messagesToAck), (Object)this.id);
        return t == null ? this.acknowledgementResultCallback.onSuccess(messagesToAck) : this.acknowledgementResultCallback.onFailure(messagesToAck, t);
    }

    private CompletableFuture<Void> logAcknowledgementError(Collection<Message<T>> messagesToAck, Throwable t) {
        logger.error("Acknowledgement processing has thrown an error for messages {} in {}", new Object[]{MessageHeaderUtils.getId(messagesToAck), this.id, t});
        return CompletableFutures.failedFuture(t);
    }

    private Collection<Collection<Message<T>>> partitionMessages(Collection<Message<T>> messagesToAck) {
        logger.trace("Partitioning {} messages in {}", (Object)messagesToAck.size(), (Object)this.id);
        List<Message<T>> messagesToUse = this.getMessagesAsList(messagesToAck);
        int totalSize = messagesToUse.size();
        return IntStream.rangeClosed(0, (totalSize - 1) / this.maxAcknowledgementsPerBatch).mapToObj(index -> messagesToUse.subList(index * this.maxAcknowledgementsPerBatch, Math.min((index + 1) * this.maxAcknowledgementsPerBatch, totalSize))).collect(Collectors.toList());
    }

    private List<Message<T>> getMessagesAsList(Collection<Message<T>> messagesToAck) {
        return messagesToAck instanceof List ? (List<Object>)messagesToAck : new ArrayList<Message<T>>(messagesToAck);
    }

    protected abstract CompletableFuture<Void> doOnAcknowledge(Message<T> var1);

    protected abstract CompletableFuture<Void> doOnAcknowledge(Collection<Message<T>> var1);
}

