/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.common.ErrorMapping;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaTopicUtils;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.Lifecycle;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public class KafkaMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, Collection<PartitionInfo>, String>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>,
DisposableBean {
    // Binder-wide Kafka/ZooKeeper settings supplied at construction time.
    private final KafkaBinderConfigurationProperties configurationProperties;
    // Retry strategy used for topic creation and partition lookups; onInit() installs a default when none was injected.
    private RetryOperations metadataRetryOperations;
    // Partitions recorded per topic name by the destination/handler creation methods of this binder.
    private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<String, Collection<PartitionInfo>>();
    // Optional listener handed to every KafkaTemplate created for a producer binding.
    private ProducerListener<byte[], byte[]> producerListener;
    // Producer used to forward failed records to a DLQ topic; created lazily by initDlqProducer().
    private volatile Producer<byte[], byte[]> dlqProducer;
    // Source of the per-channel Kafka consumer/producer extension properties.
    private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
    // Topic administration helper; when null, no topics are created by this binder.
    private AdminUtilsOperation adminUtilsOperation;

    /**
     * Creates a binder backed by the given binder-wide configuration.
     *
     * <p>The superclass constructor receives {@code false} and the header names computed by
     * {@code headersToMap(configurationProperties)}.
     *
     * @param configurationProperties binder-wide settings; also stored for later use by this binder
     */
    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
        super(false, KafkaMessageChannelBinder.headersToMap(configurationProperties));
        this.configurationProperties = configurationProperties;
    }

    /**
     * Computes the header names handed to the superclass constructor.
     *
     * @param configurationProperties binder configuration that may declare additional headers
     * @return {@link BinderHeaders#STANDARD_HEADERS} itself when no additional headers are
     *         configured; otherwise a new array holding the standard headers followed by the
     *         configured ones
     */
    private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) {
        String[] customHeaders = configurationProperties.getHeaders();
        if (ObjectUtils.isEmpty((Object[]) customHeaders)) {
            return BinderHeaders.STANDARD_HEADERS;
        }
        int standardCount = BinderHeaders.STANDARD_HEADERS.length;
        // Grow a copy of the standard headers, then append the custom ones after them.
        String[] merged = Arrays.copyOf(BinderHeaders.STANDARD_HEADERS, standardCount + customHeaders.length);
        System.arraycopy(customHeaders, 0, merged, standardCount, customHeaders.length);
        return merged;
    }

    /**
     * Sets the helper used to inspect, create and extend topics.
     *
     * @param adminUtilsOperation the topic administration helper; when left {@code null} the
     *        binder does not create topics
     */
    public void setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation) {
        this.adminUtilsOperation = adminUtilsOperation;
    }

    /**
     * Sets the retry strategy used for topic creation and partition lookups.
     *
     * @param metadataRetryOperations the retry operations to use; when left {@code null},
     *        {@code onInit()} installs a default
     */
    public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
        this.metadataRetryOperations = metadataRetryOperations;
    }

    /**
     * Replaces the source of per-channel Kafka consumer/producer extension properties.
     *
     * @param extendedBindingProperties the extended binding properties to consult
     */
    public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    /**
     * Installs a default metadata retry strategy when none has been injected.
     *
     * <p>The default allows up to 10 attempts with an exponential back-off that starts at
     * 100 ms, doubles on each attempt and is capped at 1000 ms.
     *
     * @throws Exception declared by the method signature; nothing in this body throws a checked exception
     */
    public void onInit() throws Exception {
        if (this.metadataRetryOperations != null) {
            // A retry strategy was supplied through the setter; keep it.
            return;
        }
        ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
        exponentialBackOff.setInitialInterval(100L);
        exponentialBackOff.setMultiplier(2.0);
        exponentialBackOff.setMaxInterval(1000L);

        SimpleRetryPolicy attemptLimit = new SimpleRetryPolicy();
        attemptLimit.setMaxAttempts(10);

        RetryTemplate defaultTemplate = new RetryTemplate();
        defaultTemplate.setRetryPolicy((RetryPolicy) attemptLimit);
        defaultTemplate.setBackOffPolicy((BackOffPolicy) exponentialBackOff);
        this.metadataRetryOperations = defaultTemplate;
    }

    /**
     * Closes the DLQ producer, if one was created, and clears the reference to it.
     *
     * @throws Exception declared by the method signature
     */
    public void destroy() throws Exception {
        if (this.dlqProducer == null) {
            return;
        }
        this.dlqProducer.close();
        this.dlqProducer = null;
    }

    /**
     * Sets the listener that is registered on every {@link KafkaTemplate} created for a
     * producer binding.
     *
     * @param producerListener the listener to register; {@code null} means none is registered
     */
    public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
        this.producerListener = producerListener;
    }

    /**
     * Returns the internal topic-to-partitions map itself (not a copy).
     *
     * @return the map of topic name to the partitions recorded for it by this binder
     */
    Map<String, Collection<PartitionInfo>> getTopicsInUse() {
        return this.topicsInUse;
    }

    /**
     * Looks up the Kafka-specific consumer properties for a channel by delegating to the
     * configured {@link KafkaExtendedBindingProperties}.
     *
     * @param channelName the name of the channel
     * @return whatever the extended binding properties return for that channel
     */
    public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    /**
     * Looks up the Kafka-specific producer properties for a channel by delegating to the
     * configured {@link KafkaExtendedBindingProperties}.
     *
     * @param channelName the name of the channel
     * @return whatever the extended binding properties return for that channel
     */
    public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    /**
     * Builds the outbound message handler for a topic.
     *
     * <p>Validates the topic name, creates or extends the topic when auto-creation applies,
     * looks up and records the topic's partitions, and wraps a {@link KafkaTemplate} (with the
     * configured producer listener, if any) in a {@code ProducerConfigurationMessageHandler}.
     *
     * @param destination the topic name
     * @param producerProperties producer binding properties, including the requested partition count
     * @return the message handler that publishes to {@code destination}
     * @throws Exception declared by the method signature
     */
    protected MessageHandler createProducerMessageHandler(String destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
        KafkaTopicUtils.validateTopicName(destination);

        int requestedPartitions = producerProperties.getPartitionCount();
        this.createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(destination, requestedPartitions);
        Collection<PartitionInfo> actualPartitions = this.getPartitionsForTopic(destination, requestedPartitions);

        boolean topicHasMorePartitions = requestedPartitions < actualPartitions.size();
        if (topicHasMorePartitions && this.logger.isInfoEnabled()) {
            this.logger.info((Object) ("The `partitionCount` of the producer for topic " + destination + " is " + requestedPartitions + ", smaller than the actual partition count of " + actualPartitions.size() + " of the topic. The larger number will be used instead."));
        }
        this.topicsInUse.put(destination, actualPartitions);

        DefaultKafkaProducerFactory<byte[], byte[]> factory = this.getProducerFactory(producerProperties);
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<byte[], byte[]>(factory);
        if (this.producerListener != null) {
            template.setProducerListener(this.producerListener);
        }
        return new ProducerConfigurationMessageHandler(template, destination, producerProperties, factory);
    }

    /**
     * Prepares the outbound topic: validates its name, creates or extends it when auto-creation
     * applies, and records its partitions in {@code topicsInUse}.
     *
     * @param name the topic name
     * @param properties producer binding properties, including the requested partition count
     * @return {@code name}, unchanged
     */
    protected String createProducerDestinationIfNecessary(String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
        boolean infoEnabled = this.logger.isInfoEnabled();
        if (infoEnabled) {
            this.logger.info((Object) ("Using kafka topic for outbound: " + name));
        }
        KafkaTopicUtils.validateTopicName(name);

        int requestedPartitions = properties.getPartitionCount();
        this.createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, requestedPartitions);
        Collection<PartitionInfo> actualPartitions = this.getPartitionsForTopic(name, requestedPartitions);

        if (requestedPartitions < actualPartitions.size()) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info((Object) ("The `partitionCount` of the producer for topic " + name + " is " + requestedPartitions + ", smaller than the actual partition count of " + actualPartitions.size() + " of the topic. The larger number will be used instead."));
            }
        }
        this.topicsInUse.put(name, actualPartitions);
        return name;
    }

    /**
     * Builds a producer factory for the given producer binding.
     *
     * <p>Property precedence, lowest to highest: the binder-level {@code configuration} map,
     * the binder defaults set here, then the binding-level {@code configuration} map.
     *
     * @param producerProperties producer binding properties whose extension supplies the batch
     *        timeout, compression type and binding-level configuration
     * @return a new factory producing {@code byte[]}-keyed, {@code byte[]}-valued producers
     */
    private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
        KafkaProducerProperties extension = producerProperties.getExtension();
        Map<String, Object> props = new HashMap<String, Object>();
        if (!ObjectUtils.isEmpty(this.configurationProperties.getConfiguration())) {
            props.putAll(this.configurationProperties.getConfiguration());
        }
        props.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 0x2000000);
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        props.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));
        // linger.ms always comes from the binding's batch timeout; the former hard-coded
        // default of 1 was overwritten unconditionally and has been dropped.
        props.put("linger.ms", String.valueOf(extension.getBatchTimeout()));
        props.put("compression.type", extension.getCompressionType().toString());
        if (!ObjectUtils.isEmpty(extension.getConfiguration())) {
            props.putAll(extension.getConfiguration());
        }
        return new DefaultKafkaProducerFactory<byte[], byte[]>(props);
    }

    /**
     * Prepares the inbound topic and decides which of its partitions this instance listens to.
     *
     * <p>The minimum partition count requested is {@code instanceCount * concurrency}. When
     * auto-rebalancing is enabled or there is a single instance, all partitions are returned;
     * otherwise only those whose partition id modulo the instance count equals the instance index.
     *
     * @param name the topic name
     * @param group the consumer group (not used by this method)
     * @param properties consumer binding properties
     * @return the partitions this instance listens to; also recorded in {@code topicsInUse}
     * @throws IllegalArgumentException if the instance count is zero
     */
    protected Collection<PartitionInfo> createConsumerDestinationIfNecessary(String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaTopicUtils.validateTopicName(name);
        if (properties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int requiredPartitions = properties.getInstanceCount() * properties.getConcurrency();
        this.createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, requiredPartitions);
        Collection<PartitionInfo> everyPartition = this.getPartitionsForTopic(name, requiredPartitions);

        Collection<PartitionInfo> assigned;
        boolean staticAssignment = !((KafkaConsumerProperties) properties.getExtension()).isAutoRebalanceEnabled() && properties.getInstanceCount() != 1;
        if (staticAssignment) {
            assigned = new ArrayList<PartitionInfo>();
            for (PartitionInfo candidate : everyPartition) {
                if (candidate.partition() % properties.getInstanceCount() == properties.getInstanceIndex()) {
                    assigned.add(candidate);
                }
            }
        } else {
            assigned = everyPartition;
        }
        this.topicsInUse.put(name, assigned);
        return assigned;
    }

    /**
     * Builds the inbound channel adapter for a topic.
     *
     * <p>A consumer without a group is treated as anonymous and gets a generated
     * {@code anonymous.<uuid>} group id. Anonymous or auto-rebalanced consumers subscribe by
     * topic name; all others are assigned the given partitions explicitly. Container concurrency
     * is capped at the number of listened partitions. When DLQ is enabled, failed records are
     * forwarded to the topic {@code error.<name>.<group>}.
     *
     * @param name the topic name
     * @param group the consumer group; blank or {@code null} means anonymous
     * @param destination the partitions to listen to; must not be empty
     * @param properties consumer binding properties
     * @return the message-driven channel adapter for the topic
     * @throws IllegalArgumentException if DLQ is enabled for an anonymous consumer, or if
     *         {@code destination} is empty
     */
    protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaConsumerProperties extension = properties.getExtension();
        boolean anonymous = !StringUtils.hasText(group);
        Assert.isTrue(!anonymous || !extension.isEnableDlq(), "DLQ support is not available for anonymous subscriptions");
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;

        Map<String, Object> props = this.getConsumerConfig(anonymous, consumerGroup);
        if (!ObjectUtils.isEmpty(extension.getConfiguration())) {
            // Binding-level settings take precedence over the binder-level consumer config.
            props.putAll(extension.getConfiguration());
        }
        Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
        Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
        ConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<byte[], byte[]>(props, keyDecoder, valueDecoder);

        Collection<PartitionInfo> listenedPartitions = destination;
        Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");

        ContainerProperties containerProperties;
        if (anonymous || extension.isAutoRebalanceEnabled()) {
            containerProperties = new ContainerProperties(new String[]{name});
        } else {
            containerProperties = new ContainerProperties(this.getTopicPartitionInitialOffsets(listenedPartitions));
        }

        int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
        ConcurrentMessageListenerContainer<byte[], byte[]> messageListenerContainer = new ConcurrentMessageListenerContainer<byte[], byte[]>(consumerFactory, containerProperties) {

            // NOTE(review): this override only delegates to super; presumably it exists to
            // work around an access/visibility issue - confirm before removing.
            @Override
            public void stop(Runnable callback) {
                super.stop(callback);
            }
        };
        messageListenerContainer.setConcurrency(concurrency);
        messageListenerContainer.getContainerProperties().setAckOnError(this.isAutoCommitOnError(properties));
        if (!extension.isAutoCommitOffset()) {
            messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        }
        // Logged once; the original emitted this identical line twice.
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object) ("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions)));
        }

        KafkaMessageDrivenChannelAdapter<byte[], byte[]> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<byte[], byte[]>(messageListenerContainer);
        kafkaMessageDrivenChannelAdapter.setBeanFactory((BeanFactory) this.getBeanFactory());
        RetryTemplate retryTemplate = this.buildRetryTemplate((ConsumerProperties) properties);
        kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);

        if (extension.isEnableDlq()) {
            String dlqTopic = "error." + name + "." + group;
            this.initDlqProducer();
            messageListenerContainer.getContainerProperties().setErrorHandler(this.createDlqErrorHandler(dlqTopic));
        }
        return kafkaMessageDrivenChannelAdapter;
    }

    /**
     * Creates an error handler that republishes a failed record's key and payload to the DLQ topic
     * and logs the outcome of that send.
     *
     * @param dlqTopic the topic failed records are sent to
     * @return the error handler to install on the listener container
     */
    private ErrorHandler createDlqErrorHandler(final String dlqTopic) {
        return new ErrorHandler() {

            @Override
            public void handle(Exception thrownException, final ConsumerRecord<?, ?> message) {
                final byte[] key = message.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) message.key())) : null;
                final byte[] payload = message.value() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) message.value())) : null;
                KafkaMessageChannelBinder.this.dlqProducer.send(new ProducerRecord<byte[], byte[]>(dlqTopic, key, payload), new Callback() {

                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // Local to this callback invocation, so no synchronized buffer is needed.
                        StringBuilder messageLog = new StringBuilder();
                        messageLog.append(" a message with key='" + KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(key), 50) + "'");
                        messageLog.append(" and payload='" + KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(payload), 50) + "'");
                        messageLog.append(" received from " + message.partition());
                        if (exception != null) {
                            KafkaMessageChannelBinder.this.logger.error((Object) ("Error sending to DLQ" + messageLog.toString()), (Throwable) exception);
                        } else if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
                            KafkaMessageChannelBinder.this.logger.debug((Object) ("Sent to DLQ " + messageLog.toString()));
                        }
                    }
                });
            }
        };
    }

    /**
     * Assembles the base consumer configuration.
     *
     * <p>The binder-level {@code configuration} map is applied first; the entries set afterwards
     * overwrite any same-named keys from it.
     *
     * @param anonymous whether the consumer is anonymous; selects {@code latest} instead of
     *        {@code earliest} for {@code auto.offset.reset}
     * @param consumerGroup the value for {@code group.id}
     * @return a new mutable map of consumer properties
     */
    private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
        Map<String, Object> consumerConfig = new HashMap<String, Object>();
        if (!ObjectUtils.isEmpty(this.configurationProperties.getConfiguration())) {
            consumerConfig.putAll(this.configurationProperties.getConfiguration());
        }

        String offsetReset;
        if (anonymous) {
            offsetReset = "latest";
        } else {
            offsetReset = "earliest";
        }

        consumerConfig.put("group.id", consumerGroup);
        consumerConfig.put("auto.offset.reset", offsetReset);
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("auto.commit.interval.ms", 100);
        consumerConfig.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        return consumerConfig;
    }

    /**
     * Decides whether offsets are acknowledged for records whose processing failed.
     *
     * @param properties consumer binding properties
     * @return the explicitly configured {@code autoCommitOnError} value when present; otherwise
     *         {@code true} only if both offset auto-commit and DLQ are enabled
     */
    private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaConsumerProperties extension = (KafkaConsumerProperties) properties.getExtension();
        if (extension.getAutoCommitOnError() != null) {
            return extension.getAutoCommitOnError();
        }
        return extension.isAutoCommitOffset() && extension.isEnableDlq();
    }

    /**
     * Converts partition metadata into topic/partition assignments, in iteration order.
     *
     * @param listenedPartitions the partitions to convert
     * @return one {@link TopicPartitionInitialOffset} per input partition, built from its topic
     *         and partition id
     */
    private TopicPartitionInitialOffset[] getTopicPartitionInitialOffsets(Collection<PartitionInfo> listenedPartitions) {
        List<TopicPartitionInitialOffset> assignments = new ArrayList<TopicPartitionInitialOffset>(listenedPartitions.size());
        for (PartitionInfo info : listenedPartitions) {
            assignments.add(new TopicPartitionInitialOffset(info.topic(), info.partition()));
        }
        return assignments.toArray(new TopicPartitionInitialOffset[assignments.size()]);
    }

    /**
     * Creates or extends the topic only when auto-creation is enabled and an
     * {@link AdminUtilsOperation} is available; otherwise logs why nothing was done.
     *
     * @param topicName the topic to create or extend
     * @param partitionCount the minimum number of partitions required
     */
    private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(String topicName, int partitionCount) {
        if (!this.configurationProperties.isAutoCreateTopics()) {
            this.logger.info((Object) "Auto creation of topics is disabled.");
            return;
        }
        if (this.adminUtilsOperation == null) {
            this.logger.warn((Object) "Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. No topic will be created by the binder");
            return;
        }
        this.createTopicAndPartitions(topicName, partitionCount);
    }

    /**
     * Ensures the topic exists with at least the required number of partitions.
     *
     * <ul>
     * <li>Topic exists: if it has too few partitions, they are added when
     * {@code autoAddPartitions} is enabled (up to the larger of the binder's minimum partition
     * count and {@code partitionCount}); otherwise a {@link BinderException} is thrown.</li>
     * <li>Topic unknown: it is created through the metadata retry operations with the larger of
     * the binder's minimum partition count and {@code partitionCount}.</li>
     * <li>Any other metadata error code is rethrown wrapped in a {@link BinderException}.</li>
     * </ul>
     *
     * <p>The ZooKeeper handle opened here is always closed before returning.
     *
     * @param topicName the topic to create or extend
     * @param partitionCount the minimum number of partitions required
     * @throws BinderException if the topic has too few partitions and cannot be extended, or if
     *         topic metadata cannot be fetched
     */
    private void createTopicAndPartitions(final String topicName, int partitionCount) {
        final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(), this.configurationProperties.getZkSessionTimeout(), this.configurationProperties.getZkConnectionTimeout(), JaasUtils.isZkSecurityEnabled());
        // Explicit try/finally rather than try-with-resources: the latter only compiles when
        // ZkUtils implements AutoCloseable, which this form does not rely on.
        try {
            short errorCode = this.adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
            if (errorCode == ErrorMapping.NoError()) {
                boolean autoAddPartitions = this.configurationProperties.isAutoAddPartitions();
                int effectivePartitionCount = autoAddPartitions ? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount) : partitionCount;
                int partitionSize = this.adminUtilsOperation.partitionSize(topicName, zkUtils);
                if (partitionSize < effectivePartitionCount) {
                    if (!autoAddPartitions) {
                        throw new BinderException("The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead. " + "Consider either increasing the partition count of the topic or enabling " + "`autoAddPartitions`");
                    }
                    this.adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
                }
            } else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
                final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount);
                this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {

                    @Override
                    public Object doWithRetry(RetryContext context) throws RuntimeException {
                        KafkaMessageChannelBinder.this.adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount, KafkaMessageChannelBinder.this.configurationProperties.getReplicationFactor(), new Properties());
                        return null;
                    }
                });
            } else {
                throw new BinderException("Error fetching Kafka topic metadata: ", ErrorMapping.exceptionFor(errorCode));
            }
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Fetches the partitions of a topic, retrying through the metadata retry operations until
     * at least {@code partitionCount} partitions are reported.
     *
     * <p>Each attempt uses a short-lived producer factory built from default producer
     * properties; the factory is stopped after the attempt so its producer is not left open.
     *
     * @param topicName the topic to look up
     * @param partitionCount the minimum number of partitions expected
     * @return the partitions reported for the topic
     * @throws BinderException if the lookup still fails once the retry operations give up
     */
    private Collection<PartitionInfo> getPartitionsForTopic(final String topicName, final int partitionCount) {
        try {
            return this.metadataRetryOperations.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {

                @Override
                public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
                    DefaultKafkaProducerFactory<byte[], byte[]> metadataProducerFactory = KafkaMessageChannelBinder.this.getProducerFactory(new ExtendedProducerProperties<KafkaProducerProperties>(new KafkaProducerProperties()));
                    try {
                        List<PartitionInfo> partitions = metadataProducerFactory.createProducer().partitionsFor(topicName);
                        if (partitions.size() < partitionCount) {
                            throw new IllegalStateException("The number of expected partitions was: " + partitionCount + ", but " + partitions.size() + (partitions.size() > 1 ? " have " : " has ") + "been found instead");
                        }
                        return partitions;
                    } finally {
                        // Release the producer created for this lookup; previously one was leaked per attempt.
                        metadataProducerFactory.stop();
                    }
                }
            });
        }
        catch (Exception e) {
            this.logger.error((Object) "Cannot initialize Binder", (Throwable) e);
            throw new BinderException("Cannot initialize binder:", (Throwable) e);
        }
    }

    /**
     * Lazily creates the shared DLQ producer; does nothing when it already exists.
     *
     * <p>The binder-level {@code configuration} map is applied first (as in
     * {@code getProducerFactory} and {@code getConsumerConfig}) so that settings such as
     * security options also reach the DLQ producer; the fixed defaults below then overwrite any
     * same-named keys. The method-level lock already serializes callers, so no nested
     * synchronization is needed.
     *
     * @throws RuntimeException if the producer cannot be created
     */
    private synchronized void initDlqProducer() {
        try {
            if (this.dlqProducer != null) {
                return;
            }
            Map<String, Object> props = new HashMap<String, Object>();
            if (!ObjectUtils.isEmpty(this.configurationProperties.getConfiguration())) {
                props.putAll(this.configurationProperties.getConfiguration());
            }
            props.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 0x2000000);
            props.put("key.serializer", ByteArraySerializer.class);
            props.put("value.serializer", ByteArraySerializer.class);
            DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<byte[], byte[]>(props);
            this.dlqProducer = defaultKafkaProducerFactory.createProducer();
        }
        catch (Exception e) {
            throw new RuntimeException("Cannot initialize DLQ producer:", e);
        }
    }

    /**
     * Shortens a string for log output.
     *
     * @param original the text to shorten; must not be {@code null}
     * @param maxCharacters the maximum number of characters kept from {@code original}
     * @return {@code original} when it is no longer than {@code maxCharacters}; otherwise its
     *         first {@code maxCharacters} characters followed by {@code "..."}
     */
    private String toDisplayString(String original, int maxCharacters) {
        boolean needsTruncation = original.length() > maxCharacters;
        return needsTruncation ? original.substring(0, maxCharacters) + "..." : original;
    }

    private final class ProducerConfigurationMessageHandler
    extends KafkaProducerMessageHandler<byte[], byte[]>
    implements Lifecycle {
        private boolean running;
        private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;

        private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
            super(kafkaTemplate);
            this.running = true;
            this.setTopicExpression((Expression)new LiteralExpression(topic));
            this.setBeanFactory((BeanFactory)KafkaMessageChannelBinder.this.getBeanFactory());
            if (producerProperties.isPartitioned()) {
                SpelExpressionParser parser = new SpelExpressionParser();
                this.setPartitionIdExpression(parser.parseExpression("headers.partition"));
            }
            if (((KafkaProducerProperties)producerProperties.getExtension()).isSync()) {
                this.setSync(true);
            }
            this.producerFactory = producerFactory;
        }

        public void start() {
            try {
                super.onInit();
            }
            catch (Exception e) {
                this.logger.error((Object)"Initialization errors: ", (Throwable)e);
                throw new RuntimeException(e);
            }
        }

        public void stop() {
            this.producerFactory.stop();
            this.running = false;
        }

        public boolean isRunning() {
            return this.running;
        }
    }
}

