/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ErrorHandlingUtils;
import org.springframework.kafka.listener.ExceptionClassifier;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RetryListener;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

class FallbackBatchErrorHandler
extends ExceptionClassifier
implements CommonErrorHandler {
    private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    private final BackOff backOff;
    private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
    private final CommonErrorHandler seeker = new SeekAfterRecoverFailsOrInterrupted();
    private final Map<Thread, Boolean> retrying = new ConcurrentHashMap<Thread, Boolean>();
    private final List<RetryListener> retryListeners = new ArrayList<RetryListener>();
    private boolean ackAfterHandle = true;
    private boolean reclassifyOnExceptionChange = true;

    FallbackBatchErrorHandler() {
        this((BackOff)new FixedBackOff(), null);
    }

    FallbackBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
        this.backOff = backOff;
        this.recoverer = (crs, ex) -> {
            if (recoverer == null) {
                this.logger.error((Throwable)ex, () -> "Records discarded: " + ErrorHandlingUtils.recordsToString(crs));
            } else {
                crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
            }
        };
    }

    public void setRetryListeners(RetryListener ... listeners) {
        Assert.noNullElements((Object[])listeners, (String)"'listeners' cannot have null elements");
        this.retryListeners.clear();
        this.retryListeners.addAll(Arrays.asList(listeners));
    }

    @Override
    public boolean isAckAfterHandle() {
        return this.ackAfterHandle;
    }

    @Override
    public void setAckAfterHandle(boolean ackAfterHandle) {
        this.ackAfterHandle = ackAfterHandle;
    }

    protected boolean isReclassifyOnExceptionChange() {
        return this.reclassifyOnExceptionChange;
    }

    public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) {
        this.reclassifyOnExceptionChange = reclassifyOnExceptionChange;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
        if (records == null || records.count() == 0) {
            this.logger.error((Throwable)thrownException, (CharSequence)"Called with no records; consumer exception");
            return;
        }
        this.retrying.put(Thread.currentThread(), true);
        try {
            ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff, this.seeker, this.recoverer, this.logger, this.getLogLevel(), this.retryListeners, this.getExceptionMatcher(), this.reclassifyOnExceptionChange);
        }
        finally {
            this.retrying.remove(Thread.currentThread());
        }
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions, Runnable publishPause) {
        if (Boolean.TRUE.equals(this.retrying.get(Thread.currentThread()))) {
            consumer.pause((Collection)consumer.assignment());
            publishPause.run();
        }
    }

    private final class SeekAfterRecoverFailsOrInterrupted
    implements CommonErrorHandler {
        SeekAfterRecoverFailsOrInterrupted() {
        }

        @Override
        public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
            data.partitions().stream().collect(Collectors.toMap(tp -> tp, tp -> ((ConsumerRecord)data.records(tp).get(0)).offset(), (u, v) -> v, LinkedHashMap::new)).forEach((arg_0, arg_1) -> consumer.seek(arg_0, arg_1));
            throw new KafkaException("Seek to current after exception", FallbackBatchErrorHandler.this.getLogLevel(), thrownException);
        }
    }
}

