/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.BackOffHandler;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.Assert;

public class ContainerPartitionPausingBackOffManager
implements KafkaConsumerBackoffManager {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));
    private final ListenerContainerRegistry listenerContainerRegistry;
    private final BackOffHandler backOffHandler;

    public ContainerPartitionPausingBackOffManager(ListenerContainerRegistry listenerContainerRegistry, BackOffHandler backOffHandler) {
        Assert.notNull((Object)listenerContainerRegistry, (String)"'listenerContainerRegistry' cannot be null");
        Assert.notNull((Object)backOffHandler, (String)"'backOffHandler' cannot be null");
        this.listenerContainerRegistry = listenerContainerRegistry;
        this.backOffHandler = backOffHandler;
    }

    @Override
    public void backOffIfNecessary(KafkaConsumerBackoffManager.Context context) {
        long backoffTime = context.getDueTimestamp() - System.currentTimeMillis();
        LOGGER.debug(() -> "Back off time: " + backoffTime + " Context: " + String.valueOf(context));
        if (backoffTime > 0L) {
            this.pauseConsumptionAndThrow(context, backoffTime);
        }
    }

    private void pauseConsumptionAndThrow(KafkaConsumerBackoffManager.Context context, Long backOffTime) throws KafkaBackoffException {
        TopicPartition topicPartition = context.getTopicPartition();
        MessageListenerContainer container = this.getListenerContainerFromContext(context);
        container.pausePartition(topicPartition);
        this.backOffHandler.onNextBackOff(container, topicPartition, (long)backOffTime);
        throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, backing off for approx. %s millis.", topicPartition.partition(), topicPartition.topic(), backOffTime), topicPartition, context.getListenerId(), context.getDueTimestamp());
    }

    private MessageListenerContainer getListenerContainerFromContext(KafkaConsumerBackoffManager.Context context) {
        MessageListenerContainer container = this.listenerContainerRegistry.getListenerContainer(context.getListenerId());
        if (container == null) {
            container = this.listenerContainerRegistry.getUnregisteredListenerContainer(context.getListenerId());
        }
        Assert.notNull((Object)container, () -> "No container found with id: " + context.getListenerId());
        return container;
    }
}

