/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ErrorHandlingUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public class KafkaMessageSource<K, V>
extends AbstractMessageSource<Object>
implements Pausable,
BeanClassLoaderAware {
    private static final long MIN_ASSIGN_TIMEOUT = 2000L;
    private static final int DEFAULT_CLOSE_TIMEOUT = 30;
    public static final String REMAINING_RECORDS = "kafka_remainingRecords";
    private final ConsumerFactory<K, V> consumerFactory;
    private final KafkaAckCallbackFactory<K, V> ackCallbackFactory;
    private final Lock receiveLock = new ReentrantLock();
    private final Lock consumerLock = new ReentrantLock();
    private final Map<TopicPartition, Set<KafkaAckInfo<K, V>>> inflightRecords = new ConcurrentHashMap<TopicPartition, Set<KafkaAckInfo<K, V>>>();
    private final AtomicInteger remainingCount = new AtomicInteger();
    private final ConsumerProperties consumerProperties;
    private final Collection<TopicPartition> assignedPartitions = new LinkedHashSet<TopicPartition>();
    private final @Nullable Duration commitTimeout;
    private final Duration assignTimeout;
    private final Duration pollTimeout;
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicBoolean pausing = new AtomicBoolean();
    private final AtomicBoolean paused = new AtomicBoolean();
    private final AtomicBoolean stopped = new AtomicBoolean();
    private RecordMessageConverter messageConverter = new MessagingMessageConverter();
    private @Nullable Class<?> payloadType;
    private boolean rawMessageHeader;
    private Duration closeTimeout = Duration.ofSeconds(30L);
    private boolean checkNullKeyForExceptions;
    private boolean checkNullValueForExceptions;
    private volatile @Nullable Consumer<K, V> consumer;
    private volatile @Nullable Iterator<ConsumerRecord<K, V>> recordsIterator;
    public volatile boolean newAssignment;
    private @Nullable ClassLoader classLoader;

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties) {
        this(consumerFactory, consumerProperties, new KafkaAckCallbackFactory(consumerProperties), false);
    }

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, boolean allowMultiFetch) {
        this(consumerFactory, consumerProperties, new KafkaAckCallbackFactory(consumerProperties), allowMultiFetch);
    }

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, KafkaAckCallbackFactory<K, V> ackCallbackFactory) {
        this(consumerFactory, consumerProperties, ackCallbackFactory, false);
    }

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) {
        Assert.notNull(consumerFactory, (String)"'consumerFactory' must not be null");
        Assert.notNull(ackCallbackFactory, (String)"'ackCallbackFactory' must not be null");
        Assert.isTrue((!ObjectUtils.isEmpty((Object[])consumerProperties.getTopics()) || !ObjectUtils.isEmpty((Object[])consumerProperties.getTopicPartitions()) || consumerProperties.getTopicPattern() != null ? 1 : 0) != 0, (String)"topics, topicPattern, or topicPartitions must be provided");
        this.consumerProperties = consumerProperties;
        this.consumerFactory = this.fixOrRejectConsumerFactory(consumerFactory, allowMultiFetch);
        this.ackCallbackFactory = ackCallbackFactory;
        this.pollTimeout = Duration.ofMillis(consumerProperties.getPollTimeout());
        this.assignTimeout = Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20L, 2000L));
        this.commitTimeout = consumerProperties.getSyncCommitTimeout();
        MessagingMessageConverter messagingMessageConverter = (MessagingMessageConverter)this.messageConverter;
        messagingMessageConverter.setGenerateMessageId(true);
        messagingMessageConverter.setGenerateTimestamp(true);
        if (JacksonPresent.isJackson3Present()) {
            JsonKafkaHeaderMapper headerMapper = new JsonKafkaHeaderMapper();
            headerMapper.addTrustedPackages(JacksonMessagingUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            messagingMessageConverter.setHeaderMapper((KafkaHeaderMapper)headerMapper);
        } else if (JacksonPresent.isJackson2Present()) {
            DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
            headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            messagingMessageConverter.setHeaderMapper((KafkaHeaderMapper)headerMapper);
        }
    }

    public Collection<TopicPartition> getAssignedPartitions() {
        return Collections.unmodifiableCollection(this.assignedPartitions);
    }

    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    protected void onInit() {
        if (!StringUtils.hasText((String)this.consumerProperties.getClientId())) {
            this.consumerProperties.setClientId(this.getComponentName());
        }
        Properties kafkaConsumerProperties = this.consumerProperties.getKafkaConsumerProperties();
        this.checkNullKeyForExceptions = this.consumerProperties.isCheckDeserExWhenKeyNull() || ErrorHandlingUtils.checkDeserializer(this.consumerFactory, (Properties)kafkaConsumerProperties, (boolean)false, (ClassLoader)this.classLoader);
        this.checkNullValueForExceptions = this.consumerProperties.isCheckDeserExWhenValueNull() || ErrorHandlingUtils.checkDeserializer(this.consumerFactory, (Properties)kafkaConsumerProperties, (boolean)true, (ClassLoader)this.classLoader);
    }

    public ConsumerProperties getConsumerProperties() {
        return this.consumerProperties;
    }

    protected @Nullable String getGroupId() {
        return this.consumerProperties.getGroupId();
    }

    protected String getClientId() {
        return this.consumerProperties.getClientId();
    }

    protected long getPollTimeout() {
        return this.pollTimeout.toMillis();
    }

    protected RecordMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    protected @Nullable Class<?> getPayloadType() {
        return this.payloadType;
    }

    public void setPayloadType(Class<?> payloadType) {
        this.payloadType = payloadType;
    }

    protected @Nullable ConsumerRebalanceListener getRebalanceListener() {
        return this.consumerProperties.getConsumerRebalanceListener();
    }

    public String getComponentType() {
        return "kafka:message-source";
    }

    protected boolean isRawMessageHeader() {
        return this.rawMessageHeader;
    }

    public void setRawMessageHeader(boolean rawMessageHeader) {
        this.rawMessageHeader = rawMessageHeader;
    }

    protected @Nullable Duration getCommitTimeout() {
        return this.commitTimeout;
    }

    public void setCloseTimeout(Duration closeTimeout) {
        Assert.notNull((Object)closeTimeout, (String)"'closeTimeout' cannot be null");
        this.closeTimeout = closeTimeout;
    }

    private ConsumerFactory<K, V> fixOrRejectConsumerFactory(ConsumerFactory<K, V> suppliedConsumerFactory, boolean allowMultiFetch) {
        Object maxPoll = suppliedConsumerFactory.getConfigurationProperties().get("max.poll.records");
        if (!allowMultiFetch && (maxPoll == null || this.maxPollGtrOne(maxPoll))) {
            if (!suppliedConsumerFactory.getClass().getName().equals(DefaultKafkaConsumerFactory.class.getName())) {
                throw new IllegalArgumentException("Custom consumer factory is not configured with 'max.poll.records = 1'");
            }
            this.logger.warn(() -> "max.poll.records' has been forced from " + String.valueOf(maxPoll == null ? "unspecified" : maxPoll) + " to 1, to avoid having to seek after each record");
            return KafkaMessageSource.fixConsumerFactory(suppliedConsumerFactory);
        }
        return suppliedConsumerFactory;
    }

    private boolean maxPollGtrOne(Object maxPoll) {
        return this.maxPollNumberGtrOne(maxPoll) || this.maxPollStringGtr1(maxPoll);
    }

    private boolean maxPollNumberGtrOne(Object maxPoll) {
        return maxPoll instanceof Number && ((Number)maxPoll).intValue() != 1;
    }

    private boolean maxPollStringGtr1(Object maxPoll) {
        return maxPoll instanceof String && Integer.parseInt((String)maxPoll) != 1;
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.stopped.set(false);
        }
    }

    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            this.stopConsumer();
            this.stopped.set(true);
        }
    }

    public void pause() {
        this.pausing.set(true);
    }

    public void resume() {
        this.pausing.set(false);
    }

    public boolean isPaused() {
        return this.paused.get();
    }

    protected @Nullable Object doReceive() {
        this.receiveLock.lock();
        try {
            if (this.stopped.get()) {
                this.logger.debug((CharSequence)"Message source is stopped; no records will be returned");
                Object var1_1 = null;
                return var1_1;
            }
            if (this.consumer == null) {
                this.createConsumer();
            }
            Assert.state((this.consumer != null ? 1 : 0) != 0, (String)"'consumer' must not be null");
            if (this.pausing.get() && !this.paused.get() && !this.assignedPartitions.isEmpty()) {
                this.consumer.pause(this.assignedPartitions);
                this.paused.set(true);
            } else if (this.paused.get() && !this.pausing.get()) {
                this.consumer.resume(this.assignedPartitions);
                this.paused.set(false);
            }
            if (this.paused.get() && this.recordsIterator == null) {
                this.logger.debug((CharSequence)"Consumer is paused; no records will be returned");
            }
        }
        finally {
            this.receiveLock.unlock();
        }
        ConsumerRecord<K, V> record = this.pollRecord();
        return record != null ? this.recordToMessage(record) : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void createConsumer() {
        this.consumerLock.lock();
        try {
            this.consumer = this.consumerFactory.createConsumer(this.consumerProperties.getGroupId(), this.consumerProperties.getClientId(), null, this.consumerProperties.getKafkaConsumerProperties());
            IntegrationConsumerRebalanceListener rebalanceCallback = new IntegrationConsumerRebalanceListener(this.consumerProperties.getConsumerRebalanceListener());
            Pattern topicPattern = this.consumerProperties.getTopicPattern();
            @Nullable TopicPartitionOffset @Nullable [] partitions = this.consumerProperties.getTopicPartitions();
            if (topicPattern != null) {
                this.consumer.subscribe(topicPattern, (ConsumerRebalanceListener)rebalanceCallback);
            } else if (partitions != null) {
                this.assignAndSeekPartitions(partitions);
            } else {
                this.consumer.subscribe(Arrays.asList(this.consumerProperties.getTopics()), (ConsumerRebalanceListener)rebalanceCallback);
            }
        }
        finally {
            this.consumerLock.unlock();
        }
    }

    private void assignAndSeekPartitions(@Nullable TopicPartitionOffset[] partitions) {
        List topicPartitionsToAssign = Arrays.stream(partitions).filter(Objects::nonNull).map(TopicPartitionOffset::getTopicPartition).collect(Collectors.toList());
        Assert.state((this.consumer != null ? 1 : 0) != 0, (String)"'consumer' must not be null");
        this.consumer.assign(topicPartitionsToAssign);
        this.assignedPartitions.addAll(topicPartitionsToAssign);
        for (TopicPartitionOffset partition : partitions) {
            long newOffset;
            Assert.state((partition != null ? 1 : 0) != 0, (String)"'partition' must not be null");
            if (TopicPartitionOffset.SeekPosition.BEGINNING.equals((Object)partition.getPosition())) {
                this.consumer.seekToBeginning(Collections.singleton(partition.getTopicPartition()));
                continue;
            }
            if (TopicPartitionOffset.SeekPosition.END.equals((Object)partition.getPosition())) {
                this.consumer.seekToEnd(Collections.singleton(partition.getTopicPartition()));
                continue;
            }
            TopicPartition topicPartition = partition.getTopicPartition();
            Long offset = partition.getOffset();
            if (offset == null) continue;
            if (offset < 0L) {
                if (!partition.isRelativeToCurrent()) {
                    this.consumer.seekToEnd(Collections.singleton(topicPartition));
                    continue;
                }
                newOffset = Math.max(0L, this.consumer.position(topicPartition) + offset);
            } else {
                newOffset = partition.isRelativeToCurrent() ? this.consumer.position(topicPartition) + offset : offset;
            }
            try {
                this.consumer.seek(topicPartition, newOffset);
            }
            catch (Exception ex) {
                Consumer consumer = this.consumer;
                this.logger.error((Throwable)ex, () -> "Failed to set initial offset for " + String.valueOf(topicPartition) + " at " + newOffset + ". Position is " + consumer.position(topicPartition));
            }
        }
    }

    private @Nullable ConsumerRecord<K, V> pollRecord() {
        if (this.recordsIterator != null) {
            return this.nextRecord();
        }
        this.consumerLock.lock();
        try {
            Assert.state((this.consumer != null ? 1 : 0) != 0, (String)"'consumer' must not be null");
            ConsumerRecords records = this.consumer.poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
            this.logger.debug(() -> records == null ? "Received null" : "Received " + records.count() + " records");
            if (records == null || records.count() == 0) {
                ConsumerRecord<K, V> consumerRecord = null;
                return consumerRecord;
            }
            this.remainingCount.set(records.count());
            this.recordsIterator = records.iterator();
            ConsumerRecord<K, V> consumerRecord = this.nextRecord();
            return consumerRecord;
        }
        catch (WakeupException ex) {
            this.logger.debug((CharSequence)"Woken");
            if (this.newAssignment) {
                this.newAssignment = false;
                ConsumerRecord<K, V> consumerRecord = this.pollRecord();
                return consumerRecord;
            }
            ConsumerRecord<K, V> consumerRecord = null;
            return consumerRecord;
        }
        finally {
            this.consumerLock.unlock();
        }
    }

    private ConsumerRecord<K, V> nextRecord() {
        Assert.state((this.recordsIterator != null ? 1 : 0) != 0, (String)"'recordsIterator' must not be null");
        ConsumerRecord<K, V> record = this.recordsIterator.next();
        if (!this.recordsIterator.hasNext()) {
            this.recordsIterator = null;
        }
        this.remainingCount.decrementAndGet();
        return record;
    }

    private Object recordToMessage(ConsumerRecord<K, V> record) {
        if (record.value() == null && this.checkNullValueForExceptions) {
            this.checkDeserializationException(record, "springDeserializerExceptionValue");
        }
        if (record.key() == null && this.checkNullKeyForExceptions) {
            this.checkDeserializationException(record, "springDeserializerExceptionKey");
        }
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        KafkaAckInfoImpl ackInfo = new KafkaAckInfoImpl(record, topicPartition);
        AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(ackInfo);
        this.inflightRecords.computeIfAbsent(topicPartition, tp -> Collections.synchronizedSet(new TreeSet())).add(ackInfo);
        Message message = this.messageConverter.toMessage(record, (Object)(ackCallback instanceof Acknowledgment ? (Acknowledgment)ackCallback : null), this.consumer, this.payloadType);
        if (message.getHeaders() instanceof KafkaMessageHeaders) {
            Map rawHeaders = ((KafkaMessageHeaders)message.getHeaders()).getRawHeaders();
            rawHeaders.put("acknowledgmentCallback", ackCallback);
            rawHeaders.put(REMAINING_RECORDS, this.remainingCount.get());
            if (this.rawMessageHeader) {
                rawHeaders.put("kafka_data", record);
                rawHeaders.put("sourceData", record);
            }
            return message;
        }
        AbstractIntegrationMessageBuilder builder = this.getMessageBuilderFactory().fromMessage(message).setHeader("acknowledgmentCallback", (Object)ackCallback).setHeader(REMAINING_RECORDS, (Object)this.remainingCount.get());
        if (this.rawMessageHeader) {
            builder.setHeader("kafka_data", record);
            builder.setHeader("sourceData", record);
        }
        return builder;
    }

    private void checkDeserializationException(ConsumerRecord<K, V> record, String headerName) {
        DeserializationException exception = SerializationUtils.getExceptionFromHeader(record, (String)headerName, (LogAccessor)this.logger);
        if (exception != null) {
            throw exception;
        }
    }

    public void destroy() {
        this.receiveLock.lock();
        try {
            this.stopConsumer();
        }
        finally {
            this.receiveLock.unlock();
        }
    }

    private void stopConsumer() {
        this.consumerLock.lock();
        try {
            if (this.consumer != null) {
                this.consumer.wakeup();
                this.consumer.close(CloseOptions.timeout((Duration)this.closeTimeout));
                this.consumer = null;
                this.assignedPartitions.clear();
            }
        }
        finally {
            this.consumerLock.unlock();
        }
    }

    private static <K, V> ConsumerFactory<K, V> fixConsumerFactory(ConsumerFactory<K, V> suppliedConsumerFactory) {
        HashMap<String, Integer> configs = new HashMap<String, Integer>(suppliedConsumerFactory.getConfigurationProperties());
        configs.put("max.poll.records", 1);
        DefaultKafkaConsumerFactory fixedConsumerFactory = new DefaultKafkaConsumerFactory(configs);
        if (suppliedConsumerFactory.getKeyDeserializer() != null) {
            fixedConsumerFactory.setKeyDeserializer(suppliedConsumerFactory.getKeyDeserializer());
        }
        if (suppliedConsumerFactory.getValueDeserializer() != null) {
            fixedConsumerFactory.setValueDeserializer(suppliedConsumerFactory.getValueDeserializer());
        }
        return fixedConsumerFactory;
    }

    public record KafkaAckCallbackFactory<K, V>(ConsumerProperties consumerProperties) implements AcknowledgmentCallbackFactory<KafkaAckInfo<K, V>>
    {
        public AcknowledgmentCallback createCallback(KafkaAckInfo<K, V> info) {
            return new KafkaAckCallback<K, V>(info, this.consumerProperties);
        }
    }

    private class IntegrationConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private final @Nullable ConsumerRebalanceListener providedRebalanceListener;
        private final boolean isConsumerAware;

        IntegrationConsumerRebalanceListener(ConsumerRebalanceListener providedRebalanceListener) {
            this.providedRebalanceListener = providedRebalanceListener;
            this.isConsumerAware = providedRebalanceListener instanceof ConsumerAwareRebalanceListener;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            KafkaMessageSource.this.assignedPartitions.removeAll(partitions);
            KafkaMessageSource.this.logger.info(() -> "Partitions revoked: " + String.valueOf(partitions));
            if (this.providedRebalanceListener != null) {
                if (this.isConsumerAware) {
                    Assert.state((KafkaMessageSource.this.consumer != null ? 1 : 0) != 0, (String)"'consumer' must not be null");
                    ((ConsumerAwareRebalanceListener)this.providedRebalanceListener).onPartitionsRevokedAfterCommit(KafkaMessageSource.this.consumer, partitions);
                } else {
                    this.providedRebalanceListener.onPartitionsRevoked(partitions);
                }
            }
        }

        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            if (this.providedRebalanceListener != null) {
                this.providedRebalanceListener.onPartitionsLost(partitions);
            }
            this.onPartitionsRevoked(partitions);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            KafkaMessageSource.this.assignedPartitions.addAll(partitions);
            Assert.state((KafkaMessageSource.this.consumer != null ? 1 : 0) != 0, (String)"'consumer' must not be null");
            if (KafkaMessageSource.this.paused.get()) {
                KafkaMessageSource.this.consumer.pause(KafkaMessageSource.this.assignedPartitions);
                KafkaMessageSource.this.logger.warn((CharSequence)"Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records");
            }
            KafkaMessageSource.this.logger.info(() -> "Partitions assigned: " + String.valueOf(partitions));
            if (this.providedRebalanceListener != null) {
                if (this.isConsumerAware) {
                    ((ConsumerAwareRebalanceListener)this.providedRebalanceListener).onPartitionsAssigned(KafkaMessageSource.this.consumer, partitions);
                } else {
                    this.providedRebalanceListener.onPartitionsAssigned(partitions);
                }
            }
            KafkaMessageSource.this.consumer.wakeup();
            KafkaMessageSource.this.newAssignment = true;
        }
    }

    public class KafkaAckInfoImpl
    implements KafkaAckInfo<K, V> {
        private final ConsumerRecord<K, V> record;
        private final TopicPartition topicPartition;
        private volatile boolean rolledBack;
        private volatile boolean ackDeferred;

        KafkaAckInfoImpl(ConsumerRecord<K, V> record, TopicPartition topicPartition) {
            this.record = record;
            this.topicPartition = topicPartition;
        }

        @Override
        public Object getConsumerMonitor() {
            return KafkaMessageSource.this.consumerLock;
        }

        @Override
        public @Nullable String getGroupId() {
            return KafkaMessageSource.this.getGroupId();
        }

        @Override
        public @Nullable Consumer<K, V> getConsumer() {
            return KafkaMessageSource.this.consumer;
        }

        @Override
        public ConsumerRecord<K, V> getRecord() {
            return this.record;
        }

        @Override
        public TopicPartition getTopicPartition() {
            return this.topicPartition;
        }

        @Override
        public Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets() {
            return KafkaMessageSource.this.inflightRecords;
        }

        @Override
        public boolean isRolledBack() {
            return this.rolledBack;
        }

        @Override
        public void setRolledBack(boolean rolledBack) {
            this.rolledBack = rolledBack;
        }

        @Override
        public boolean isAckDeferred() {
            return this.ackDeferred;
        }

        @Override
        public void setAckDeferred(boolean ackDeferred) {
            this.ackDeferred = ackDeferred;
        }

        @Override
        public int compareTo(KafkaAckInfo<K, V> other) {
            return Long.compare(this.record.offset(), other.getRecord().offset());
        }

        public String toString() {
            return "KafkaAckInfo [record=" + String.valueOf(this.record) + ", rolledBack=" + this.rolledBack + ", ackDeferred=" + this.ackDeferred + "]";
        }
    }

    public static interface KafkaAckInfo<K, V>
    extends Comparable<KafkaAckInfo<K, V>> {
        public Object getConsumerMonitor();

        public @Nullable String getGroupId();

        public @Nullable Consumer<K, V> getConsumer();

        public ConsumerRecord<K, V> getRecord();

        public TopicPartition getTopicPartition();

        public Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets();

        public boolean isRolledBack();

        public void setRolledBack(boolean var1);

        public boolean isAckDeferred();

        public void setAckDeferred(boolean var1);
    }

    public static class KafkaAckCallback<K, V>
    implements AcknowledgmentCallback,
    Acknowledgment {
        private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
        private final LogIfLevelEnabled commitLogger;
        private final KafkaAckInfo<K, V> ackInfo;
        private final @Nullable Duration commitTimeout;
        private final OffsetCommitCallback commitCallback;
        private final boolean isSyncCommits;
        private volatile boolean acknowledged;
        private boolean autoAckEnabled = true;

        public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable ConsumerProperties consumerProperties) {
            Assert.notNull(ackInfo, (String)"'ackInfo' cannot be null");
            this.ackInfo = ackInfo;
            this.commitTimeout = consumerProperties != null ? consumerProperties.getSyncCommitTimeout() : null;
            this.isSyncCommits = consumerProperties == null || consumerProperties.isSyncCommits();
            this.commitCallback = consumerProperties != null && consumerProperties.getCommitCallback() != null ? consumerProperties.getCommitCallback() : new LoggingCommitCallback();
            this.commitLogger = new LogIfLevelEnabled(this.logger, consumerProperties != null ? consumerProperties.getCommitLogLevel() : LogIfLevelEnabled.Level.DEBUG);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void acknowledge(AcknowledgmentCallback.Status status) {
            Assert.notNull((Object)status, (String)"'status' cannot be null");
            if (this.acknowledged) {
                throw new IllegalStateException("Already acknowledged");
            }
            Object object = this.ackInfo.getConsumerMonitor();
            synchronized (object) {
                block13: {
                    Set<KafkaAckInfo<K, V>> inflight;
                    try {
                        ConsumerRecord<K, V> record = this.ackInfo.getRecord();
                        switch (status) {
                            case ACCEPT: 
                            case REJECT: {
                                this.commitIfPossible(record);
                                break;
                            }
                            case REQUEUE: {
                                this.rollback(record);
                                break;
                            }
                        }
                        this.acknowledged = true;
                        if (this.ackInfo.isAckDeferred()) break block13;
                        inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
                        Assert.state((inflight != null ? 1 : 0) != 0, (String)"'inflight' must not be null");
                    }
                    catch (WakeupException e) {
                        try {
                            throw new IllegalStateException(e);
                        }
                        catch (Throwable throwable) {
                            this.acknowledged = true;
                            if (!this.ackInfo.isAckDeferred()) {
                                Set<KafkaAckInfo<K, V>> inflight2 = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
                                Assert.state((inflight2 != null ? 1 : 0) != 0, (String)"'inflight' must not be null");
                                inflight2.remove(this.ackInfo);
                            }
                            throw throwable;
                        }
                    }
                    inflight.remove(this.ackInfo);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void rollback(ConsumerRecord<K, V> record) {
            Assert.state((this.ackInfo.getConsumer() != null ? 1 : 0) != 0, (String)"'consumer' must not be null");
            this.ackInfo.getConsumer().seek(this.ackInfo.getTopicPartition(), record.offset());
            Set<KafkaAckInfo<K, V>> inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
            Assert.state((inflight != null ? 1 : 0) != 0, (String)"'infligt' must not be null");
            Set<KafkaAckInfo<K, V>> set = inflight;
            synchronized (set) {
                List<Long> rewound;
                if (inflight.size() > 1 && !(rewound = inflight.stream().filter(i -> i.getRecord().offset() > record.offset()).map(i -> {
                    i.setRolledBack(true);
                    return i.getRecord().offset();
                }).toList()).isEmpty()) {
                    this.logger.warn(() -> "Rolled back " + KafkaUtils.format((ConsumerRecord)record) + " later in-flight offsets " + String.valueOf(rewound) + " will also be re-fetched");
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void commitIfPossible(ConsumerRecord<K, V> record) {
            if (this.ackInfo.isRolledBack()) {
                this.logger.warn(() -> "Cannot commit offset for " + KafkaUtils.format((ConsumerRecord)record) + "; an earlier offset was rolled back");
            } else {
                Set<KafkaAckInfo<K, V>> candidates = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
                Assert.state((candidates != null ? 1 : 0) != 0, (String)"'candidates' must not be null");
                KafkaAckInfo ackInformation = null;
                Set<KafkaAckInfo<K, V>> set = candidates;
                synchronized (set) {
                    if (candidates.iterator().next().equals(this.ackInfo)) {
                        ArrayList<KafkaAckInfo<K, V>> toCommit = new ArrayList<KafkaAckInfo<K, V>>();
                        for (KafkaAckInfo<K, V> info : candidates) {
                            if (this.ackInfo.equals(info)) continue;
                            if (!info.isAckDeferred()) break;
                            toCommit.add(info);
                        }
                        if (!toCommit.isEmpty()) {
                            KafkaAckInfo ackInformationToLog = ackInformation = (KafkaAckInfo)toCommit.get(toCommit.size() - 1);
                            this.commitLogger.log(() -> "Committing pending offsets for " + KafkaUtils.format((ConsumerRecord)record) + " and all deferred to " + KafkaUtils.format(ackInformationToLog.getRecord()));
                            toCommit.forEach(candidates::remove);
                        } else {
                            ackInformation = this.ackInfo;
                            this.commitLogger.log(() -> "Committing offset for " + KafkaUtils.format((ConsumerRecord)record));
                        }
                    } else {
                        this.ackInfo.setAckDeferred(true);
                    }
                    if (ackInformation != null) {
                        Map<TopicPartition, OffsetAndMetadata> offset = Collections.singletonMap(ackInformation.getTopicPartition(), new OffsetAndMetadata(ackInformation.getRecord().offset() + 1L));
                        Assert.state((ackInformation.getConsumer() != null ? 1 : 0) != 0, (String)"'consumer' must not be null");
                        if (this.isSyncCommits) {
                            if (this.commitTimeout == null) {
                                ackInformation.getConsumer().commitSync(offset);
                            } else {
                                ackInformation.getConsumer().commitSync(offset, this.commitTimeout);
                            }
                        } else {
                            ackInformation.getConsumer().commitAsync(offset, this.commitCallback);
                        }
                    } else {
                        this.logger.debug((CharSequence)"Deferring commit offset; earlier messages are in flight.");
                    }
                }
            }
        }

        public boolean isAcknowledged() {
            return this.acknowledged;
        }

        public void acknowledge() {
            this.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
        }

        public void noAutoAck() {
            this.autoAckEnabled = false;
        }

        public boolean isAutoAck() {
            return this.autoAckEnabled;
        }
    }
}

