/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.springframework.aop.framework.Advised;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;

class KStreamBinder
extends AbstractBinder<KStream<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
implements ExtendedPropertiesBinder<KStream<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
    private static final Log LOG = LogFactory.getLog(KStreamBinder.class);
    private final KafkaTopicProvisioner kafkaTopicProvisioner;
    private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
    private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
    private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    private final KafkaStreamsRegistry kafkaStreamsRegistry;

    KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue2, KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsRegistry kafkaStreamsRegistry) {
        this.binderConfigurationProperties = binderConfigurationProperties;
        this.kafkaTopicProvisioner = kafkaTopicProvisioner;
        this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
        this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue2;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.kafkaStreamsRegistry = kafkaStreamsRegistry;
    }

    protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group, KStream<Object, Object> inputTarget, ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
        KStream<Object, Object> delegate = ((KStreamBoundElementFactory.KStreamWrapperHandler)((Advised)inputTarget).getAdvisors()[0].getAdvice()).getDelegate();
        this.kafkaStreamsBindingInformationCatalogue.registerConsumerProperties(delegate, (KafkaStreamsConsumerProperties)((Object)properties.getExtension()));
        if (!StringUtils.hasText((String)group)) {
            group = ((KafkaStreamsConsumerProperties)((Object)properties.getExtension())).getApplicationId();
        }
        RetryTemplate retryTemplate = this.buildRetryTemplate((ConsumerProperties)properties);
        String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget);
        final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);
        KafkaStreamsBinderUtils.prepareConsumerBinding(name, group, (ApplicationContext)this.getApplicationContext(), this.kafkaTopicProvisioner, this.binderConfigurationProperties, properties, retryTemplate, this.getBeanFactory(), this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget), this.kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
        return new DefaultBinding<KStream<Object, Object>>(bindingName, group, inputTarget, (Lifecycle)streamsBuilderFactoryBean){

            public boolean isInput() {
                return true;
            }

            public synchronized void start() {
                if (!streamsBuilderFactoryBean.isRunning()) {
                    super.start();
                    KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
                    String applicationId = (String)streamsBuilderFactoryBean.getStreamsConfiguration().get("application.id");
                    if (KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
                        KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
                    }
                }
            }

            public synchronized void stop() {
                if (streamsBuilderFactoryBean.isRunning()) {
                    KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
                    super.stop();
                    KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
                    KafkaStreamsBinderUtils.closeDlqProducerFactories(KStreamBinder.this.kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
                    KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId((String)streamsBuilderFactoryBean.getStreamsConfiguration().get("application.id"), kafkaStreams);
                }
            }
        };
    }

    protected Binding<KStream<Object, Object>> doBindProducer(String name, KStream<Object, Object> outboundBindTarget, ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
        ExtendedProducerProperties<KafkaStreamsProducerProperties> extendedProducerProperties = properties;
        this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);
        Serde<?> keySerde = this.keyValueSerdeResolver.getOuboundKeySerde((KafkaStreamsProducerProperties)((Object)properties.getExtension()), this.kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable(outboundBindTarget));
        LOG.info((Object)("Key Serde used for (outbound) " + name + ": " + keySerde.getClass().getName()));
        Serde<?> valueSerde = properties.isUseNativeEncoding() ? this.keyValueSerdeResolver.getOutboundValueSerde((ProducerProperties)properties, (KafkaStreamsProducerProperties)((Object)properties.getExtension()), this.kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable(outboundBindTarget)) : Serdes.ByteArray();
        LOG.info((Object)("Value Serde used for (outbound) " + name + ": " + valueSerde.getClass().getName()));
        this.to(properties.isUseNativeEncoding(), name, outboundBindTarget, keySerde, valueSerde, (KafkaStreamsProducerProperties)((Object)properties.getExtension()));
        String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(outboundBindTarget);
        final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);
        Properties streamsConfiguration = streamsBuilderFactoryBean.getStreamsConfiguration();
        String applicationId = streamsConfiguration != null ? (String)streamsConfiguration.get("application.id") : bindingName;
        return new DefaultBinding<KStream<Object, Object>>(bindingName, applicationId, outboundBindTarget, (Lifecycle)streamsBuilderFactoryBean){

            public boolean isInput() {
                return false;
            }

            public synchronized void start() {
                if (!streamsBuilderFactoryBean.isRunning()) {
                    super.start();
                    KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
                    String applicationId = (String)streamsBuilderFactoryBean.getStreamsConfiguration().get("application.id");
                    if (KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
                        KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
                    }
                }
            }

            public synchronized void stop() {
                if (streamsBuilderFactoryBean.isRunning()) {
                    KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
                    super.stop();
                    KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
                    KafkaStreamsBinderUtils.closeDlqProducerFactories(KStreamBinder.this.kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
                    KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId((String)streamsBuilderFactoryBean.getStreamsConfiguration().get("application.id"), kafkaStreams);
                }
            }
        };
    }

    private void to(boolean isNativeEncoding, String name, KStream<Object, Object> outboundBindTarget, Serde<Object> keySerde, Serde<Object> valueSerde, KafkaStreamsProducerProperties properties) {
        Produced produced = Produced.with(keySerde, valueSerde);
        if (StringUtils.hasText((String)properties.getProducedAs())) {
            produced.withName(properties.getProducedAs());
        }
        StreamPartitioner streamPartitioner = null;
        if (StringUtils.hasText((String)properties.getStreamPartitionerBeanName())) {
            streamPartitioner = (StreamPartitioner)this.getApplicationContext().getBean(properties.getStreamPartitionerBeanName(), StreamPartitioner.class);
        }
        if (streamPartitioner != null) {
            produced.withStreamPartitioner(streamPartitioner);
        }
        if (!isNativeEncoding) {
            LOG.info((Object)("Native encoding is disabled for " + name + ". Outbound message conversion done by Spring Cloud Stream."));
            outboundBindTarget.filter((k, v) -> v == null).to(name, produced);
            this.kafkaStreamsMessageConversionDelegate.serializeOnOutbound(outboundBindTarget).to(name, produced);
        } else {
            LOG.info((Object)("Native encoding is enabled for " + name + ". Outbound serialization done at the broker."));
            outboundBindTarget.to(name, produced);
        }
    }

    public KafkaStreamsConsumerProperties getExtendedConsumerProperties(String channelName) {
        return (KafkaStreamsConsumerProperties)((Object)this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(channelName));
    }

    public KafkaStreamsProducerProperties getExtendedProducerProperties(String channelName) {
        return (KafkaStreamsProducerProperties)((Object)this.kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties(channelName));
    }

    public void setKafkaStreamsExtendedBindingProperties(KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
    }

    public String getDefaultsPrefix() {
        return this.kafkaStreamsExtendedBindingProperties.getDefaultsPrefix();
    }

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.kafkaStreamsExtendedBindingProperties.getExtendedPropertiesEntryClass();
    }
}

