/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;

/**
 * Converts record values flowing through a {@link KStream} with the
 * {@link MessageConverter} obtained from the {@link CompositeMessageConverterFactory}.
 *
 * <p>Outbound values are turned into the converter's payload representation
 * ({@link #serializeOnOutbound(KStream)}); inbound values are converted to a requested
 * class ({@link #deserializeOnInbound(Class, KStream)}). Inbound records that cannot be
 * converted are diverted to an error branch that either forwards them to a DLQ, fails,
 * or drops them, depending on configuration.
 */
class KafkaStreamsMessageConversionDelegate {

    /**
     * Hands the converted key/value pair from the branch predicate to the {@code map}
     * step that follows it in {@link #deserializeOnInbound(Class, KStream)}.
     * NOTE(review): this relies on the predicate and the map step running on the same
     * thread, back to back, for a given record - confirm against the Kafka Streams
     * threading model in use.
     */
    private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();

    private final CompositeMessageConverterFactory compositeMessageConverterFactory;
    private final SendToDlqAndContinue sendToDlqAndContinue;
    private final KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue;
    private final KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties;

    KafkaStreamsMessageConversionDelegate(CompositeMessageConverterFactory compositeMessageConverterFactory, SendToDlqAndContinue sendToDlqAndContinue, KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue, KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties) {
        this.compositeMessageConverterFactory = compositeMessageConverterFactory;
        this.sendToDlqAndContinue = sendToDlqAndContinue;
        this.kstreamBindingInformationCatalogue = kstreamBindingInformationCatalogue;
        this.kstreamBinderConfigurationProperties = kstreamBinderConfigurationProperties;
    }

    /**
     * Maps every record of the outbound stream to a record whose value is the payload
     * produced by the message converter.
     *
     * @param outboundBindTarget the stream being bound to an output destination
     * @return a stream with the same keys and converted values
     * @throws IllegalStateException (while the topology is processing a record) if the
     *         converter returns no message for that record
     */
    public KStream serializeOnOutbound(KStream<?, ?> outboundBindTarget) {
        String contentType = this.kstreamBindingInformationCatalogue.getContentType(outboundBindTarget);
        MessageConverter messageConverter = this.compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        return outboundBindTarget.map((k, v) -> toOutboundKeyValue(contentType, messageConverter, k, v));
    }

    /**
     * Converts the values of the inbound stream to {@code valueClass}.
     *
     * <p>The stream is split in two: records whose value could be converted continue as
     * the returned stream; all remaining records are passed to the error handling set up
     * by {@link #processErrorFromDeserialization(KStream, KStream)}.
     *
     * @param valueClass the type the application expects as record value
     * @param bindingTarget the stream being bound to an input destination
     * @return a stream that carries only the successfully converted records
     */
    public KStream deserializeOnInbound(Class<?> valueClass, KStream<?, ?> bindingTarget) {
        MessageConverter messageConverter = this.compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        KStream<?, ?>[] branch = bindingTarget.branch(
                // First branch: conversion succeeded; the result is parked in the thread local.
                (k, v) -> this.convertInbound(valueClass, messageConverter, k, v),
                // Second branch: everything the first predicate rejected.
                (k, v) -> true);
        this.processErrorFromDeserialization(bindingTarget, branch[1]);
        return branch[0].map((k, v) -> {
            KeyValue<Object, Object> converted = keyValueThreadLocal.get();
            // Clear the slot so a stale pair can never be picked up for a later record.
            keyValueThreadLocal.remove();
            return converted;
        });
    }

    /**
     * Converts {@code msg} to {@code valueClass} and stores the resulting pair in the
     * thread local.
     *
     * @throws IllegalStateException if the converter returns {@code null}
     */
    private void convertAndSetMessage(Object o, Class<?> valueClass, MessageConverter messageConverter, Message<?> msg) {
        Object messageConverted = messageConverter.fromMessage(msg, valueClass);
        if (messageConverted == null) {
            throw new IllegalStateException("Inbound data conversion failed.");
        }
        keyValueThreadLocal.set(new KeyValue<>(o, messageConverted));
    }

    /**
     * Attaches a terminal processor to the error branch.
     *
     * <p>If a DLQ is enabled for {@code bindingTarget}, each failed record is sent there.
     * Otherwise the configured {@code SerdeError} decides: {@code logAndFail} throws,
     * any other setting drops the record silently.
     */
    private void processErrorFromDeserialization(final KStream<?, ?> bindingTarget, KStream<?, ?> branch) {
        branch.process(() -> new Processor() {
            ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(Object o, Object o2) {
                if (kstreamBindingInformationCatalogue.isDlqEnabled(bindingTarget)) {
                    String destination = kstreamBindingInformationCatalogue.getDestination(bindingTarget);
                    // NOTE(review): key and value are cast to byte[] unconditionally - this
                    // presumably expects raw (byte[]) serdes on DLQ-enabled bindings; confirm.
                    Object rawValue = o2 instanceof Message ? ((Message<?>) o2).getPayload() : o2;
                    sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) rawValue, this.context.partition());
                    return;
                }
                if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
                    throw new IllegalStateException("Inbound deserialization failed.");
                }
                // Any other setting (e.g. logAndContinue): drop the record and carry on.
            }

            // NOTE(review): no-op; presumably required by the Processor interface version
            // this class was built against - confirm before adding @Override or removing.
            public void punctuate(long timestamp) {
            }

            @Override
            public void close() {
            }
        });
    }

    /**
     * Branch predicate for inbound records: tries to obtain a value of type
     * {@code valueClass} and, on success, stores the key/value pair in the thread local.
     *
     * @return {@code true} if a pair was stored, {@code false} if anything went wrong
     */
    private boolean convertInbound(Class<?> valueClass, MessageConverter messageConverter, Object o, Object o2) {
        try {
            // A null value raises a NullPointerException on the next line, which the
            // catch below turns into "invalid record".
            if (valueClass.isAssignableFrom(o2.getClass())) {
                keyValueThreadLocal.set(new KeyValue<>(o, o2));
            } else if (o2 instanceof Message) {
                Message<?> message = (Message<?>) o2;
                if (valueClass.isAssignableFrom(message.getPayload().getClass())) {
                    keyValueThreadLocal.set(new KeyValue<>(o, message.getPayload()));
                } else {
                    this.convertAndSetMessage(o, valueClass, messageConverter, message);
                }
            } else if (o2 instanceof String || o2 instanceof byte[]) {
                Message<?> message = MessageBuilder.withPayload(o2).build();
                this.convertAndSetMessage(o, valueClass, messageConverter, message);
            } else {
                // Neither the requested type nor something convertible: pass through as is.
                keyValueThreadLocal.set(new KeyValue<>(o, o2));
            }
            return true;
        }
        catch (Exception ignored) {
            // Deliberately swallowed: returning false routes the record to the error
            // branch, where the DLQ / SerdeError policy decides what happens to it.
            return false;
        }
    }

    /**
     * Builds the outbound record: wraps {@code v} in a message if it is not one already,
     * adds the {@code contentType} header when one is configured, and replaces the value
     * with the payload produced by the converter.
     *
     * @throws IllegalStateException if the converter returns {@code null}
     */
    private static KeyValue<Object, Object> toOutboundKeyValue(String contentType, MessageConverter messageConverter, Object k, Object v) {
        Message<?> message = v instanceof Message ? (Message<?>) v : MessageBuilder.withPayload(v).build();
        // Header values are arbitrary objects, so the copy must be a Map<String, Object>.
        Map<String, Object> headers = new HashMap<>(message.getHeaders());
        if (!StringUtils.isEmpty(contentType)) {
            headers.put("contentType", contentType);
        }
        Message<?> converted = messageConverter.toMessage(message.getPayload(), new MessageHeaders(headers));
        if (converted == null) {
            // Fail with a meaningful message instead of a bare NullPointerException.
            throw new IllegalStateException("Outbound data conversion failed.");
        }
        return new KeyValue<>(k, converted.getPayload());
    }
}

