/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.producer.convert;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.stream.io.util.StreamDemarcator;

public class DelimitedStreamKafkaRecordConverter
implements KafkaRecordConverter {
    private final byte[] demarcatorBytes;
    private final int maxMessageSize;
    private final HeadersFactory headersFactory;

    public DelimitedStreamKafkaRecordConverter(byte[] demarcatorBytes, int maxMessageSize, HeadersFactory headersFactory) {
        this.demarcatorBytes = demarcatorBytes;
        this.maxMessageSize = maxMessageSize;
        this.headersFactory = headersFactory;
    }

    @Override
    public Iterator<KafkaRecord> convert(Map<String, String> attributes, InputStream in, long inputLength) throws IOException {
        ArrayList<KafkaRecord> kafkaRecords = new ArrayList<KafkaRecord>();
        try (StreamDemarcator demarcator = new StreamDemarcator(in, this.demarcatorBytes, this.maxMessageSize);){
            byte[] messageContent;
            while ((messageContent = demarcator.nextToken()) != null) {
                ProducerUtils.checkMessageSize(this.maxMessageSize, messageContent.length);
                kafkaRecords.add(new KafkaRecord(null, null, null, null, messageContent, this.headersFactory.getHeaders(attributes)));
            }
        }
        return kafkaRecords.iterator();
    }
}

