/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.streams.messaging;

import java.util.ArrayList;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.springframework.kafka.streams.messaging.MessagingFunction;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class MessagingProcessor<Kin, Vin, Kout, Vout>
extends ContextualProcessor<Kin, Vin, Kout, Vout> {
    private final MessagingFunction function;
    private final MessagingMessageConverter converter;

    public MessagingProcessor(MessagingFunction function, MessagingMessageConverter converter) {
        Assert.notNull((Object)function, (String)"'function' cannot be null");
        Assert.notNull((Object)converter, (String)"'converter' cannot be null");
        this.function = function;
        this.converter = converter;
    }

    public void process(Record<Kin, Vin> record) {
        ProcessorContext context = this.context();
        RecordMetadata meta = context.recordMetadata().orElse(null);
        Assert.state((meta != null ? 1 : 0) != 0, (String)"No record metadata present");
        Headers headers = record.headers();
        ConsumerRecord rebuilt = new ConsumerRecord(meta.topic(), meta.partition(), meta.offset(), record.timestamp(), TimestampType.NO_TIMESTAMP_TYPE, 0, 0, record.key(), record.value(), headers, Optional.empty());
        Message<?> message = this.converter.toMessage(rebuilt, null, null, null);
        message = this.function.exchange(message);
        ArrayList headerList = new ArrayList();
        headers.forEach(header -> headerList.add(header.key()));
        headerList.forEach(name -> headers.remove(name));
        ProducerRecord<?, ?> fromMessage = this.converter.fromMessage(message, "dummy");
        fromMessage.headers().forEach(header -> {
            if (!header.key().equals("kafka_topic")) {
                headers.add(header);
            }
        });
        context.forward(new Record(message.getHeaders().get((Object)"kafka_messageKey"), message.getPayload(), record.timestamp(), headers));
    }

    public void close() {
    }
}

