/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest;
import kafka.common.ErrorMapping;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.WindowingOffsetManager;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.ZookeeperConfiguration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.listener.AcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.Acknowledgment;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.integration.kafka.listener.KafkaNativeOffsetManager;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerListener;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import scala.collection.Map;
import scala.collection.Seq;

public class KafkaMessageChannelBinder
extends AbstractBinder<MessageChannel, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>,
DisposableBean {
    public static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
    public static final DaemonThreadFactory DAEMON_THREAD_FACTORY = new DaemonThreadFactory();
    private boolean autoCreateTopics = true;
    private boolean autoAddPartitions = false;
    private RetryOperations metadataRetryOperations;
    private final java.util.Map<String, Collection<Partition>> topicsInUse = new HashMap<String, Collection<Partition>>();
    private final ZookeeperConnect zookeeperConnect;
    private final String brokers;
    private String[] headersToMap;
    private final String zkAddress;
    private int replicationFactor = 1;
    private int requiredAcks = 1;
    private int queueSize = 1024;
    private int maxWait = 100;
    private int fetchSize = 0x100000;
    private int minPartitionCount = 1;
    private ConnectionFactory connectionFactory;
    private int socketBufferSize = 0x200000;
    private int offsetUpdateTimeWindow = 10000;
    private int offsetUpdateCount = 0;
    private int offsetUpdateShutdownTimeout = 2000;
    private int zkSessionTimeout = 10000;
    private int zkConnectionTimeout = 10000;
    private ProducerListener producerListener;
    private volatile Producer<byte[], byte[]> dlqProducer;
    private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();

    public KafkaMessageChannelBinder(ZookeeperConnect zookeeperConnect, String brokers, String zkAddress, String ... headersToMap) {
        this.zookeeperConnect = zookeeperConnect;
        this.brokers = brokers;
        this.zkAddress = zkAddress;
        if (ObjectUtils.isEmpty((Object[])headersToMap)) {
            this.headersToMap = BinderHeaders.STANDARD_HEADERS;
        } else {
            String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length + headersToMap.length);
            System.arraycopy(headersToMap, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length, headersToMap.length);
            this.headersToMap = combinedHeadersToMap;
        }
    }

    String getZkAddress() {
        return this.zkAddress;
    }

    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) {
        this.offsetUpdateTimeWindow = offsetUpdateTimeWindow;
    }

    public void setOffsetUpdateCount(int offsetUpdateCount) {
        this.offsetUpdateCount = offsetUpdateCount;
    }

    public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) {
        this.offsetUpdateShutdownTimeout = offsetUpdateShutdownTimeout;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public void setProducerListener(ProducerListener producerListener) {
        this.producerListener = producerListener;
    }

    public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
        this.metadataRetryOperations = metadataRetryOperations;
    }

    public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    public void onInit() throws Exception {
        ZookeeperConfiguration configuration = new ZookeeperConfiguration(this.zookeeperConnect);
        configuration.setBufferSize(this.socketBufferSize);
        configuration.setMaxWait(this.maxWait);
        DefaultConnectionFactory defaultConnectionFactory = new DefaultConnectionFactory((Configuration)configuration);
        defaultConnectionFactory.afterPropertiesSet();
        this.connectionFactory = defaultConnectionFactory;
        if (this.metadataRetryOperations == null) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(10);
            retryTemplate.setRetryPolicy((RetryPolicy)simpleRetryPolicy);
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(100L);
            backOffPolicy.setMultiplier(2.0);
            backOffPolicy.setMaxInterval(1000L);
            retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
            this.metadataRetryOperations = retryTemplate;
        }
    }

    public void destroy() throws Exception {
        if (this.dlqProducer != null) {
            this.dlqProducer.close();
            this.dlqProducer = null;
        }
    }

    public static void validateTopicName(String topicName) {
        try {
            byte[] utf8;
            for (byte b : utf8 = topicName.getBytes("UTF-8")) {
                if (b >= 97 && b <= 122 || b >= 65 && b <= 90 || b >= 48 && b <= 57 || b == 46 || b == 45 || b == 95) continue;
                throw new IllegalArgumentException("Topic name can only have ASCII alphanumerics, '.', '_' and '-'");
            }
        }
        catch (UnsupportedEncodingException e) {
            throw new AssertionError((Object)e);
        }
    }

    public void setReplicationFactor(int replicationFactor) {
        this.replicationFactor = replicationFactor;
    }

    public void setRequiredAcks(int requiredAcks) {
        this.requiredAcks = requiredAcks;
    }

    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    public void setFetchSize(int fetchSize) {
        this.fetchSize = fetchSize;
    }

    public void setMinPartitionCount(int minPartitionCount) {
        this.minPartitionCount = minPartitionCount;
    }

    public void setMaxWait(int maxWait) {
        this.maxWait = maxWait;
    }

    public int getZkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    public void setZkSessionTimeout(int zkSessionTimeout) {
        this.zkSessionTimeout = zkSessionTimeout;
    }

    public int getZkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    public void setZkConnectionTimeout(int zkConnectionTimeout) {
        this.zkConnectionTimeout = zkConnectionTimeout;
    }

    public boolean isAutoCreateTopics() {
        return this.autoCreateTopics;
    }

    public void setAutoCreateTopics(boolean autoCreateTopics) {
        this.autoCreateTopics = autoCreateTopics;
    }

    public boolean isAutoAddPartitions() {
        return this.autoAddPartitions;
    }

    public void setAutoAddPartitions(boolean autoAddPartitions) {
        this.autoAddPartitions = autoAddPartitions;
    }

    public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    java.util.Map<String, Collection<Partition>> getTopicsInUse() {
        return this.topicsInUse;
    }

    protected Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        String consumerGroup;
        boolean anonymous = !StringUtils.hasText((String)group);
        Assert.isTrue((!anonymous || !((KafkaConsumerProperties)properties.getExtension()).isEnableDlq() ? 1 : 0) != 0, (String)"DLQ support is not available for anonymous subscriptions");
        String string = consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
        long referencePoint = ((KafkaConsumerProperties)properties.getExtension()).getStartOffset() != null ? ((KafkaConsumerProperties)properties.getExtension()).getStartOffset().getReferencePoint() : (anonymous ? OffsetRequest.LatestTime() : OffsetRequest.EarliestTime());
        return this.createKafkaConsumer(name, inputChannel, properties, consumerGroup, referencePoint);
    }

    public Binding<MessageChannel> doBindProducer(String name, MessageChannel moduleOutputChannel, ExtendedProducerProperties<KafkaProducerProperties> properties) {
        Assert.isInstanceOf(SubscribableChannel.class, (Object)moduleOutputChannel);
        if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("Using kafka topic for outbound: " + name));
        }
        KafkaMessageChannelBinder.validateTopicName(name);
        Collection<Partition> partitions = this.ensureTopicCreated(name, properties.getPartitionCount());
        if (properties.getPartitionCount() < partitions.size() && this.logger.isInfoEnabled()) {
            this.logger.info((Object)("The `partitionCount` of the producer for topic " + name + " is " + properties.getPartitionCount() + ", smaller than the actual partition count of " + partitions.size() + " of the topic. The larger number will be used instead."));
        }
        this.topicsInUse.put(name, partitions);
        ProducerMetadata producerMetadata = new ProducerMetadata(name, byte[].class, byte[].class, (Serializer)BYTE_ARRAY_SERIALIZER, (Serializer)BYTE_ARRAY_SERIALIZER);
        producerMetadata.setSync(((KafkaProducerProperties)properties.getExtension()).isSync());
        producerMetadata.setCompressionType(((KafkaProducerProperties)properties.getExtension()).getCompressionType());
        producerMetadata.setBatchBytes(((KafkaProducerProperties)properties.getExtension()).getBufferSize());
        Properties additionalProps = new Properties();
        additionalProps.put("acks", String.valueOf(this.requiredAcks));
        additionalProps.put("linger.ms", String.valueOf(((KafkaProducerProperties)properties.getExtension()).getBatchTimeout()));
        ProducerFactoryBean producerFB = new ProducerFactoryBean(producerMetadata, this.brokers, additionalProps);
        try {
            final ProducerConfiguration producerConfiguration = new ProducerConfiguration(producerMetadata, producerFB.getObject());
            producerConfiguration.setProducerListener(this.producerListener);
            SendingHandler handler = new SendingHandler(name, properties, partitions.size(), producerConfiguration);
            EventDrivenConsumer consumer = new EventDrivenConsumer((SubscribableChannel)moduleOutputChannel, (MessageHandler)handler){

                protected void doStop() {
                    super.doStop();
                    producerConfiguration.stop();
                }
            };
            consumer.setBeanFactory((BeanFactory)this.getBeanFactory());
            consumer.setBeanName("outbound." + name);
            consumer.afterPropertiesSet();
            DefaultBinding producerBinding = new DefaultBinding(name, null, (Object)moduleOutputChannel, (AbstractEndpoint)consumer);
            consumer.start();
            return producerBinding;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Unable to fully structure code
     */
    private Collection<Partition> ensureTopicCreated(final String topicName, final int partitionCount) {
        zkClient = new ZkClient(this.zkAddress, this.getZkSessionTimeout(), this.getZkConnectionTimeout(), (ZkSerializer)ZKStringSerializer$.MODULE$);
        try {
            block9: {
                block8: {
                    block10: {
                        topicConfig = new Properties();
                        topicMetadata = AdminUtils.fetchTopicMetadataFromZk((String)topicName, (ZkClient)zkClient);
                        if (topicMetadata.errorCode() != ErrorMapping.NoError()) break block8;
                        v0 = effectivePartitionCount = this.isAutoAddPartitions() != false ? Math.max(this.minPartitionCount, partitionCount) : partitionCount;
                        if (topicMetadata.partitionsMetadata().size() >= effectivePartitionCount) break block9;
                        if (!this.isAutoAddPartitions()) break block10;
                        AdminUtils.addPartitions((ZkClient)zkClient, (String)topicName, (int)effectivePartitionCount, null, (boolean)false, (Properties)new Properties());
                        break block9;
                    }
                    topicSize = topicMetadata.partitionsMetadata().size();
                    throw new BinderException("The number of expected partitions was: " + partitionCount + ", but " + topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead." + "Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
                }
                if (topicMetadata.errorCode() != ErrorMapping.UnknownTopicOrPartitionCode()) ** GOTO lbl24
                if (this.isAutoCreateTopics()) {
                    brokerList = ZkUtils.getSortedBrokerList((ZkClient)zkClient);
                    effectivePartitionCount = Math.max(this.minPartitionCount, partitionCount);
                    replicaAssignment = AdminUtils.assignReplicasToBrokers((Seq)brokerList, (int)effectivePartitionCount, (int)this.replicationFactor, (int)-1, (int)-1);
                    this.metadataRetryOperations.execute((RetryCallback)new RetryCallback<Object, RuntimeException>(){

                        public Object doWithRetry(RetryContext context) throws RuntimeException {
                            AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK((ZkClient)zkClient, (String)topicName, (Map)replicaAssignment, (Properties)topicConfig, (boolean)true);
                            return null;
                        }
                    });
                } else {
                    throw new BinderException("Topic " + topicName + " does not exist");
lbl24:
                    // 1 sources

                    throw new BinderException("Error fetching Kafka topic metadata: ", ErrorMapping.exceptionFor((short)topicMetadata.errorCode()));
                }
            }
            try {
                var7_12 = partitions = (Collection)this.metadataRetryOperations.execute((RetryCallback)new RetryCallback<Collection<Partition>, Exception>(){

                    public Collection<Partition> doWithRetry(RetryContext context) throws Exception {
                        KafkaMessageChannelBinder.this.connectionFactory.refreshMetadata(Collections.singleton(topicName));
                        Collection partitions = KafkaMessageChannelBinder.this.connectionFactory.getPartitions(topicName);
                        if (partitions.size() < partitionCount) {
                            throw new IllegalStateException("The number of expected partitions was: " + partitionCount + ", but " + partitions.size() + (partitions.size() > 1 ? " have " : " has ") + "been found instead");
                        }
                        KafkaMessageChannelBinder.this.connectionFactory.getLeaders((Iterable)partitions);
                        return partitions;
                    }
                });
                return var7_12;
            }
            catch (Exception e) {
                this.logger.error((Object)"Cannot initialize Binder", (Throwable)e);
                throw new BinderException("Cannot initialize binder:", (Throwable)e);
            }
        }
        finally {
            zkClient.close();
        }
    }

    private Binding<MessageChannel> createKafkaConsumer(String name, MessageChannel moduleInputChannel, ExtendedConsumerProperties<KafkaConsumerProperties> properties, String group, long referencePoint) {
        Collection<Partition> listenedPartitions;
        KafkaMessageChannelBinder.validateTopicName(name);
        if (properties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        Collection<Partition> allPartitions = this.ensureTopicCreated(name, properties.getInstanceCount() * properties.getConcurrency());
        DefaultDecoder valueDecoder = new DefaultDecoder(null);
        DefaultDecoder keyDecoder = new DefaultDecoder(null);
        if (properties.getInstanceCount() == 1) {
            listenedPartitions = allPartitions;
        } else {
            listenedPartitions = new ArrayList<Partition>();
            for (Partition partition : allPartitions) {
                if (partition.getId() % properties.getInstanceCount() != properties.getInstanceIndex()) continue;
                listenedPartitions.add(partition);
            }
        }
        this.topicsInUse.put(name, listenedPartitions);
        ReceivingHandler rh = new ReceivingHandler(properties);
        rh.setOutputChannel(moduleInputChannel);
        FixedSubscriberChannel bridge = new FixedSubscriberChannel((MessageHandler)rh);
        bridge.setBeanName("bridge." + name);
        Assert.isTrue((!CollectionUtils.isEmpty(listenedPartitions) ? 1 : 0) != 0, (String)"A list of partitions must be provided");
        final KafkaMessageListenerContainer messageListenerContainer = new KafkaMessageListenerContainer(this.connectionFactory, listenedPartitions.toArray(new Partition[listenedPartitions.size()]));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions)));
        }
        OffsetManager offsetManager = this.createOffsetManager(group, referencePoint);
        if (((KafkaConsumerProperties)properties.getExtension()).isResetOffsets()) {
            offsetManager.resetOffsets(listenedPartitions);
        }
        messageListenerContainer.setOffsetManager(offsetManager);
        messageListenerContainer.setQueueSize(this.queueSize);
        messageListenerContainer.setMaxFetch(this.fetchSize);
        int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
        messageListenerContainer.setConcurrency(concurrency);
        final ExecutorService dispatcherTaskExecutor = Executors.newFixedThreadPool(concurrency, DAEMON_THREAD_FACTORY);
        messageListenerContainer.setDispatcherTaskExecutor((Executor)dispatcherTaskExecutor);
        final KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
        kafkaMessageDrivenChannelAdapter.setBeanFactory((BeanFactory)this.getBeanFactory());
        kafkaMessageDrivenChannelAdapter.setKeyDecoder((Decoder)keyDecoder);
        kafkaMessageDrivenChannelAdapter.setPayloadDecoder((Decoder)valueDecoder);
        kafkaMessageDrivenChannelAdapter.setOutputChannel((MessageChannel)bridge);
        kafkaMessageDrivenChannelAdapter.setAutoCommitOffset(((KafkaConsumerProperties)properties.getExtension()).isAutoCommitOffset());
        kafkaMessageDrivenChannelAdapter.afterPropertiesSet();
        final RetryTemplate retryTemplate = this.buildRetryTemplateIfRetryEnabled((ConsumerProperties)properties);
        if (retryTemplate != null) {
            if (((KafkaConsumerProperties)properties.getExtension()).isAutoCommitOffset()) {
                final MessageListener originalMessageListener = (MessageListener)messageListenerContainer.getMessageListener();
                messageListenerContainer.setMessageListener((Object)new MessageListener(){

                    public void onMessage(final KafkaMessage message) {
                        try {
                            retryTemplate.execute((RetryCallback)new RetryCallback<Object, Throwable>(){

                                public Object doWithRetry(RetryContext context) {
                                    originalMessageListener.onMessage(message);
                                    return null;
                                }
                            });
                        }
                        catch (Throwable throwable) {
                            if (throwable instanceof RuntimeException) {
                                throw (RuntimeException)throwable;
                            }
                            throw new RuntimeException(throwable);
                        }
                    }
                });
            } else {
                messageListenerContainer.setMessageListener((Object)new AcknowledgingMessageListener(){
                    final AcknowledgingMessageListener originalMessageListener;
                    {
                        this.originalMessageListener = (AcknowledgingMessageListener)messageListenerContainer.getMessageListener();
                    }

                    public void onMessage(final KafkaMessage message, final Acknowledgment acknowledgment) {
                        retryTemplate.execute((RetryCallback)new RetryCallback<Object, RuntimeException>(){

                            public Object doWithRetry(RetryContext context) {
                                originalMessageListener.onMessage(message, acknowledgment);
                                return null;
                            }
                        });
                    }
                });
            }
        }
        if (((KafkaConsumerProperties)properties.getExtension()).isEnableDlq()) {
            final String dlqTopic = "error." + name + "." + group;
            this.initDlqProducer();
            messageListenerContainer.setErrorHandler(new ErrorHandler(){

                public void handle(Exception thrownException, final KafkaMessage message) {
                    final byte[] key = message.getMessage().key() != null ? Utils.toArray((ByteBuffer)message.getMessage().key()) : null;
                    final byte[] payload = message.getMessage().payload() != null ? Utils.toArray((ByteBuffer)message.getMessage().payload()) : null;
                    KafkaMessageChannelBinder.this.dlqProducer.send(new ProducerRecord(dlqTopic, (Object)key, (Object)payload), new Callback(){

                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            StringBuffer messageLog = new StringBuffer();
                            messageLog.append(" a message with key='" + KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString((byte[])key), 50) + "'");
                            messageLog.append(" and payload='" + KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString((byte[])payload), 50) + "'");
                            messageLog.append(" received from " + message.getMetadata().getPartition());
                            if (exception != null) {
                                KafkaMessageChannelBinder.this.logger.error((Object)("Error sending to DLQ" + messageLog.toString()), (Throwable)exception);
                            } else if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
                                KafkaMessageChannelBinder.this.logger.debug((Object)("Sent to DLQ " + messageLog.toString()));
                            }
                        }
                    });
                }
            });
        }
        kafkaMessageDrivenChannelAdapter.start();
        EventDrivenConsumer edc = new EventDrivenConsumer((SubscribableChannel)bridge, (MessageHandler)rh){

            protected void doStop() {
                kafkaMessageDrivenChannelAdapter.stop();
                if (messageListenerContainer.getOffsetManager() instanceof DisposableBean) {
                    try {
                        ((DisposableBean)messageListenerContainer.getOffsetManager()).destroy();
                    }
                    catch (Exception e) {
                        this.logger.error((Object)"Error while closing the offset manager", (Throwable)e);
                    }
                }
                super.doStop();
            }
        };
        String groupedName = this.groupedName(name, group);
        edc.setBeanName("inbound." + groupedName);
        DefaultBinding<MessageChannel> consumerBinding = new DefaultBinding<MessageChannel>(name, group, moduleInputChannel, (AbstractEndpoint)edc){

            protected void afterUnbind() {
                dispatcherTaskExecutor.shutdown();
            }
        };
        edc.start();
        return consumerBinding;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void initDlqProducer() {
        block6: {
            try {
                if (this.dlqProducer != null) break block6;
                KafkaMessageChannelBinder kafkaMessageChannelBinder = this;
                synchronized (kafkaMessageChannelBinder) {
                    if (this.dlqProducer == null) {
                        ProducerMetadata producerMetadata = new ProducerMetadata("dlqKafkaProducer", byte[].class, byte[].class, (Serializer)BYTE_ARRAY_SERIALIZER, (Serializer)BYTE_ARRAY_SERIALIZER);
                        producerMetadata.setSync(false);
                        producerMetadata.setCompressionType(ProducerMetadata.CompressionType.none);
                        producerMetadata.setBatchBytes(16384);
                        Properties additionalProps = new Properties();
                        additionalProps.put("acks", String.valueOf(this.requiredAcks));
                        additionalProps.put("linger.ms", String.valueOf(0));
                        ProducerFactoryBean producerFactoryBean = new ProducerFactoryBean(producerMetadata, this.brokers, additionalProps);
                        this.dlqProducer = producerFactoryBean.getObject();
                    }
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Cannot initialize DLQ producer:", e);
            }
        }
    }

    private OffsetManager createOffsetManager(String group, long referencePoint) {
        try {
            KafkaNativeOffsetManager kafkaOffsetManager = new KafkaNativeOffsetManager(this.connectionFactory, this.zookeeperConnect, Collections.emptyMap());
            kafkaOffsetManager.setConsumerId(group);
            kafkaOffsetManager.setReferenceTimestamp(referencePoint);
            kafkaOffsetManager.afterPropertiesSet();
            WindowingOffsetManager windowingOffsetManager = new WindowingOffsetManager((OffsetManager)kafkaOffsetManager);
            windowingOffsetManager.setTimespan(this.offsetUpdateTimeWindow);
            windowingOffsetManager.setCount(this.offsetUpdateCount);
            windowingOffsetManager.setShutdownTimeout(this.offsetUpdateShutdownTimeout);
            windowingOffsetManager.afterPropertiesSet();
            return windowingOffsetManager;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String toDisplayString(String original, int maxCharacters) {
        if (original.length() <= maxCharacters) {
            return original;
        }
        return original.substring(0, maxCharacters) + "...";
    }

    public void doManualAck(LinkedList<MessageHeaders> messageHeadersList) {
        for (MessageHeaders headers : messageHeadersList) {
            Acknowledgment acknowledgment = (Acknowledgment)headers.get((Object)"kafka_acknowledgment");
            Assert.notNull((Object)acknowledgment, (String)"Acknowledgement shouldn't be null when acknowledging kafka message manually.");
            acknowledgment.acknowledge();
        }
    }

    public static enum StartOffset {
        earliest(OffsetRequest.EarliestTime()),
        latest(OffsetRequest.LatestTime());

        private final long referencePoint;

        private StartOffset(long referencePoint) {
            this.referencePoint = referencePoint;
        }

        public long getReferencePoint() {
            return this.referencePoint;
        }
    }

    private class SendingHandler
    extends AbstractMessageHandler {
        private final AtomicInteger roundRobinCount = new AtomicInteger();
        private final String topicName;
        private final ExtendedProducerProperties<KafkaProducerProperties> producerProperties;
        private final int numberOfKafkaPartitions;
        private final ProducerConfiguration<byte[], byte[]> producerConfiguration;
        private final PartitionHandler partitionHandler;

        private SendingHandler(String topicName, ExtendedProducerProperties<KafkaProducerProperties> properties, int numberOfPartitions, ProducerConfiguration<byte[], byte[]> producerConfiguration) {
            this.topicName = topicName;
            this.producerProperties = properties;
            this.numberOfKafkaPartitions = numberOfPartitions;
            ConfigurableListableBeanFactory beanFactory = KafkaMessageChannelBinder.this.getBeanFactory();
            this.setBeanFactory((BeanFactory)beanFactory);
            this.producerConfiguration = producerConfiguration;
            this.partitionHandler = new PartitionHandler(beanFactory, KafkaMessageChannelBinder.this.evaluationContext, KafkaMessageChannelBinder.this.partitionSelector, properties);
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            int targetPartition = this.producerProperties.isPartitioned() ? this.partitionHandler.determinePartition(message) : this.roundRobin() % this.numberOfKafkaPartitions;
            if (HeaderMode.embeddedHeaders.equals((Object)this.producerProperties.getHeaderMode())) {
                MessageValues transformed = KafkaMessageChannelBinder.this.serializePayloadIfNecessary(message);
                byte[] messageToSend = KafkaMessageChannelBinder.this.embeddedHeadersMessageConverter.embedHeaders(transformed, KafkaMessageChannelBinder.this.headersToMap);
                this.producerConfiguration.send(this.topicName, Integer.valueOf(targetPartition), null, (Object)messageToSend);
            } else if (HeaderMode.raw.equals((Object)this.producerProperties.getHeaderMode())) {
                Object contentType = message.getHeaders().get((Object)"contentType");
                if (contentType != null && !contentType.equals("application/octet-stream")) {
                    this.logger.error((Object)("Raw mode supports only application/octet-stream content type" + message.getPayload().getClass()));
                }
                if (message.getPayload() instanceof byte[]) {
                    this.producerConfiguration.send(this.topicName, Integer.valueOf(targetPartition), null, (Object)((byte[])message.getPayload()));
                } else {
                    this.logger.error((Object)("Raw mode supports only byte[] payloads but value sent was of type " + message.getPayload().getClass()));
                }
            }
        }

        private int roundRobin() {
            int result = this.roundRobinCount.incrementAndGet();
            if (result == Integer.MAX_VALUE) {
                this.roundRobinCount.set(0);
            }
            return result;
        }
    }

    private class ReceivingHandler
    extends AbstractReplyProducingMessageHandler {
        private final ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties;

        public ReceivingHandler(ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
            this.consumerProperties = consumerProperties;
        }

        protected Object handleRequestMessage(Message<?> requestMessage) {
            if (HeaderMode.embeddedHeaders.equals((Object)this.consumerProperties.getHeaderMode())) {
                MessageValues messageValues = KafkaMessageChannelBinder.this.extractMessageValues(requestMessage);
                return MessageBuilder.createMessage((Object)messageValues.getPayload(), (MessageHeaders)new KafkaBinderHeaders((java.util.Map<String, Object>)messageValues));
            }
            return requestMessage;
        }

        protected boolean shouldCopyRequestHeaders() {
            return false;
        }

        private final class KafkaBinderHeaders
        extends MessageHeaders {
            KafkaBinderHeaders(java.util.Map<String, Object> headers) {
                super(headers, MessageHeaders.ID_VALUE_NONE, Long.valueOf(-1L));
            }
        }
    }

    private static class DaemonThreadFactory
    implements ThreadFactory {
        private DaemonThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "kafka-binder-");
            thread.setDaemon(true);
            return thread;
        }
    }
}

