/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.producer.convert;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.wrapper.RecordFieldConverter;
import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

public class RecordWrapperStreamKafkaRecordConverter
implements KafkaRecordConverter {
    private final FlowFile flowFile;
    private final RecordMetadataStrategy metadataStrategy;
    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;
    private final RecordSetWriterFactory keyWriterFactory;
    private final int maxMessageSize;
    private final ComponentLog logger;

    public RecordWrapperStreamKafkaRecordConverter(FlowFile flowFile, RecordMetadataStrategy metadataStrategy, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, RecordSetWriterFactory keyWriterFactory, int maxMessageSize, ComponentLog logger) {
        this.flowFile = flowFile;
        this.metadataStrategy = metadataStrategy;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.keyWriterFactory = keyWriterFactory;
        this.maxMessageSize = maxMessageSize;
        this.logger = logger;
    }

    @Override
    public Iterator<KafkaRecord> convert(Map<String, String> attributes, InputStream in, long inputLength) throws IOException {
        try {
            RecordReader reader = this.readerFactory.createRecordReader(attributes, in, inputLength, this.logger);
            RecordSet recordSet = reader.createRecordSet();
            PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
            return this.toKafkaRecordIterator(pushBackRecordSet);
        }
        catch (SchemaNotFoundException | MalformedRecordException e) {
            throw new IOException("Stream to Record conversion failed", e);
        }
    }

    private Iterator<KafkaRecord> toKafkaRecordIterator(final PushBackRecordSet pushBackRecordSet) {
        return new Iterator<KafkaRecord>(){

            @Override
            public boolean hasNext() {
                try {
                    return pushBackRecordSet.isAnotherRecord();
                }
                catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public KafkaRecord next() {
                try {
                    Record record = pushBackRecordSet.next();
                    RecordFieldConverter converter = new RecordFieldConverter(record, RecordWrapperStreamKafkaRecordConverter.this.flowFile, RecordWrapperStreamKafkaRecordConverter.this.logger);
                    byte[] key = converter.toBytes("key", RecordWrapperStreamKafkaRecordConverter.this.keyWriterFactory);
                    byte[] value = converter.toBytes("value", RecordWrapperStreamKafkaRecordConverter.this.writerFactory);
                    if (value != null) {
                        ProducerUtils.checkMessageSize(RecordWrapperStreamKafkaRecordConverter.this.maxMessageSize, value.length);
                    }
                    List<RecordHeader> headers = this.getKafkaHeaders(record);
                    String topic = null;
                    Integer partition = null;
                    if (RecordWrapperStreamKafkaRecordConverter.this.metadataStrategy == RecordMetadataStrategy.FROM_RECORD) {
                        MapRecord myMetadataRecord = (MapRecord)record.getValue("metadata");
                        topic = myMetadataRecord.getAsString("topic");
                        partition = myMetadataRecord.getAsInt("partition");
                    }
                    return new KafkaRecord(topic, partition, null, key, value, headers);
                }
                catch (IOException e) {
                    throw new UncheckedIOException("Record conversion failed", e);
                }
            }

            private List<RecordHeader> getKafkaHeaders(Record record) {
                ArrayList<RecordHeader> headers = new ArrayList<RecordHeader>();
                MapRecord headersRecord = (MapRecord)record.getValue("headers");
                if (headersRecord != null) {
                    headersRecord.toMap().forEach((key, value) -> headers.add(new RecordHeader(key, value.toString().getBytes(StandardCharsets.UTF_8))));
                }
                return headers;
            }
        };
    }
}

