/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Arrays;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.context.properties.bind.BindContext;
import org.springframework.boot.context.properties.bind.BindHandler;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.bind.PlaceholdersResolver;
import org.springframework.boot.context.properties.bind.PropertySourcesPlaceholdersResolver;
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
import org.springframework.boot.context.properties.source.ConfigurationPropertySources;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.DeserializationExceptionHandler;
import org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public abstract class AbstractKafkaStreamsBinderProcessor
implements ApplicationContextAware {
    private static final Log LOG = LogFactory.getLog(AbstractKafkaStreamsBinderProcessor.class);
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private final BindingServiceProperties bindingServiceProperties;
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    private final CleanupConfig cleanupConfig;
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    protected ConfigurableApplicationContext applicationContext;
    private Object concurrencyAtTheGlobal;

    public AbstractKafkaStreamsBinderProcessor(BindingServiceProperties bindingServiceProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, CleanupConfig cleanupConfig) {
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.cleanupConfig = cleanupConfig;
    }

    public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext)applicationContext;
    }

    protected Topology.AutoOffsetReset getAutoOffsetReset(String inboundName, KafkaStreamsConsumerProperties extendedConsumerProperties) {
        KafkaConsumerProperties.StartOffset startOffset = extendedConsumerProperties.getStartOffset();
        Topology.AutoOffsetReset autoOffsetReset = null;
        if (startOffset != null) {
            switch (startOffset) {
                case earliest: {
                    autoOffsetReset = Topology.AutoOffsetReset.EARLIEST;
                    break;
                }
                case latest: {
                    autoOffsetReset = Topology.AutoOffsetReset.LATEST;
                    break;
                }
            }
        }
        if (extendedConsumerProperties.isResetOffsets()) {
            LOG.warn((Object)("Detected resetOffsets configured on binding " + inboundName + ". Setting resetOffsets in Kafka Streams binder does not have any effect."));
        }
        return autoOffsetReset;
    }

    protected void handleKTableGlobalKTableInputs(Object[] arguments, int index, String input, Class<?> parameterType, Object targetBean, StreamsBuilderFactoryBean streamsBuilderFactoryBean, StreamsBuilder streamsBuilder, KafkaStreamsConsumerProperties extendedConsumerProperties, Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset, boolean firstBuild) {
        if (firstBuild) {
            this.addStateStoreBeans(streamsBuilder);
        }
        if (parameterType.isAssignableFrom(KTable.class)) {
            String materializedAs = extendedConsumerProperties.getMaterializedAs();
            String bindingDestination = this.bindingServiceProperties.getBindingDestination(input);
            KTable<?, ?> table = this.getKTable(extendedConsumerProperties, streamsBuilder, keySerde, valueSerde, materializedAs, bindingDestination, autoOffsetReset);
            KTableBoundElementFactory.KTableWrapper kTableWrapper = (KTableBoundElementFactory.KTableWrapper)targetBean;
            kTableWrapper.wrap(table);
            this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
            arguments[index] = table;
        } else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
            String materializedAs = extendedConsumerProperties.getMaterializedAs();
            String bindingDestination = this.bindingServiceProperties.getBindingDestination(input);
            GlobalKTable<?, ?> table = this.getGlobalKTable(extendedConsumerProperties, streamsBuilder, keySerde, valueSerde, materializedAs, bindingDestination, autoOffsetReset);
            GlobalKTableBoundElementFactory.GlobalKTableWrapper globalKTableWrapper = (GlobalKTableBoundElementFactory.GlobalKTableWrapper)targetBean;
            globalKTableWrapper.wrap(table);
            this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactory(streamsBuilderFactoryBean);
            arguments[index] = table;
        }
    }

    protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix, ApplicationContext applicationContext, String inboundName, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, StreamsBuilderFactoryBeanCustomizer customizer, ConfigurableEnvironment environment, BindingProperties bindingProperties) {
        KafkaStreamsBinderConfigurationProperties.Functions functionConfig;
        Map<String, KafkaStreamsBinderConfigurationProperties.Functions> functionConfigMap;
        ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
        Map streamConfigGlobalProperties = (Map)applicationContext.getBean("streamConfigGlobalProperties", Map.class);
        if (this.concurrencyAtTheGlobal == null) {
            this.concurrencyAtTheGlobal = streamConfigGlobalProperties.get("num.stream.threads");
        }
        if (kafkaStreamsBinderConfigurationProperties != null && !CollectionUtils.isEmpty(functionConfigMap = kafkaStreamsBinderConfigurationProperties.getFunctions()) && (functionConfig = functionConfigMap.get(beanNamePostPrefix)) != null) {
            String applicationId;
            Map<String, String> functionSpecificConfig = functionConfig.getConfiguration();
            if (!CollectionUtils.isEmpty(functionSpecificConfig)) {
                streamConfigGlobalProperties.putAll(functionSpecificConfig);
            }
            if (!StringUtils.isEmpty((Object)(applicationId = functionConfig.getApplicationId()))) {
                streamConfigGlobalProperties.put("application.id", applicationId);
            }
        }
        MutablePropertySources propertySources = environment.getPropertySources();
        if (!StringUtils.isEmpty((Object)bindingProperties.getBinder())) {
            KafkaStreamsBinderConfigurationProperties multiBinderKafkaStreamsBinderConfigurationProperties = (KafkaStreamsBinderConfigurationProperties)((Object)applicationContext.getBean(bindingProperties.getBinder() + "-KafkaStreamsBinderConfigurationProperties", KafkaStreamsBinderConfigurationProperties.class));
            String connectionString = multiBinderKafkaStreamsBinderConfigurationProperties.getKafkaConnectionString();
            if (StringUtils.isEmpty((Object)connectionString)) {
                connectionString = (String)propertySources.get(bindingProperties.getBinder() + "-kafkaStreamsBinderEnv").getProperty("spring.cloud.stream.kafka.binder.brokers");
            }
            streamConfigGlobalProperties.put("bootstrap.servers", connectionString);
            String binderProvidedApplicationId = multiBinderKafkaStreamsBinderConfigurationProperties.getApplicationId();
            if (StringUtils.hasText((String)binderProvidedApplicationId)) {
                streamConfigGlobalProperties.put("application.id", binderProvidedApplicationId);
            }
            if (multiBinderKafkaStreamsBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndContinue) {
                streamConfigGlobalProperties.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
            } else if (multiBinderKafkaStreamsBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndFail) {
                streamConfigGlobalProperties.put("default.deserialization.exception.handler", LogAndFailExceptionHandler.class);
            } else if (multiBinderKafkaStreamsBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.sendToDlq) {
                streamConfigGlobalProperties.put("default.deserialization.exception.handler", RecoveringDeserializationExceptionHandler.class);
                SendToDlqAndContinue sendToDlqAndContinue = (SendToDlqAndContinue)applicationContext.getBean(SendToDlqAndContinue.class);
                streamConfigGlobalProperties.put("spring.deserialization.recoverer", sendToDlqAndContinue);
            }
            if (!ObjectUtils.isEmpty((Object)multiBinderKafkaStreamsBinderConfigurationProperties.getConfiguration())) {
                streamConfigGlobalProperties.putAll(multiBinderKafkaStreamsBinderConfigurationProperties.getConfiguration());
            }
        }
        KafkaStreamsConsumerProperties extendedConsumerProperties = (KafkaStreamsConsumerProperties)((Object)this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName));
        streamConfigGlobalProperties.putAll(extendedConsumerProperties.getConfiguration());
        String bindingLevelApplicationId = extendedConsumerProperties.getApplicationId();
        if (StringUtils.hasText((String)bindingLevelApplicationId)) {
            streamConfigGlobalProperties.put("application.id", bindingLevelApplicationId);
        }
        streamConfigGlobalProperties.computeIfAbsent("application.id", k -> {
            String generatedApplicationID = beanNamePostPrefix + "-applicationId";
            LOG.info((Object)("Binder Generated Kafka Streams Application ID: " + generatedApplicationID));
            LOG.info((Object)"Use the binder generated application ID only for development and testing. ");
            LOG.info((Object)"For production deployments, please consider explicitly setting an application ID using a configuration property.");
            LOG.info((Object)"The generated applicationID is static and will be preserved over application restarts.");
            return generatedApplicationID;
        });
        this.handleConcurrency(applicationContext, inboundName, streamConfigGlobalProperties);
        DeserializationExceptionHandler deserializationExceptionHandler = extendedConsumerProperties.getDeserializationExceptionHandler();
        if (deserializationExceptionHandler == DeserializationExceptionHandler.logAndFail) {
            streamConfigGlobalProperties.put("default.deserialization.exception.handler", LogAndFailExceptionHandler.class);
        } else if (deserializationExceptionHandler == DeserializationExceptionHandler.logAndContinue) {
            streamConfigGlobalProperties.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        } else if (deserializationExceptionHandler == DeserializationExceptionHandler.sendToDlq) {
            streamConfigGlobalProperties.put("default.deserialization.exception.handler", RecoveringDeserializationExceptionHandler.class);
            streamConfigGlobalProperties.put("spring.deserialization.recoverer", applicationContext.getBean(SendToDlqAndContinue.class));
        }
        KafkaStreamsConfiguration kafkaStreamsConfiguration = new KafkaStreamsConfiguration(streamConfigGlobalProperties);
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.cleanupConfig == null ? new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) : new StreamsBuilderFactoryBean(kafkaStreamsConfiguration, this.cleanupConfig);
        streamsBuilderFactoryBean.setAutoStartup(false);
        AbstractBeanDefinition streamsBuilderBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(streamsBuilderFactoryBean.getClass(), () -> streamsBuilderFactoryBean).getRawBeanDefinition();
        ((BeanDefinitionRegistry)beanFactory).registerBeanDefinition("stream-builder-" + beanNamePostPrefix, (BeanDefinition)streamsBuilderBeanDefinition);
        extendedConsumerProperties.setApplicationId((String)streamConfigGlobalProperties.get("application.id"));
        streamConfigGlobalProperties.remove("application.id");
        if (this.concurrencyAtTheGlobal != null) {
            streamConfigGlobalProperties.put("num.stream.threads", this.concurrencyAtTheGlobal);
        } else {
            streamConfigGlobalProperties.remove("num.stream.threads");
        }
        StreamsBuilderFactoryBean streamsBuilderFactoryBeanFromContext = (StreamsBuilderFactoryBean)applicationContext.getBean("&stream-builder-" + beanNamePostPrefix, StreamsBuilderFactoryBean.class);
        if (customizer != null) {
            customizer.configure(streamsBuilderFactoryBean);
        }
        return streamsBuilderFactoryBeanFromContext;
    }

    private void handleConcurrency(ApplicationContext applicationContext, final String inboundName, Map<String, Object> streamConfigGlobalProperties) {
        Object concurrencyAtTheGlobal = streamConfigGlobalProperties.get("num.stream.threads");
        Integer concAtTheGlobal = null;
        if (concurrencyAtTheGlobal instanceof String) {
            concAtTheGlobal = Integer.valueOf((String)concurrencyAtTheGlobal);
        } else if (concurrencyAtTheGlobal instanceof Integer) {
            concAtTheGlobal = (Integer)concurrencyAtTheGlobal;
        }
        Binder explicitConcurrencyResolver = new Binder(ConfigurationPropertySources.get((Environment)applicationContext.getEnvironment()), (PlaceholdersResolver)new PropertySourcesPlaceholdersResolver(applicationContext.getEnvironment()), IntegrationUtils.getConversionService((BeanFactory)this.applicationContext.getBeanFactory()), null);
        final boolean[] concurrencyExplicitlyProvided = new boolean[]{false};
        BindHandler handler = new BindHandler(){

            public Object onSuccess(ConfigurationPropertyName name, Bindable<?> target, BindContext context, Object result) {
                if (!concurrencyExplicitlyProvided[0]) {
                    concurrencyExplicitlyProvided[0] = name.getLastElement(ConfigurationPropertyName.Form.UNIFORM).equals("concurrency") && ConfigurationPropertyName.of((CharSequence)("spring.cloud.stream.bindings." + inboundName + ".consumer")).isAncestorOf(name);
                }
                return result;
            }
        };
        try {
            explicitConcurrencyResolver.bind("spring.cloud.stream", Bindable.ofInstance((Object)new BindingServiceProperties()), handler);
        }
        catch (Exception exception) {
            // empty catch block
        }
        int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName).getConcurrency();
        if (concurrency >= 1 && concurrencyExplicitlyProvided[0]) {
            streamConfigGlobalProperties.put("num.stream.threads", concurrency);
        } else if (concurrencyAtTheGlobal != null) {
            streamConfigGlobalProperties.put("num.stream.threads", concAtTheGlobal);
        }
    }

    protected Serde<?> getValueSerde(String inboundName, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, ResolvableType resolvableType) {
        if (this.bindingServiceProperties.getConsumerProperties(inboundName).isUseNativeDecoding()) {
            BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(inboundName);
            return this.keyValueSerdeResolver.getInboundValueSerde(bindingProperties.getConsumer(), kafkaStreamsConsumerProperties, resolvableType);
        }
        return Serdes.ByteArray();
    }

    protected KStream<?, ?> getKStream(String inboundName, BindingProperties bindingProperties, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset, boolean firstBuild) {
        KStream stream;
        if (firstBuild) {
            this.addStateStoreBeans(streamsBuilder);
        }
        if (((KafkaStreamsConsumerProperties)((Object)this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(inboundName))).isDestinationIsPattern()) {
            Pattern pattern = Pattern.compile(this.bindingServiceProperties.getBindingDestination(inboundName));
            stream = streamsBuilder.stream(pattern);
        } else {
            String[] bindingTargets = StringUtils.commaDelimitedListToStringArray((String)this.bindingServiceProperties.getBindingDestination(inboundName));
            Consumed<?, ?> consumed = this.getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerde, autoOffsetReset);
            stream = streamsBuilder.stream(Arrays.asList(bindingTargets), consumed);
        }
        boolean nativeDecoding = this.bindingServiceProperties.getConsumerProperties(inboundName).isUseNativeDecoding();
        if (nativeDecoding) {
            LOG.info((Object)("Native decoding is enabled for " + inboundName + ". Inbound deserialization done at the broker."));
        } else {
            LOG.info((Object)("Native decoding is disabled for " + inboundName + ". Inbound message conversion done by Spring Cloud Stream."));
        }
        return this.getkStream(bindingProperties, stream, nativeDecoding);
    }

    private KStream<?, ?> getkStream(BindingProperties bindingProperties, KStream<?, ?> stream, boolean nativeDecoding) {
        if (!nativeDecoding) {
            stream = stream.mapValues(value -> {
                String contentType = bindingProperties.getContentType();
                Object returnValue = value != null && !StringUtils.isEmpty((Object)contentType) ? MessageBuilder.withPayload((Object)value).setHeader("contentType", (Object)contentType).build() : value;
                return returnValue;
            });
        }
        return stream;
    }

    private void addStateStoreBeans(StreamsBuilder streamsBuilder) {
        try {
            Map storeBuilders = this.applicationContext.getBeansOfType(StoreBuilder.class);
            if (!CollectionUtils.isEmpty((Map)storeBuilders)) {
                storeBuilders.values().forEach(storeBuilder -> {
                    streamsBuilder.addStateStore(storeBuilder);
                    if (LOG.isInfoEnabled()) {
                        LOG.info((Object)("state store " + storeBuilder.name() + " added to topology"));
                    }
                });
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v, Topology.AutoOffsetReset autoOffsetReset, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
        Consumed<K, V> consumed = this.getConsumed(kafkaStreamsConsumerProperties, k, v, autoOffsetReset);
        return streamsBuilder.table(this.bindingServiceProperties.getBindingDestination(destination), consumed, this.getMaterialized(storeName, k, v));
    }

    private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String storeName, Serde<K> k, Serde<V> v) {
        return Materialized.as((String)storeName).withKeySerde(k).withValueSerde(v);
    }

    private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v, Topology.AutoOffsetReset autoOffsetReset, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
        Consumed<K, V> consumed = this.getConsumed(kafkaStreamsConsumerProperties, k, v, autoOffsetReset);
        return streamsBuilder.globalTable(this.bindingServiceProperties.getBindingDestination(destination), consumed, this.getMaterialized(storeName, k, v));
    }

    private GlobalKTable<?, ?> getGlobalKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, String materializedAs, String bindingDestination, Topology.AutoOffsetReset autoOffsetReset) {
        Consumed<?, ?> consumed = this.getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerde, autoOffsetReset);
        return materializedAs != null ? this.materializedAsGlobalKTable(streamsBuilder, bindingDestination, materializedAs, keySerde, valueSerde, autoOffsetReset, kafkaStreamsConsumerProperties) : streamsBuilder.globalTable(bindingDestination, consumed);
    }

    private KTable<?, ?> getKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, String materializedAs, String bindingDestination, Topology.AutoOffsetReset autoOffsetReset) {
        Consumed<?, ?> consumed = this.getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerde, autoOffsetReset);
        return materializedAs != null ? this.materializedAs(streamsBuilder, bindingDestination, materializedAs, keySerde, valueSerde, autoOffsetReset, kafkaStreamsConsumerProperties) : streamsBuilder.table(bindingDestination, consumed);
    }

    private <K, V> Consumed<K, V> getConsumed(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, Serde<K> keySerde, Serde<V> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
        TimestampExtractor timestampExtractor = null;
        if (!StringUtils.isEmpty((Object)kafkaStreamsConsumerProperties.getTimestampExtractorBeanName())) {
            timestampExtractor = (TimestampExtractor)this.applicationContext.getBean(kafkaStreamsConsumerProperties.getTimestampExtractorBeanName(), TimestampExtractor.class);
        }
        Consumed consumed = Consumed.with(keySerde, valueSerde).withOffsetResetPolicy(autoOffsetReset);
        if (timestampExtractor != null) {
            consumed.withTimestampExtractor(timestampExtractor);
        }
        return consumed;
    }
}

