package org.springframework.kafka.listener;

import java.util.Collection;
import java.util.function.BiConsumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

@Deprecated
/* loaded from: input_file:org/springframework/kafka/listener/RetryingBatchErrorHandler.class */
public class RetryingBatchErrorHandler extends ExceptionClassifier implements ListenerInvokingBatchErrorHandler {
    private final LogAccessor logger;
    private final BackOff backOff;
    private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
    private final CommonErrorHandler seeker;
    private final ThreadLocal<Boolean> retrying;
    private boolean ackAfterHandle;

    public RetryingBatchErrorHandler() {
        this(new FixedBackOff(), null);
    }

    public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer consumerRecordRecoverer) {
        this.logger = new LogAccessor(LogFactory.getLog(getClass()));
        this.seeker = new ErrorHandlerAdapter(new SeekToCurrentBatchErrorHandler());
        this.retrying = ThreadLocal.withInitial(() -> {
            return false;
        });
        this.ackAfterHandle = true;
        this.backOff = backOff;
        this.recoverer = (consumerRecords, exc) -> {
            if (consumerRecordRecoverer == null) {
                this.logger.error(exc, () -> {
                    return "Records discarded: " + ErrorHandlingUtils.recordsToString(consumerRecords);
                });
            } else {
                consumerRecords.spliterator().forEachRemaining(consumerRecord -> {
                    consumerRecordRecoverer.accept(consumerRecord, exc);
                });
            }
        };
    }

    @Override // org.springframework.kafka.listener.GenericErrorHandler
    public boolean isAckAfterHandle() {
        return this.ackAfterHandle;
    }

    @Override // org.springframework.kafka.listener.GenericErrorHandler
    public void setAckAfterHandle(boolean z) {
        this.ackAfterHandle = z;
    }

    @Override // org.springframework.kafka.listener.ListenerInvokingBatchErrorHandler, org.springframework.kafka.listener.ContainerAwareBatchErrorHandler, org.springframework.kafka.listener.BatchErrorHandler
    public void handle(Exception exc, @Nullable ConsumerRecords<?, ?> consumerRecords, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer, Runnable runnable) {
        if (consumerRecords == null || consumerRecords.count() == 0) {
            this.logger.error(exc, "Called with no records; consumer exception");
            return;
        }
        this.retrying.set(true);
        try {
            ErrorHandlingUtils.retryBatch(exc, consumerRecords, consumer, messageListenerContainer, runnable, this.backOff, this.seeker, this.recoverer, this.logger, getLogLevel(), null, getClassifier());
            this.retrying.set(false);
        } catch (Throwable th) {
            this.retrying.set(false);
            throw th;
        }
    }

    @Deprecated
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> collection) {
        if (this.retrying.get().booleanValue()) {
            consumer.pause(consumer.assignment());
        }
    }

    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> collection, Runnable runnable) {
        if (this.retrying.get().booleanValue()) {
            consumer.pause(consumer.assignment());
            runnable.run();
        }
    }
}
