/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.core.log.LogAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public class KafkaMessageSource<K, V>
extends AbstractMessageSource<Object>
implements Pausable {
    private static final long MIN_ASSIGN_TIMEOUT = 2000L;
    private static final int DEFAULT_CLOSE_TIMEOUT = 30;
    public static final String REMAINING_RECORDS = "kafka_remainingRecords";
    private final ConsumerFactory<K, V> consumerFactory;
    private final KafkaAckCallbackFactory<K, V> ackCallbackFactory;
    private final Object consumerMonitor = new Object();
    private final Map<TopicPartition, Set<KafkaAckInfo<K, V>>> inflightRecords = new ConcurrentHashMap<TopicPartition, Set<KafkaAckInfo<K, V>>>();
    private final AtomicInteger remainingCount = new AtomicInteger();
    private final ConsumerProperties consumerProperties;
    private final Collection<TopicPartition> assignedPartitions = new LinkedHashSet<TopicPartition>();
    private final Duration commitTimeout;
    private final Duration assignTimeout;
    private final Duration pollTimeout;
    private RecordMessageConverter messageConverter = new MessagingMessageConverter();
    private Class<?> payloadType;
    private boolean rawMessageHeader;
    private boolean running;
    private Duration closeTimeout = Duration.ofSeconds(30L);
    private volatile Consumer<K, V> consumer;
    private volatile boolean pausing;
    private volatile boolean paused;
    private volatile Iterator<ConsumerRecord<K, V>> recordsIterator;

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties) {
        this(consumerFactory, consumerProperties, new KafkaAckCallbackFactory(consumerProperties), false);
    }

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, boolean allowMultiFetch) {
        this(consumerFactory, consumerProperties, new KafkaAckCallbackFactory(consumerProperties), allowMultiFetch);
    }

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, KafkaAckCallbackFactory<K, V> ackCallbackFactory) {
        this(consumerFactory, consumerProperties, ackCallbackFactory, false);
    }

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) {
        Assert.notNull(consumerFactory, (String)"'consumerFactory' must not be null");
        Assert.notNull(ackCallbackFactory, (String)"'ackCallbackFactory' must not be null");
        Assert.isTrue((!ObjectUtils.isEmpty((Object[])consumerProperties.getTopics()) || !ObjectUtils.isEmpty((Object[])consumerProperties.getTopicPartitions()) || consumerProperties.getTopicPattern() != null ? 1 : 0) != 0, (String)"topics, topicPattern, or topicPartitions must be provided");
        this.consumerProperties = consumerProperties;
        this.consumerFactory = this.fixOrRejectConsumerFactory(consumerFactory, allowMultiFetch);
        this.ackCallbackFactory = ackCallbackFactory;
        this.pollTimeout = Duration.ofMillis(consumerProperties.getPollTimeout());
        this.assignTimeout = Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20L, 2000L));
        this.commitTimeout = consumerProperties.getSyncCommitTimeout();
    }

    public Collection<TopicPartition> getAssignedPartitions() {
        return Collections.unmodifiableCollection(this.assignedPartitions);
    }

    protected void onInit() {
        if (!StringUtils.hasText((String)this.consumerProperties.getClientId())) {
            this.consumerProperties.setClientId(this.getComponentName());
        }
    }

    public ConsumerProperties getConsumerProperties() {
        return this.consumerProperties;
    }

    protected String getGroupId() {
        return this.consumerProperties.getGroupId();
    }

    protected String getClientId() {
        return this.consumerProperties.getClientId();
    }

    protected long getPollTimeout() {
        return this.pollTimeout.toMillis();
    }

    protected RecordMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    protected Class<?> getPayloadType() {
        return this.payloadType;
    }

    public void setPayloadType(Class<?> payloadType) {
        this.payloadType = payloadType;
    }

    protected ConsumerRebalanceListener getRebalanceListener() {
        return this.consumerProperties.getConsumerRebalanceListener();
    }

    public String getComponentType() {
        return "kafka:message-source";
    }

    protected boolean isRawMessageHeader() {
        return this.rawMessageHeader;
    }

    public void setRawMessageHeader(boolean rawMessageHeader) {
        this.rawMessageHeader = rawMessageHeader;
    }

    protected Duration getCommitTimeout() {
        return this.commitTimeout;
    }

    public void setCloseTimeout(Duration closeTimeout) {
        Assert.notNull((Object)closeTimeout, (String)"'closeTimeout' cannot be null");
        this.closeTimeout = closeTimeout;
    }

    private ConsumerFactory<K, V> fixOrRejectConsumerFactory(ConsumerFactory<K, V> suppliedConsumerFactory, boolean allowMultiFetch) {
        Object maxPoll = suppliedConsumerFactory.getConfigurationProperties().get("max.poll.records");
        if (!allowMultiFetch && (maxPoll == null || this.maxPollGtrOne(maxPoll))) {
            if (!suppliedConsumerFactory.getClass().getName().equals(DefaultKafkaConsumerFactory.class.getName())) {
                throw new IllegalArgumentException("Custom consumer factory is not configured with 'max.poll.records = 1'");
            }
            if (this.logger.isWarnEnabled()) {
                this.logger.warn((Object)("'max.poll.records' has been forced from " + (maxPoll == null ? "unspecified" : maxPoll) + " to 1, to avoid having to seek after each record"));
            }
            HashMap<String, Integer> configs = new HashMap<String, Integer>(suppliedConsumerFactory.getConfigurationProperties());
            configs.put("max.poll.records", 1);
            DefaultKafkaConsumerFactory fixedConsumerFactory = new DefaultKafkaConsumerFactory(configs);
            if (suppliedConsumerFactory.getKeyDeserializer() != null) {
                fixedConsumerFactory.setKeyDeserializer(suppliedConsumerFactory.getKeyDeserializer());
            }
            if (suppliedConsumerFactory.getValueDeserializer() != null) {
                fixedConsumerFactory.setValueDeserializer(suppliedConsumerFactory.getValueDeserializer());
            }
            return fixedConsumerFactory;
        }
        return suppliedConsumerFactory;
    }

    private boolean maxPollGtrOne(Object maxPoll) {
        return this.maxPollNumberGtrOne(maxPoll) || this.maxPollStringGtr1(maxPoll);
    }

    private boolean maxPollNumberGtrOne(Object maxPoll) {
        return maxPoll instanceof Number && ((Number)maxPoll).intValue() != 1;
    }

    private boolean maxPollStringGtr1(Object maxPoll) {
        return maxPoll instanceof String && Integer.parseInt((String)maxPoll) != 1;
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    public synchronized void start() {
        this.running = true;
    }

    public synchronized void stop() {
        this.stopConsumer();
        this.running = false;
    }

    public synchronized void pause() {
        this.pausing = true;
    }

    public synchronized void resume() {
        this.pausing = false;
    }

    public boolean isPaused() {
        return this.paused;
    }

    protected synchronized Object doReceive() {
        ConsumerRecord<K, V> record;
        if (this.consumer == null) {
            this.createConsumer();
            this.running = true;
        }
        if (this.pausing && !this.paused && this.assignedPartitions.size() > 0) {
            this.consumer.pause(this.assignedPartitions);
            this.paused = true;
        } else if (this.paused && !this.pausing) {
            this.consumer.resume(this.assignedPartitions);
            this.paused = false;
        }
        if (this.paused && this.recordsIterator == null) {
            this.logger.debug((Object)"Consumer is paused; no records will be returned");
        }
        return (record = this.pollRecord()) != null ? this.recordToMessage(record) : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void createConsumer() {
        Object object = this.consumerMonitor;
        synchronized (object) {
            this.consumer = this.consumerFactory.createConsumer(this.consumerProperties.getGroupId(), this.consumerProperties.getClientId(), null, this.consumerProperties.getKafkaConsumerProperties());
            ItegrationConsumerRebalanceListener rebalanceCallback = new ItegrationConsumerRebalanceListener(this.consumerProperties.getConsumerRebalanceListener());
            Pattern topicPattern = this.consumerProperties.getTopicPattern();
            TopicPartitionOffset[] partitions = this.consumerProperties.getTopicPartitions();
            if (topicPattern != null) {
                this.consumer.subscribe(topicPattern, (ConsumerRebalanceListener)rebalanceCallback);
            } else if (partitions != null) {
                this.assignAndSeekPartitionts(partitions);
            } else {
                this.consumer.subscribe(Arrays.asList(this.consumerProperties.getTopics()), (ConsumerRebalanceListener)rebalanceCallback);
            }
        }
    }

    private void assignAndSeekPartitionts(TopicPartitionOffset[] partitions) {
        List topicPartitionsToAssign = Arrays.stream(partitions).map(TopicPartitionOffset::getTopicPartition).collect(Collectors.toList());
        this.consumer.assign(topicPartitionsToAssign);
        this.assignedPartitions.addAll(topicPartitionsToAssign);
        for (TopicPartitionOffset partition : partitions) {
            if (TopicPartitionOffset.SeekPosition.BEGINNING.equals((Object)partition.getPosition())) {
                this.consumer.seekToBeginning(Collections.singleton(partition.getTopicPartition()));
                continue;
            }
            if (TopicPartitionOffset.SeekPosition.END.equals((Object)partition.getPosition())) {
                this.consumer.seekToEnd(Collections.singleton(partition.getTopicPartition()));
                continue;
            }
            TopicPartition topicPartition = partition.getTopicPartition();
            Long offset = partition.getOffset();
            if (offset == null) continue;
            long newOffset = offset;
            if (offset < 0L) {
                if (!partition.isRelativeToCurrent()) {
                    this.consumer.seekToEnd(Collections.singleton(topicPartition));
                    continue;
                }
                newOffset = Math.max(0L, this.consumer.position(topicPartition) + offset);
            } else if (partition.isRelativeToCurrent()) {
                newOffset = this.consumer.position(topicPartition) + offset;
            }
            try {
                this.consumer.seek(topicPartition, newOffset);
            }
            catch (Exception e) {
                this.logger.error((Object)("Failed to set initial offset for " + topicPartition + " at " + newOffset + ". Position is " + this.consumer.position(topicPartition)), (Throwable)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private ConsumerRecord<K, V> pollRecord() {
        if (this.recordsIterator != null) {
            return this.nextRecord();
        }
        Object object = this.consumerMonitor;
        synchronized (object) {
            ConsumerRecords records = this.consumer.poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
            if (records == null || records.count() == 0) {
                return null;
            }
            this.remainingCount.set(records.count());
            this.recordsIterator = records.iterator();
            return this.nextRecord();
        }
    }

    private ConsumerRecord<K, V> nextRecord() {
        ConsumerRecord<K, V> record = this.recordsIterator.next();
        if (!this.recordsIterator.hasNext()) {
            this.recordsIterator = null;
        }
        this.remainingCount.decrementAndGet();
        return record;
    }

    private Object recordToMessage(ConsumerRecord<K, V> record) {
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        KafkaAckInfoImpl ackInfo = new KafkaAckInfoImpl(record, topicPartition);
        AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(ackInfo);
        this.inflightRecords.computeIfAbsent(topicPartition, tp -> Collections.synchronizedSet(new TreeSet())).add(ackInfo);
        Message message = this.messageConverter.toMessage(record, ackCallback instanceof Acknowledgment ? (Acknowledgment)ackCallback : null, this.consumer, this.payloadType);
        if (message.getHeaders() instanceof KafkaMessageHeaders) {
            Map rawHeaders = ((KafkaMessageHeaders)message.getHeaders()).getRawHeaders();
            rawHeaders.put("acknowledgmentCallback", ackCallback);
            rawHeaders.put(REMAINING_RECORDS, this.remainingCount.get());
            if (this.rawMessageHeader) {
                rawHeaders.put("kafka_data", record);
                rawHeaders.put("sourceData", record);
            }
            return message;
        }
        AbstractIntegrationMessageBuilder builder = this.getMessageBuilderFactory().fromMessage(message).setHeader("acknowledgmentCallback", (Object)ackCallback).setHeader(REMAINING_RECORDS, (Object)this.remainingCount.get());
        if (this.rawMessageHeader) {
            builder.setHeader("kafka_data", record);
            builder.setHeader("sourceData", record);
        }
        return builder;
    }

    public synchronized void destroy() {
        this.stopConsumer();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopConsumer() {
        Object object = this.consumerMonitor;
        synchronized (object) {
            if (this.consumer != null) {
                this.consumer.close(this.closeTimeout);
                this.consumer = null;
                this.assignedPartitions.clear();
            }
        }
    }

    public static interface KafkaAckInfo<K, V>
    extends Comparable<KafkaAckInfo<K, V>> {
        public Object getConsumerMonitor();

        public String getGroupId();

        public Consumer<K, V> getConsumer();

        public ConsumerRecord<K, V> getRecord();

        public TopicPartition getTopicPartition();

        public Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets();

        public boolean isRolledBack();

        public void setRolledBack(boolean var1);

        public boolean isAckDeferred();

        public void setAckDeferred(boolean var1);
    }

    public class KafkaAckInfoImpl
    implements KafkaAckInfo<K, V> {
        private final ConsumerRecord<K, V> record;
        private final TopicPartition topicPartition;
        private volatile boolean rolledBack;
        private volatile boolean ackDeferred;

        KafkaAckInfoImpl(ConsumerRecord<K, V> record, TopicPartition topicPartition) {
            this.record = record;
            this.topicPartition = topicPartition;
        }

        @Override
        public Object getConsumerMonitor() {
            return KafkaMessageSource.this.consumerMonitor;
        }

        @Override
        public String getGroupId() {
            return KafkaMessageSource.this.getGroupId();
        }

        @Override
        public Consumer<K, V> getConsumer() {
            return KafkaMessageSource.this.consumer;
        }

        @Override
        public ConsumerRecord<K, V> getRecord() {
            return this.record;
        }

        @Override
        public TopicPartition getTopicPartition() {
            return this.topicPartition;
        }

        @Override
        public Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets() {
            return KafkaMessageSource.this.inflightRecords;
        }

        @Override
        public boolean isRolledBack() {
            return this.rolledBack;
        }

        @Override
        public void setRolledBack(boolean rolledBack) {
            this.rolledBack = rolledBack;
        }

        @Override
        public boolean isAckDeferred() {
            return this.ackDeferred;
        }

        @Override
        public void setAckDeferred(boolean ackDeferred) {
            this.ackDeferred = ackDeferred;
        }

        @Override
        public int compareTo(KafkaAckInfo<K, V> other) {
            return Long.compare(this.record.offset(), other.getRecord().offset());
        }

        public String toString() {
            return "KafkaAckInfo [record=" + this.record + ", rolledBack=" + this.rolledBack + ", ackDeferred=" + this.ackDeferred + "]";
        }
    }

    public static class KafkaAckCallback<K, V>
    implements AcknowledgmentCallback,
    Acknowledgment {
        private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
        private final LogIfLevelEnabled commitLogger;
        private final KafkaAckInfo<K, V> ackInfo;
        private final Duration commitTimeout;
        private final OffsetCommitCallback commitCallback;
        private final boolean isSyncCommits;
        private final boolean logOnlyMetadata;
        private volatile boolean acknowledged;
        private boolean autoAckEnabled = true;

        public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable ConsumerProperties consumerProperties) {
            Assert.notNull(ackInfo, (String)"'ackInfo' cannot be null");
            this.ackInfo = ackInfo;
            this.commitTimeout = consumerProperties != null ? consumerProperties.getSyncCommitTimeout() : null;
            this.isSyncCommits = consumerProperties == null || consumerProperties.isSyncCommits();
            this.commitCallback = consumerProperties != null && consumerProperties.getCommitCallback() != null ? consumerProperties.getCommitCallback() : new LoggingCommitCallback();
            this.commitLogger = new LogIfLevelEnabled(this.logger, consumerProperties != null ? consumerProperties.getCommitLogLevel() : LogIfLevelEnabled.Level.DEBUG);
            this.logOnlyMetadata = consumerProperties != null && consumerProperties.isOnlyLogRecordMetadata();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        public void acknowledge(AcknowledgmentCallback.Status status) {
            Assert.notNull((Object)status, (String)"'status' cannot be null");
            if (this.acknowledged) {
                throw new IllegalStateException("Already acknowledged");
            }
            Object object = this.ackInfo.getConsumerMonitor();
            // MONITORENTER : object
            try {
                ConsumerRecord<K, V> record = this.ackInfo.getRecord();
                switch (status) {
                    case ACCEPT: 
                    case REJECT: {
                        this.commitIfPossible(record);
                        return;
                    }
                    case REQUEUE: {
                        this.rollback(record);
                        return;
                    }
                }
                return;
            }
            catch (WakeupException e) {
                throw new IllegalStateException(e);
            }
            finally {
                this.acknowledged = true;
                if (!this.ackInfo.isAckDeferred()) {
                    this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()).remove(this.ackInfo);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void rollback(ConsumerRecord<K, V> record) {
            Set<KafkaAckInfo<K, V>> inflight;
            this.ackInfo.getConsumer().seek(this.ackInfo.getTopicPartition(), record.offset());
            Set<KafkaAckInfo<K, V>> set = inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
            synchronized (set) {
                List rewound;
                if (inflight.size() > 1 && (rewound = inflight.stream().filter(i -> i.getRecord().offset() > record.offset()).map(i -> {
                    i.setRolledBack(true);
                    return i.getRecord().offset();
                }).collect(Collectors.toList())).size() > 0 && this.logger.isWarnEnabled()) {
                    this.logger.warn((CharSequence)("Rolled back " + ListenerUtils.recordToString(record, (boolean)this.logOnlyMetadata) + " later in-flight offsets " + rewound + " will also be re-fetched"));
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void commitIfPossible(ConsumerRecord<K, V> record) {
            if (this.ackInfo.isRolledBack()) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn((CharSequence)("Cannot commit offset for " + ListenerUtils.recordToString(record, (boolean)this.logOnlyMetadata) + "; an earlier offset was rolled back"));
                }
            } else {
                Set<KafkaAckInfo<K, V>> candidates = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
                KafkaAckInfo ackInformation = null;
                Set<KafkaAckInfo<K, V>> set = candidates;
                synchronized (set) {
                    if (candidates.iterator().next().equals(this.ackInfo)) {
                        ArrayList<KafkaAckInfo<K, V>> toCommit = new ArrayList<KafkaAckInfo<K, V>>();
                        for (KafkaAckInfo<K, V> info : candidates) {
                            if (info == this.ackInfo) continue;
                            if (!info.isAckDeferred()) break;
                            toCommit.add(info);
                        }
                        if (toCommit.size() > 0) {
                            KafkaAckInfo ackInformationToLog = ackInformation = (KafkaAckInfo)toCommit.get(toCommit.size() - 1);
                            this.commitLogger.log(() -> "Committing pending offsets for " + ListenerUtils.recordToString((ConsumerRecord)record, (boolean)this.logOnlyMetadata) + " and all deferred to " + ListenerUtils.recordToString(ackInformationToLog.getRecord(), (boolean)this.logOnlyMetadata));
                            candidates.removeAll(toCommit);
                        } else {
                            ackInformation = this.ackInfo;
                            this.commitLogger.log(() -> "Committing offset for " + ListenerUtils.recordToString((ConsumerRecord)record, (boolean)this.logOnlyMetadata));
                        }
                    } else {
                        this.ackInfo.setAckDeferred(true);
                    }
                    if (ackInformation != null) {
                        Map<TopicPartition, OffsetAndMetadata> offset = Collections.singletonMap(ackInformation.getTopicPartition(), new OffsetAndMetadata(ackInformation.getRecord().offset() + 1L));
                        if (this.isSyncCommits) {
                            if (this.commitTimeout == null) {
                                ackInformation.getConsumer().commitSync(offset);
                            } else {
                                ackInformation.getConsumer().commitSync(offset, this.commitTimeout);
                            }
                        } else {
                            ackInformation.getConsumer().commitAsync(offset, this.commitCallback);
                        }
                    } else if (this.logger.isDebugEnabled()) {
                        this.logger.debug((CharSequence)"Deferring commit offset; earlier messages are in flight.");
                    }
                }
            }
        }

        public boolean isAcknowledged() {
            return this.acknowledged;
        }

        public void acknowledge() {
            this.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
        }

        public void noAutoAck() {
            this.autoAckEnabled = false;
        }

        public boolean isAutoAck() {
            return this.autoAckEnabled;
        }
    }

    public static class KafkaAckCallbackFactory<K, V>
    implements AcknowledgmentCallbackFactory<KafkaAckInfo<K, V>> {
        private final ConsumerProperties consumerProperties;

        public KafkaAckCallbackFactory(ConsumerProperties consumerProperties) {
            this.consumerProperties = consumerProperties;
        }

        public AcknowledgmentCallback createCallback(KafkaAckInfo<K, V> info) {
            return new KafkaAckCallback<K, V>(info, this.consumerProperties);
        }
    }

    private class ItegrationConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private final ConsumerRebalanceListener providedRebalanceListener;
        private final boolean isConsumerAware;

        ItegrationConsumerRebalanceListener(ConsumerRebalanceListener providedRebalanceListener) {
            this.providedRebalanceListener = providedRebalanceListener;
            this.isConsumerAware = providedRebalanceListener instanceof ConsumerAwareRebalanceListener;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            KafkaMessageSource.this.assignedPartitions.removeAll(partitions);
            if (KafkaMessageSource.this.logger.isInfoEnabled()) {
                KafkaMessageSource.this.logger.info((Object)("Partitions revoked: " + partitions));
            }
            if (this.providedRebalanceListener != null) {
                if (this.isConsumerAware) {
                    ((ConsumerAwareRebalanceListener)this.providedRebalanceListener).onPartitionsRevokedAfterCommit(KafkaMessageSource.this.consumer, partitions);
                } else {
                    this.providedRebalanceListener.onPartitionsRevoked(partitions);
                }
            }
        }

        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            if (this.providedRebalanceListener != null) {
                this.providedRebalanceListener.onPartitionsLost(partitions);
            }
            this.onPartitionsRevoked(partitions);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            KafkaMessageSource.this.assignedPartitions.addAll(partitions);
            if (KafkaMessageSource.this.paused) {
                KafkaMessageSource.this.consumer.pause(KafkaMessageSource.this.assignedPartitions);
                KafkaMessageSource.this.logger.warn((Object)"Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records");
            }
            if (KafkaMessageSource.this.logger.isInfoEnabled()) {
                KafkaMessageSource.this.logger.info((Object)("Partitions assigned: " + partitions));
            }
            if (this.providedRebalanceListener != null) {
                if (this.isConsumerAware) {
                    ((ConsumerAwareRebalanceListener)this.providedRebalanceListener).onPartitionsAssigned(KafkaMessageSource.this.consumer, partitions);
                } else {
                    this.providedRebalanceListener.onPartitionsAssigned(partitions);
                }
            }
        }
    }
}

