/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors.consumer.convert;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

public abstract class AbstractRecordStreamKafkaMessageConverter
implements KafkaMessageConverter {
    private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
    protected final RecordReaderFactory readerFactory;
    protected final RecordSetWriterFactory writerFactory;
    protected final Charset headerEncoding;
    protected final Pattern headerNamePattern;
    protected final KeyEncoding keyEncoding;
    protected final boolean commitOffsets;
    protected final OffsetTracker offsetTracker;
    protected final ComponentLog logger;
    protected final String brokerUri;

    public AbstractRecordStreamKafkaMessageConverter(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, Charset headerEncoding, Pattern headerNamePattern, KeyEncoding keyEncoding, boolean commitOffsets, OffsetTracker offsetTracker, ComponentLog logger, String brokerUri) {
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.headerEncoding = headerEncoding;
        this.headerNamePattern = headerNamePattern;
        this.keyEncoding = keyEncoding;
        this.commitOffsets = commitOffsets;
        this.offsetTracker = offsetTracker;
        this.logger = logger;
        this.brokerUri = brokerUri;
    }

    @Override
    public void toFlowFiles(ProcessSession session, Iterator<ByteRecord> consumerRecords) {
        HashMap<RecordGroupCriteria, RecordGroup> recordGroups = new HashMap<RecordGroupCriteria, RecordGroup>();
        while (consumerRecords.hasNext()) {
            ByteRecord consumerRecord = consumerRecords.next();
            String topic = consumerRecord.getTopic();
            int partition = consumerRecord.getPartition();
            byte[] value = consumerRecord.getValue();
            Map<String, String> attributes = KafkaUtils.toAttributes(consumerRecord, this.keyEncoding, this.headerNamePattern, this.headerEncoding, this.commitOffsets);
            Map<String, String> extraAttrs = this.extractHeaders(consumerRecord);
            try (ByteArrayInputStream in = new ByteArrayInputStream(value);
                 RecordReader reader = this.readerFactory.createRecordReader(attributes, (InputStream)in, (long)value.length, this.logger);){
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    this.processSingleRecord(session, recordGroups, consumerRecord, record, attributes, extraAttrs, topic, partition);
                }
            }
            catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
                this.logger.debug("Reader or Writer failed to process Kafka Record with Topic [{}] Partition [{}] Offset [{}]", new Object[]{consumerRecord.getTopic(), consumerRecord.getPartition(), consumerRecord.getOffset(), e});
                this.handleParseFailure(session, consumerRecord, attributes, value);
                this.offsetTracker.update(consumerRecord);
                continue;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to process Kafka message", e);
            }
            this.offsetTracker.update(consumerRecord);
        }
        this.finishAllGroups(session, recordGroups);
    }

    private void processSingleRecord(ProcessSession session, Map<RecordGroupCriteria, RecordGroup> recordGroups, ByteRecord consumerRecord, Record record, Map<String, String> attributes, Map<String, String> extraAttrs, String topic, int partition) throws Exception {
        RecordSchema inputSchema = record == null ? EMPTY_SCHEMA : record.getSchema();
        RecordSchema writeSchema = this.getWriteSchema(inputSchema, consumerRecord, attributes);
        RecordGroupCriteria criteria = new RecordGroupCriteria(writeSchema, extraAttrs, topic, partition);
        RecordGroup group = recordGroups.get(criteria);
        if (group == null) {
            RecordSetWriter writer;
            FlowFile ff = session.create();
            ff = session.putAllAttributes(ff, Map.of("kafka.topic", topic, "kafka.partition", String.valueOf(partition)));
            OutputStream out = session.write(ff);
            try {
                writer = this.writerFactory.createWriter(this.logger, writeSchema, out, attributes);
                writer.beginRecordSet();
            }
            catch (Exception ex) {
                out.close();
                throw ex;
            }
            long offset = consumerRecord.getOffset();
            AtomicLong maxOffset = new AtomicLong(offset);
            AtomicLong minOffset = new AtomicLong(offset);
            group = new RecordGroup(ff, writer, maxOffset, minOffset);
            recordGroups.put(criteria, group);
        } else {
            AtomicLong minOffset;
            AtomicLong maxOffset;
            long recordOffset = consumerRecord.getOffset();
            if (recordOffset > (maxOffset = group.maxOffset()).get()) {
                maxOffset.set(recordOffset);
            }
            if (recordOffset < (minOffset = group.minOffset()).get()) {
                minOffset.set(recordOffset);
            }
        }
        Record toWrite = this.convertRecord(consumerRecord, record, attributes);
        if (toWrite != null) {
            group.writer().write(toWrite);
        }
    }

    private void finishAllGroups(ProcessSession session, Map<RecordGroupCriteria, RecordGroup> recordGroups) {
        for (Map.Entry<RecordGroupCriteria, RecordGroup> e : recordGroups.entrySet()) {
            int recordCount;
            RecordGroupCriteria criteria = e.getKey();
            RecordGroup group = e.getValue();
            HashMap<String, String> resultAttrs = new HashMap<String, String>();
            try (RecordSetWriter writer = group.writer();){
                WriteResult wr = writer.finishRecordSet();
                resultAttrs.putAll(wr.getAttributes());
                resultAttrs.put("record.count", String.valueOf(wr.getRecordCount()));
                resultAttrs.put("kafka.count", String.valueOf(wr.getRecordCount()));
                resultAttrs.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                long maxOffset = group.maxOffset().get();
                resultAttrs.put("kafka.max.offset", Long.toString(maxOffset));
                long minOffset = group.minOffset().get();
                resultAttrs.put("kafka.offset", Long.toString(minOffset));
                resultAttrs.putAll(criteria.extraAttributes());
                resultAttrs.put("kafka.consumer.offsets.committed", String.valueOf(this.commitOffsets));
                recordCount = wr.getRecordCount();
            }
            catch (Exception ex) {
                throw new ProcessException("Failed to write Kafka records to FlowFile", (Throwable)ex);
            }
            FlowFile ff = group.flowFile();
            ff = session.putAllAttributes(ff, resultAttrs);
            session.getProvenanceReporter().receive(ff, this.brokerUri + "/" + criteria.topic());
            session.adjustCounter("Records Received from " + criteria.topic(), (long)recordCount, false);
            session.transfer(ff, ConsumeKafka.SUCCESS);
        }
    }

    protected void handleParseFailure(ProcessSession session, ByteRecord consumerRecord, Map<String, String> attributes, byte[] value) {
        FlowFile ff = session.create();
        ff = session.putAllAttributes(ff, attributes);
        ff = session.write(ff, out -> out.write(value));
        session.transfer(ff, ConsumeKafka.PARSE_FAILURE);
        session.adjustCounter("Records Received from " + consumerRecord.getTopic(), 1L, false);
    }

    protected Map<String, String> extractHeaders(ByteRecord consumerRecord) {
        return Map.of();
    }

    protected abstract RecordSchema getWriteSchema(RecordSchema var1, ByteRecord var2, Map<String, String> var3) throws IOException;

    protected abstract Record convertRecord(ByteRecord var1, Record var2, Map<String, String> var3) throws IOException;

    private record RecordGroupCriteria(RecordSchema schema, Map<String, String> extraAttributes, String topic, int partition) {
    }

    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, AtomicLong maxOffset, AtomicLong minOffset) {
    }
}

