/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.consumer.convert;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.processor.ProcessSession;

public class FlowFileStreamKafkaMessageConverter
implements KafkaMessageConverter {
    private final Charset headerEncoding;
    private final Pattern headerNamePattern;
    private final KeyEncoding keyEncoding;
    private final boolean commitOffsets;
    private final OffsetTracker offsetTracker;
    private final String brokerUri;

    public FlowFileStreamKafkaMessageConverter(Charset headerEncoding, Pattern headerNamePattern, KeyEncoding keyEncoding, boolean commitOffsets, OffsetTracker offsetTracker, String brokerUri) {
        this.headerEncoding = headerEncoding;
        this.headerNamePattern = headerNamePattern;
        this.keyEncoding = keyEncoding;
        this.commitOffsets = commitOffsets;
        this.offsetTracker = offsetTracker;
        this.brokerUri = brokerUri;
    }

    @Override
    public void toFlowFiles(ProcessSession session, Iterator<ByteRecord> consumerRecords) {
        while (consumerRecords.hasNext()) {
            ByteRecord consumerRecord = consumerRecords.next();
            byte[] value = consumerRecord.getValue();
            FlowFile flowFile = session.create();
            if (consumerRecord.getValue().length > 0) {
                flowFile = session.write(flowFile, outputStream -> outputStream.write(value));
            } else {
                session.putAttribute(flowFile, "kafka.tombstone", Boolean.TRUE.toString());
            }
            Map<String, String> attributes = KafkaUtils.toAttributes(consumerRecord, this.keyEncoding, this.headerNamePattern, this.headerEncoding, this.commitOffsets);
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().receive(flowFile, this.brokerUri + "/" + consumerRecord.getTopic());
            session.adjustCounter("Records Received from " + consumerRecord.getTopic(), consumerRecord.getBundledCount(), false);
            session.transfer(flowFile, ConsumeKafka.SUCCESS);
            this.offsetTracker.update(consumerRecord);
        }
    }
}

