package org.springframework.cloud.stream.binder.kafka;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest;
import kafka.api.TopicMetadata;
import kafka.common.ErrorMapping;
import kafka.serializer.DefaultDecoder;
import kafka.utils.VerifiableProperties;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.ZookeeperConfiguration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.listener.AcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.Acknowledgment;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.integration.kafka.listener.KafkaNativeOffsetManager;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerListener;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.class */
public class KafkaMessageChannelBinder extends AbstractBinder<MessageChannel, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>, DisposableBean {
    public static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
    public static final ThreadFactory DAEMON_THREAD_FACTORY;
    private final KafkaBinderConfigurationProperties configurationProperties;
    private final String[] headersToMap;
    private RetryOperations metadataRetryOperations;
    private ConnectionFactory connectionFactory;
    private ProducerListener producerListener;
    private volatile Producer<byte[], byte[]> dlqProducer;
    private final Map<String, Collection<Partition>> topicsInUse = new HashMap();
    private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$5, reason: invalid class name */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$5.class */
    public class AnonymousClass5 implements AcknowledgingMessageListener {
        final AcknowledgingMessageListener originalMessageListener;
        final /* synthetic */ KafkaMessageListenerContainer val$messageListenerContainer;
        final /* synthetic */ RetryTemplate val$retryTemplate;

        AnonymousClass5(KafkaMessageListenerContainer kafkaMessageListenerContainer, RetryTemplate retryTemplate) {
            this.val$messageListenerContainer = kafkaMessageListenerContainer;
            this.val$retryTemplate = retryTemplate;
            this.originalMessageListener = (AcknowledgingMessageListener) this.val$messageListenerContainer.getMessageListener();
        }

        public void onMessage(final KafkaMessage kafkaMessage, final Acknowledgment acknowledgment) {
            this.val$retryTemplate.execute(new RetryCallback<Object, RuntimeException>() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.5.1
                public Object doWithRetry(RetryContext retryContext) {
                    AnonymousClass5.this.originalMessageListener.onMessage(kafkaMessage, acknowledgment);
                    return null;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$ReceivingHandler.class */
    public final class ReceivingHandler extends AbstractReplyProducingMessageHandler {
        private final ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties;

        /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$ReceivingHandler$KafkaBinderHeaders.class */
        private final class KafkaBinderHeaders extends MessageHeaders {
            KafkaBinderHeaders(Map<String, Object> map) {
                super(map, MessageHeaders.ID_VALUE_NONE, -1L);
            }
        }

        private ReceivingHandler(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
            this.consumerProperties = extendedConsumerProperties;
        }

        protected Object handleRequestMessage(Message<?> message) {
            if (!HeaderMode.embeddedHeaders.equals(this.consumerProperties.getHeaderMode())) {
                return message;
            }
            MessageValues extractMessageValues = KafkaMessageChannelBinder.this.extractMessageValues(message);
            return MessageBuilder.createMessage(extractMessageValues.getPayload(), new KafkaBinderHeaders(extractMessageValues));
        }

        protected boolean shouldCopyRequestHeaders() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$SendingHandler.class */
    public final class SendingHandler extends AbstractMessageHandler {
        private final AtomicInteger roundRobinCount;
        private final String topicName;
        private final ExtendedProducerProperties<KafkaProducerProperties> producerProperties;
        private final int numberOfKafkaPartitions;
        private final ProducerConfiguration<byte[], byte[]> producerConfiguration;
        private final PartitionHandler partitionHandler;

        private SendingHandler(String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties, int i, ProducerConfiguration<byte[], byte[]> producerConfiguration) {
            this.roundRobinCount = new AtomicInteger();
            this.topicName = str;
            this.producerProperties = extendedProducerProperties;
            this.numberOfKafkaPartitions = i;
            ConfigurableListableBeanFactory beanFactory = KafkaMessageChannelBinder.this.getBeanFactory();
            setBeanFactory(beanFactory);
            this.producerConfiguration = producerConfiguration;
            this.partitionHandler = new PartitionHandler(beanFactory, KafkaMessageChannelBinder.this.evaluationContext, KafkaMessageChannelBinder.this.partitionSelector, extendedProducerProperties);
        }

        protected void handleMessageInternal(Message<?> message) throws Exception {
            int determinePartition = this.producerProperties.isPartitioned() ? this.partitionHandler.determinePartition(message) : roundRobin() % this.numberOfKafkaPartitions;
            if (HeaderMode.embeddedHeaders.equals(this.producerProperties.getHeaderMode())) {
                this.producerConfiguration.send(this.topicName, Integer.valueOf(determinePartition), (Object) null, KafkaMessageChannelBinder.this.embeddedHeadersMessageConverter.embedHeaders(KafkaMessageChannelBinder.this.serializePayloadIfNecessary(message), KafkaMessageChannelBinder.this.headersToMap));
                return;
            }
            if (HeaderMode.raw.equals(this.producerProperties.getHeaderMode())) {
                Object obj = message.getHeaders().get("contentType");
                if (obj != null && !obj.equals("application/octet-stream")) {
                    this.logger.error("Raw mode supports only application/octet-stream content type" + message.getPayload().getClass());
                }
                if (message.getPayload() instanceof byte[]) {
                    this.producerConfiguration.send(this.topicName, Integer.valueOf(determinePartition), (Object) null, (byte[]) message.getPayload());
                } else {
                    this.logger.error("Raw mode supports only byte[] payloads but value sent was of type " + message.getPayload().getClass());
                }
            }
        }

        private int roundRobin() {
            int incrementAndGet = this.roundRobinCount.incrementAndGet();
            if (incrementAndGet == Integer.MAX_VALUE) {
                this.roundRobinCount.set(0);
            }
            return incrementAndGet;
        }
    }

    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$StartOffset.class */
    public enum StartOffset {
        earliest(OffsetRequest.EarliestTime()),
        latest(OffsetRequest.LatestTime());

        private final long referencePoint;

        StartOffset(long j) {
            this.referencePoint = j;
        }

        public long getReferencePoint() {
            return this.referencePoint;
        }
    }

    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        this.configurationProperties = kafkaBinderConfigurationProperties;
        String[] headers = kafkaBinderConfigurationProperties.getHeaders();
        if (ObjectUtils.isEmpty(headers)) {
            this.headersToMap = BinderHeaders.STANDARD_HEADERS;
            return;
        }
        String[] strArr = (String[]) Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length + headers.length);
        System.arraycopy(headers, 0, strArr, BinderHeaders.STANDARD_HEADERS.length, headers.length);
        this.headersToMap = strArr;
    }

    String getZkAddress() {
        return this.configurationProperties.getZkConnectionString();
    }

    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public void setProducerListener(ProducerListener producerListener) {
        this.producerListener = producerListener;
    }

    public void setMetadataRetryOperations(RetryOperations retryOperations) {
        this.metadataRetryOperations = retryOperations;
    }

    public void setExtendedBindingProperties(KafkaExtendedBindingProperties kafkaExtendedBindingProperties) {
        this.extendedBindingProperties = kafkaExtendedBindingProperties;
    }

    public void onInit() throws Exception {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(new ZookeeperConnect(this.configurationProperties.getZkConnectionString()));
        zookeeperConfiguration.setBufferSize(this.configurationProperties.getSocketBufferSize());
        zookeeperConfiguration.setMaxWait(this.configurationProperties.getMaxWait());
        DefaultConnectionFactory defaultConnectionFactory = new DefaultConnectionFactory(zookeeperConfiguration);
        defaultConnectionFactory.afterPropertiesSet();
        this.connectionFactory = defaultConnectionFactory;
        if (this.metadataRetryOperations == null) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(10);
            retryTemplate.setRetryPolicy(simpleRetryPolicy);
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(100L);
            exponentialBackOffPolicy.setMultiplier(2.0d);
            exponentialBackOffPolicy.setMaxInterval(1000L);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
            this.metadataRetryOperations = retryTemplate;
        }
    }

    public void destroy() throws Exception {
        if (this.dlqProducer != null) {
            this.dlqProducer.close();
            this.dlqProducer = null;
        }
    }

    public static void validateTopicName(String str) {
        try {
            for (byte b : str.getBytes("UTF-8")) {
                if ((b < 97 || b > 122) && ((b < 65 || b > 90) && !((b >= 48 && b <= 57) || b == 46 || b == 45 || b == 95))) {
                    throw new IllegalArgumentException("Topic name can only have ASCII alphanumerics, '.', '_' and '-'");
                }
            }
        } catch (UnsupportedEncodingException e) {
            throw new AssertionError(e);
        }
    }

    /* renamed from: getExtendedConsumerProperties, reason: merged with bridge method [inline-methods] */
    public KafkaConsumerProperties m4getExtendedConsumerProperties(String str) {
        return this.extendedBindingProperties.m1getExtendedConsumerProperties(str);
    }

    /* renamed from: getExtendedProducerProperties, reason: merged with bridge method [inline-methods] */
    public KafkaProducerProperties m3getExtendedProducerProperties(String str) {
        return this.extendedBindingProperties.m0getExtendedProducerProperties(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, Collection<Partition>> getTopicsInUse() {
        return this.topicsInUse;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Binding<MessageChannel> doBindConsumer(String str, String str2, MessageChannel messageChannel, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        boolean z = !StringUtils.hasText(str2);
        Assert.isTrue((z && ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq()) ? false : true, "DLQ support is not available for anonymous subscriptions");
        return createKafkaConsumer(str, messageChannel, extendedConsumerProperties, z ? "anonymous." + UUID.randomUUID().toString() : str2, ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getStartOffset() != null ? ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getStartOffset().getReferencePoint() : z ? OffsetRequest.LatestTime() : OffsetRequest.EarliestTime());
    }

    public Binding<MessageChannel> doBindProducer(String str, MessageChannel messageChannel, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, messageChannel);
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Using kafka topic for outbound: " + str);
        }
        validateTopicName(str);
        Collection<Partition> ensureTopicCreated = ensureTopicCreated(str, extendedProducerProperties.getPartitionCount());
        if (extendedProducerProperties.getPartitionCount() < ensureTopicCreated.size() && this.logger.isInfoEnabled()) {
            this.logger.info("The `partitionCount` of the producer for topic " + str + " is " + extendedProducerProperties.getPartitionCount() + ", smaller than the actual partition count of " + ensureTopicCreated.size() + " of the topic. The larger number will be used instead.");
        }
        this.topicsInUse.put(str, ensureTopicCreated);
        ProducerMetadata producerMetadata = new ProducerMetadata(str, byte[].class, byte[].class, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
        producerMetadata.setSync(((KafkaProducerProperties) extendedProducerProperties.getExtension()).isSync());
        producerMetadata.setCompressionType(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getCompressionType());
        producerMetadata.setBatchBytes(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getBufferSize());
        Properties properties = new Properties();
        if (!ObjectUtils.isEmpty(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getConfiguration())) {
            properties.putAll(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getConfiguration());
        }
        properties.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));
        properties.put("linger.ms", String.valueOf(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getBatchTimeout()));
        properties.put("max.request.size", String.valueOf(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getMaxRequestSize()));
        try {
            final ProducerConfiguration producerConfiguration = new ProducerConfiguration(producerMetadata, new ProducerFactoryBean(producerMetadata, this.configurationProperties.getKafkaConnectionString(), properties).getObject());
            producerConfiguration.setProducerListener(this.producerListener);
            EventDrivenConsumer eventDrivenConsumer = new EventDrivenConsumer((SubscribableChannel) messageChannel, new SendingHandler(str, extendedProducerProperties, ensureTopicCreated.size(), producerConfiguration)) { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.1
                protected void doStop() {
                    super.doStop();
                    producerConfiguration.stop();
                }
            };
            eventDrivenConsumer.setBeanFactory(getBeanFactory());
            eventDrivenConsumer.setBeanName("outbound." + str);
            eventDrivenConsumer.afterPropertiesSet();
            DefaultBinding defaultBinding = new DefaultBinding(str, (String) null, messageChannel, eventDrivenConsumer);
            eventDrivenConsumer.start();
            return defaultBinding;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Collection<Partition> ensureTopicCreated(final String str, final int i) {
        final ZkClient zkClient = new ZkClient(this.configurationProperties.getZkConnectionString(), this.configurationProperties.getZkSessionTimeout(), this.configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$);
        try {
            final Properties properties = new Properties();
            TopicMetadata fetchTopicMetadataFromZk = AdminUtils.fetchTopicMetadataFromZk(str, zkClient);
            if (fetchTopicMetadataFromZk.errorCode() == ErrorMapping.NoError()) {
                int max = this.configurationProperties.isAutoAddPartitions() ? Math.max(this.configurationProperties.getMinPartitionCount(), i) : i;
                if (fetchTopicMetadataFromZk.partitionsMetadata().size() < max) {
                    if (!this.configurationProperties.isAutoAddPartitions()) {
                        int size = fetchTopicMetadataFromZk.partitionsMetadata().size();
                        throw new BinderException("The number of expected partitions was: " + i + ", but " + size + (size > 1 ? " have " : " has ") + "been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
                    }
                    AdminUtils.addPartitions(zkClient, str, max, (String) null, false, new Properties());
                }
            } else {
                if (fetchTopicMetadataFromZk.errorCode() != ErrorMapping.UnknownTopicOrPartitionCode()) {
                    throw new BinderException("Error fetching Kafka topic metadata: ", ErrorMapping.exceptionFor(fetchTopicMetadataFromZk.errorCode()));
                }
                if (!this.configurationProperties.isAutoCreateTopics()) {
                    throw new BinderException("Topic " + str + " does not exist");
                }
                final scala.collection.Map assignReplicasToBrokers = AdminUtils.assignReplicasToBrokers(ZkUtils.getSortedBrokerList(zkClient), Math.max(this.configurationProperties.getMinPartitionCount(), i), this.configurationProperties.getReplicationFactor(), -1, -1);
                this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.2
                    public Object doWithRetry(RetryContext retryContext) throws RuntimeException {
                        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, str, assignReplicasToBrokers, properties, true);
                        return null;
                    }
                });
            }
            try {
                Collection<Partition> collection = (Collection) this.metadataRetryOperations.execute(new RetryCallback<Collection<Partition>, Exception>() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.3
                    /* renamed from: doWithRetry, reason: merged with bridge method [inline-methods] */
                    public Collection<Partition> m5doWithRetry(RetryContext retryContext) throws Exception {
                        KafkaMessageChannelBinder.this.connectionFactory.refreshMetadata(Collections.singleton(str));
                        Collection<Partition> partitions = KafkaMessageChannelBinder.this.connectionFactory.getPartitions(str);
                        if (partitions.size() < i) {
                            throw new IllegalStateException("The number of expected partitions was: " + i + ", but " + partitions.size() + (partitions.size() > 1 ? " have " : " has ") + "been found instead");
                        }
                        KafkaMessageChannelBinder.this.connectionFactory.getLeaders(partitions);
                        return partitions;
                    }
                });
                zkClient.close();
                return collection;
            } catch (Exception e) {
                this.logger.error("Cannot initialize Binder", e);
                throw new BinderException("Cannot initialize binder:", e);
            }
        } catch (Throwable th) {
            zkClient.close();
            throw th;
        }
    }

    private Binding<MessageChannel> createKafkaConsumer(String str, MessageChannel messageChannel, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, String str2, long j) {
        Collection<Partition> arrayList;
        validateTopicName(str);
        if (extendedConsumerProperties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        Collection<Partition> ensureTopicCreated = ensureTopicCreated(str, extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency());
        DefaultDecoder defaultDecoder = new DefaultDecoder((VerifiableProperties) null);
        DefaultDecoder defaultDecoder2 = new DefaultDecoder((VerifiableProperties) null);
        if (extendedConsumerProperties.getInstanceCount() == 1) {
            arrayList = ensureTopicCreated;
        } else {
            arrayList = new ArrayList();
            for (Partition partition : ensureTopicCreated) {
                if (partition.getId() % extendedConsumerProperties.getInstanceCount() == extendedConsumerProperties.getInstanceIndex()) {
                    arrayList.add(partition);
                }
            }
        }
        this.topicsInUse.put(str, arrayList);
        ReceivingHandler receivingHandler = new ReceivingHandler(extendedConsumerProperties);
        receivingHandler.setOutputChannel(messageChannel);
        FixedSubscriberChannel fixedSubscriberChannel = new FixedSubscriberChannel(receivingHandler);
        fixedSubscriberChannel.setBeanName("bridge." + str);
        Assert.isTrue(!CollectionUtils.isEmpty(arrayList), "A list of partitions must be provided");
        final KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer(this.connectionFactory, (Partition[]) arrayList.toArray(new Partition[arrayList.size()]));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(arrayList));
        }
        OffsetManager createOffsetManager = createOffsetManager(str2, j);
        if (((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isResetOffsets()) {
            createOffsetManager.resetOffsets(arrayList);
        }
        kafkaMessageListenerContainer.setOffsetManager(createOffsetManager);
        kafkaMessageListenerContainer.setQueueSize(this.configurationProperties.getQueueSize());
        kafkaMessageListenerContainer.setMaxFetch(this.configurationProperties.getFetchSize());
        kafkaMessageListenerContainer.setAutoCommitOnError(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getAutoCommitOnError() != null ? ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getAutoCommitOnError().booleanValue() : ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoCommitOffset() && ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq());
        kafkaMessageListenerContainer.setRecoveryInterval(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getRecoveryInterval());
        int min = Math.min(extendedConsumerProperties.getConcurrency(), arrayList.size());
        kafkaMessageListenerContainer.setConcurrency(min);
        final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(min, DAEMON_THREAD_FACTORY);
        kafkaMessageListenerContainer.setDispatcherTaskExecutor(newFixedThreadPool);
        final KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(kafkaMessageListenerContainer);
        kafkaMessageDrivenChannelAdapter.setBeanFactory(getBeanFactory());
        kafkaMessageDrivenChannelAdapter.setKeyDecoder(defaultDecoder2);
        kafkaMessageDrivenChannelAdapter.setPayloadDecoder(defaultDecoder);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(fixedSubscriberChannel);
        kafkaMessageDrivenChannelAdapter.setAutoCommitOffset(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoCommitOffset());
        kafkaMessageDrivenChannelAdapter.afterPropertiesSet();
        final RetryTemplate buildRetryTemplateIfRetryEnabled = buildRetryTemplateIfRetryEnabled(extendedConsumerProperties);
        if (buildRetryTemplateIfRetryEnabled != null) {
            if (((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoCommitOffset()) {
                final MessageListener messageListener = (MessageListener) kafkaMessageListenerContainer.getMessageListener();
                kafkaMessageListenerContainer.setMessageListener(new MessageListener() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.4
                    public void onMessage(final KafkaMessage kafkaMessage) {
                        try {
                            buildRetryTemplateIfRetryEnabled.execute(new RetryCallback<Object, Throwable>() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.4.1
                                public Object doWithRetry(RetryContext retryContext) {
                                    messageListener.onMessage(kafkaMessage);
                                    return null;
                                }
                            });
                        } catch (Throwable th) {
                            if (!(th instanceof RuntimeException)) {
                                throw new RuntimeException(th);
                            }
                            throw ((RuntimeException) th);
                        }
                    }
                });
            } else {
                kafkaMessageListenerContainer.setMessageListener(new AnonymousClass5(kafkaMessageListenerContainer, buildRetryTemplateIfRetryEnabled));
            }
        }
        if (((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq()) {
            final String str3 = "error." + str + "." + str2;
            initDlqProducer();
            kafkaMessageListenerContainer.setErrorHandler(new ErrorHandler() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.6
                public void handle(Exception exc, final KafkaMessage kafkaMessage) {
                    final byte[] array = kafkaMessage.getMessage().key() != null ? Utils.toArray(kafkaMessage.getMessage().key()) : null;
                    final byte[] array2 = kafkaMessage.getMessage().payload() != null ? Utils.toArray(kafkaMessage.getMessage().payload()) : null;
                    KafkaMessageChannelBinder.this.dlqProducer.send(new ProducerRecord(str3, array, array2), new Callback() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.6.1
                        public void onCompletion(RecordMetadata recordMetadata, Exception exc2) {
                            StringBuffer stringBuffer = new StringBuffer();
                            stringBuffer.append(" a message with key='" + KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(array), 50) + "'");
                            stringBuffer.append(" and payload='" + KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(array2), 50) + "'");
                            stringBuffer.append(" received from " + kafkaMessage.getMetadata().getPartition());
                            if (exc2 != null) {
                                KafkaMessageChannelBinder.this.logger.error("Error sending to DLQ" + stringBuffer.toString(), exc2);
                            } else if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
                                KafkaMessageChannelBinder.this.logger.debug("Sent to DLQ " + stringBuffer.toString());
                            }
                        }
                    });
                }
            });
        }
        kafkaMessageDrivenChannelAdapter.start();
        EventDrivenConsumer eventDrivenConsumer = new EventDrivenConsumer(fixedSubscriberChannel, receivingHandler) { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.7
            protected void doStop() {
                kafkaMessageDrivenChannelAdapter.stop();
                if (kafkaMessageListenerContainer.getOffsetManager() instanceof DisposableBean) {
                    try {
                        kafkaMessageListenerContainer.getOffsetManager().destroy();
                    } catch (Exception e) {
                        this.logger.error("Error while closing the offset manager", e);
                    }
                }
                super.doStop();
            }
        };
        eventDrivenConsumer.setBeanName("inbound." + groupedName(str, str2));
        DefaultBinding<MessageChannel> defaultBinding = new DefaultBinding<MessageChannel>(str, str2, messageChannel, eventDrivenConsumer) { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.8
            protected void afterUnbind() {
                newFixedThreadPool.shutdown();
            }
        };
        eventDrivenConsumer.start();
        return defaultBinding;
    }

    private synchronized void initDlqProducer() {
        try {
            if (this.dlqProducer == null) {
                synchronized (this) {
                    if (this.dlqProducer == null) {
                        ProducerMetadata producerMetadata = new ProducerMetadata("dlqKafkaProducer", byte[].class, byte[].class, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
                        producerMetadata.setSync(false);
                        producerMetadata.setCompressionType(ProducerMetadata.CompressionType.none);
                        producerMetadata.setBatchBytes(16384);
                        Properties properties = new Properties();
                        properties.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));
                        properties.put("linger.ms", String.valueOf(0));
                        this.dlqProducer = new ProducerFactoryBean(producerMetadata, this.configurationProperties.getKafkaConnectionString(), properties).getObject();
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Cannot initialize DLQ producer:", e);
        }
    }

    private OffsetManager createOffsetManager(String str, long j) {
        try {
            KafkaNativeOffsetManager kafkaNativeOffsetManager = new KafkaNativeOffsetManager(this.connectionFactory, new ZookeeperConnect(this.configurationProperties.getZkConnectionString()), Collections.emptyMap());
            kafkaNativeOffsetManager.setConsumerId(str);
            kafkaNativeOffsetManager.setReferenceTimestamp(j);
            kafkaNativeOffsetManager.afterPropertiesSet();
            WindowingOffsetManager windowingOffsetManager = new WindowingOffsetManager(kafkaNativeOffsetManager);
            windowingOffsetManager.setTimespan(this.configurationProperties.getOffsetUpdateTimeWindow());
            windowingOffsetManager.setCount(this.configurationProperties.getOffsetUpdateCount());
            windowingOffsetManager.setShutdownTimeout(this.configurationProperties.getOffsetUpdateShutdownTimeout());
            windowingOffsetManager.afterPropertiesSet();
            return windowingOffsetManager;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String toDisplayString(String str, int i) {
        return str.length() <= i ? str : str.substring(0, i) + "...";
    }

    public void doManualAck(LinkedList<MessageHeaders> linkedList) {
        Iterator<MessageHeaders> it = linkedList.iterator();
        while (it.hasNext()) {
            Acknowledgment acknowledgment = (Acknowledgment) it.next().get("kafka_acknowledgment");
            Assert.notNull(acknowledgment, "Acknowledgement shouldn't be null when acknowledging kafka message manually.");
            acknowledgment.acknowledge();
        }
    }

    static {
        CustomizableThreadFactory customizableThreadFactory = new CustomizableThreadFactory("kafka-binder-");
        customizableThreadFactory.setDaemon(true);
        DAEMON_THREAD_FACTORY = customizableThreadFactory;
    }
}
