/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Duration;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
import org.springframework.retry.backoff.Sleeper;
import org.springframework.util.Assert;

public class WakingKafkaConsumerTimingAdjuster
implements KafkaConsumerTimingAdjuster {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(WakingKafkaConsumerTimingAdjuster.class));
    private static final long HUNDRED = 100L;
    private static final Duration DEFAULT_TIMING_ADJUSTMENT_THRESHOLD = Duration.ofMillis(100L);
    private static final int DEFAULT_POLL_TIMEOUTS_FOR_ADJUSTMENT_WINDOW = 2;
    private Duration timingAdjustmentThreshold = DEFAULT_TIMING_ADJUSTMENT_THRESHOLD;
    private int pollTimeoutsForAdjustmentWindow = 2;
    private final TaskExecutor timingAdjustmentTaskExecutor;
    private final Sleeper sleeper;

    public WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor, Sleeper sleeper) {
        Assert.notNull((Object)timingAdjustmentTaskExecutor, (String)"Task executor cannot be null.");
        Assert.notNull((Object)sleeper, (String)"Sleeper cannot be null.");
        this.timingAdjustmentTaskExecutor = timingAdjustmentTaskExecutor;
        this.sleeper = sleeper;
    }

    public WakingKafkaConsumerTimingAdjuster(TaskExecutor timingAdjustmentTaskExecutor) {
        Assert.notNull((Object)timingAdjustmentTaskExecutor, (String)"Task executor cannot be null.");
        this.timingAdjustmentTaskExecutor = timingAdjustmentTaskExecutor;
        this.sleeper = Thread::sleep;
    }

    public void setPollTimeoutsForAdjustmentWindow(int pollTimeoutsForAdjustmentWindow) {
        this.pollTimeoutsForAdjustmentWindow = pollTimeoutsForAdjustmentWindow;
    }

    public void setTimingAdjustmentThreshold(Duration timingAdjustmentThreshold) {
        this.timingAdjustmentThreshold = timingAdjustmentThreshold;
    }

    @Override
    public long adjustTiming(Consumer<?, ?> consumerToAdjust, TopicPartition topicPartition, long pollTimeout, long timeUntilDue) {
        boolean isInAdjustmentWindow = timeUntilDue > pollTimeout && timeUntilDue <= pollTimeout * (long)this.pollTimeoutsForAdjustmentWindow;
        long adjustmentAmount = timeUntilDue % pollTimeout;
        if (isInAdjustmentWindow && adjustmentAmount > this.timingAdjustmentThreshold.toMillis()) {
            this.timingAdjustmentTaskExecutor.execute(() -> this.doApplyTimingAdjustment(consumerToAdjust, topicPartition, adjustmentAmount));
            return adjustmentAmount;
        }
        return 0L;
    }

    private void doApplyTimingAdjustment(Consumer<?, ?> consumerForTimingAdjustment, TopicPartition topicPartition, long adjustmentAmount) {
        try {
            LOGGER.debug(() -> String.format("Applying timing adjustment of %s millis for TopicPartition %s", adjustmentAmount, topicPartition));
            this.sleeper.sleep(adjustmentAmount);
            LOGGER.debug(() -> "Waking up consumer for partition topic: " + topicPartition);
            consumerForTimingAdjustment.wakeup();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted waking up consumer while applying timing adjustment for TopicPartition " + topicPartition, e);
        }
        catch (Exception e) {
            LOGGER.error((Throwable)e, () -> "Error waking up consumer while applying timing adjustment for TopicPartition " + topicPartition);
        }
    }
}

