/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.kinesis.stream.record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
import org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
 * Kinesis record processor that parses each incoming Kinesis record with the configured
 * {@link RecordReaderFactory}, converts every parsed record through the {@link RecordConverter}
 * and appends the result to a single output FlowFile using the configured
 * {@link RecordSetWriterFactory}.
 *
 * <p>The writer and its underlying output stream are kept in instance fields so that one
 * FlowFile can collect the records of several consecutive {@code processRecord} calls; they
 * are reset in {@link #startProcessingRecords()} and released when the FlowFile is completed
 * or discarded.</p>
 *
 * <p>Kinesis records that cannot be parsed or written are emitted unmodified to
 * {@link ConsumeKinesisStream#REL_PARSE_FAILURE}.</p>
 */
public class KinesisRecordProcessorRecord extends AbstractKinesisRecordProcessor {
    final RecordReaderFactory readerFactory;
    final RecordSetWriterFactory writerFactory;
    final Map<String, String> schemaRetrievalVariables;
    /** Writer for the FlowFile currently being filled; {@code null} when no FlowFile is open. */
    private RecordSetWriter writer;
    /** Session output stream backing {@link #writer}; {@code null} when no FlowFile is open. */
    private OutputStream outputStream;
    private final RecordConverter recordConverter;

    /**
     * Creates the processor.
     *
     * @param readerFactory   factory for the reader used to parse each Kinesis record payload
     * @param writerFactory   factory for the writer used to serialise the converted records
     * @param recordConverter converter applied to every parsed record before it is written
     */
    public KinesisRecordProcessorRecord(ProcessSessionFactory sessionFactory, ComponentLog log, String streamName, String endpointPrefix, String kinesisEndpoint, long checkpointIntervalMillis, long retryWaitMillis, int numRetries, DateTimeFormatter dateTimeFormatter, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, RecordConverter recordConverter) {
        super(sessionFactory, log, streamName, endpointPrefix, kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis, numRetries, dateTimeFormatter);
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.schemaRetrievalVariables = Collections.singletonMap("kinesis.name", streamName);
        this.recordConverter = recordConverter;
    }

    /**
     * Resets the writer state so that the first record written afterwards opens a new writer.
     */
    @Override
    void startProcessingRecords() {
        super.startProcessingRecords();
        this.outputStream = null;
        this.writer = null;
    }

    /**
     * Parses one Kinesis record and appends every record it contains to the current output FlowFile.
     *
     * <p>A new FlowFile and writer are created when {@code flowFiles} is empty. When
     * {@code lastRecord} is set and the reader has no further records, the FlowFile is completed.
     * Any parse/write failure results in the raw payload being routed to the parse-failure
     * relationship.</p>
     *
     * @param flowFiles     output FlowFiles collected so far; this method adds to / removes from it
     * @param kinesisRecord the Kinesis record to process
     * @param lastRecord    whether this is the last Kinesis record to be processed for the current FlowFile
     * @param session       session used to create, write and remove FlowFiles
     * @param stopWatch     stop watch passed through to provenance reporting
     */
    @Override
    void processRecord(List<FlowFile> flowFiles, KinesisClientRecord kinesisRecord, boolean lastRecord, ProcessSession session, StopWatch stopWatch) {
        boolean firstOutputRecord = true;
        // NOTE(review): this counter restarts for every Kinesis record while the writer/FlowFile may
        // span several of them, so "record.count" may not reflect the whole FlowFile - confirm
        // against the semantics of WriteResult#getRecordCount for the writers in use.
        int recordCount = 0;

        // Copy the payload out of the buffer; a missing buffer is treated as an empty payload.
        final ByteBuffer dataBuffer = kinesisRecord.data();
        final byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[0];
        if (dataBuffer != null) {
            dataBuffer.get(data);
        }

        FlowFile flowFile = null;
        try (InputStream in = new ByteArrayInputStream(data);
             RecordReader reader = this.readerFactory.createRecordReader(this.schemaRetrievalVariables, in, data.length, this.getLogger())) {
            Record intermediateRecord;
            final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
            while ((intermediateRecord = recordSet.next()) != null) {
                final Record outputRecord = this.recordConverter.convert(intermediateRecord, kinesisRecord, this.getStreamName(), this.getKinesisShardId());
                if (flowFiles.isEmpty()) {
                    flowFile = session.create();
                    flowFiles.add(flowFile);
                    // The writer is initialised lazily, once the first output record (and its schema) is known.
                    this.createWriter(flowFile, session, outputRecord);
                }
                final WriteResult writeResult = this.writer.write(outputRecord);
                recordCount += writeResult.getRecordCount();
                // Complete the FlowFile only when neither more Kinesis records nor more parsed records follow.
                if (lastRecord && !recordSet.isAnotherRecord()) {
                    this.completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch);
                }
                firstOutputRecord = false;
            }
        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}", new Object[]{e.getLocalizedMessage(), e});
            this.outputRawRecordOnException(firstOutputRecord, flowFile, flowFiles, session, data, kinesisRecord, e);
        }

        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Sequence No: {}, Partition Key: {}, Data: {}", new Object[]{kinesisRecord.sequenceNumber(), kinesisRecord.partitionKey(), BASE_64_ENCODER.encodeToString(data)});
        }
    }

    /**
     * Opens the session output stream for {@code flowFile}, creates a writer on it using the
     * schema derived from {@code outputRecord}, and begins the record set.
     *
     * @throws IOException             if the writer cannot be created or started
     * @throws SchemaNotFoundException if the write schema cannot be determined
     */
    private void createWriter(FlowFile flowFile, ProcessSession session, Record outputRecord) throws IOException, SchemaNotFoundException {
        final RecordSchema readerSchema = outputRecord.getSchema();
        final RecordSchema writeSchema = this.writerFactory.getSchema(this.schemaRetrievalVariables, readerSchema);
        this.outputStream = session.write(flowFile);
        this.writer = this.writerFactory.createWriter(this.getLogger(), writeSchema, this.outputStream, flowFile);
        this.writer.beginRecordSet();
    }

    /**
     * Finishes the record set, closes the writer and its stream, reports provenance and sets the
     * final attributes on the first FlowFile in {@code flowFiles}.
     *
     * <p>If finishing the record set fails, the FlowFile is removed from the session and from
     * {@code flowFiles} and the exception is rethrown. The writer and stream are closed in every case.</p>
     *
     * @throws IOException if the record set cannot be finished
     */
    private void completeFlowFile(List<FlowFile> flowFiles, ProcessSession session, int recordCount, WriteResult writeResult, KinesisClientRecord lastRecord, StopWatch stopWatch) throws IOException {
        // Keep a local handle: the fields are cleared on close, but the MIME type is still read below.
        final RecordSetWriter completedWriter = this.writer;
        IOException finishFailure = null;
        try {
            completedWriter.finishRecordSet();
        } catch (IOException e) {
            this.getLogger().error("Failed to finish record output due to {}", new Object[]{e.getLocalizedMessage(), e});
            finishFailure = e;
        } finally {
            this.closeWriterAndStream();
        }

        if (finishFailure != null) {
            // Discard the FlowFile only after its stream has been closed.
            // NOTE(review): presumably the framework session rejects removing a FlowFile whose
            // OutputStream is still open - confirm against ProcessSession#remove.
            session.remove(flowFiles.get(0));
            flowFiles.remove(0);
            throw finishFailure;
        }

        this.reportProvenance(session, flowFiles.get(0), null, null, stopWatch);

        final Map<String, String> attributes = this.getDefaultAttributes(lastRecord);
        attributes.put("record.count", String.valueOf(recordCount));
        attributes.put(CoreAttributes.MIME_TYPE.key(), completedWriter.getMimeType());
        attributes.putAll(writeResult.getAttributes());
        flowFiles.set(0, session.putAllAttributes(flowFiles.get(0), attributes));
    }

    /**
     * Routes the raw Kinesis payload to the parse-failure relationship after a processing error.
     *
     * <p>If the failure happened before any record of this Kinesis record was written and a
     * FlowFile was created for it, that (empty) FlowFile is discarded and its writer released.</p>
     *
     * @param firstOutputRecord whether no record of the current Kinesis record had been written yet
     * @param flowFile          the FlowFile created while processing the current Kinesis record, or {@code null}
     * @param e                 the failure; its cause (if any) supplies the error message attribute
     */
    private void outputRawRecordOnException(boolean firstOutputRecord, FlowFile flowFile, List<FlowFile> flowFiles, ProcessSession session, byte[] data, KinesisClientRecord kinesisRecord, Exception e) {
        if (firstOutputRecord && flowFile != null) {
            // Release the writer/stream before touching the FlowFile in the session.
            this.closeWriterAndStream();
            // completeFlowFile may already have removed the FlowFile (from both the list and the
            // session) before rethrowing, so only discard it if it is still being tracked.
            if (flowFiles.remove(flowFile)) {
                session.remove(flowFile);
            }
        }

        FlowFile failed = session.create();
        session.write(failed, o -> o.write(data));
        final Map<String, String> attributes = this.getDefaultAttributes(kinesisRecord);
        final Throwable c = e.getCause() != null ? e.getCause() : e;
        attributes.put("record.error.message", c.getLocalizedMessage() != null ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
        failed = session.putAllAttributes(failed, attributes);
        this.transferTo(ConsumeKinesisStream.REL_PARSE_FAILURE, session, 0, 0, Collections.singletonList(failed));
    }

    /**
     * Closes the current writer and output stream (whichever are present) and clears both fields.
     * Close failures are logged as warnings and never propagated; the stream is closed even when
     * closing the writer fails.
     */
    private void closeWriterAndStream() {
        final RecordSetWriter currentWriter = this.writer;
        final OutputStream currentStream = this.outputStream;
        this.writer = null;
        this.outputStream = null;

        if (currentWriter != null) {
            try {
                currentWriter.close();
            } catch (IOException e) {
                this.getLogger().warn("Failed to close Record Writer due to {}", new Object[]{e.getLocalizedMessage(), e});
            }
        }
        if (currentStream != null) {
            try {
                currentStream.close();
            } catch (IOException e) {
                this.getLogger().warn("Failed to close Record Writer due to {}", new Object[]{e.getLocalizedMessage(), e});
            }
        }
    }

    /**
     * Builds the default attribute map from the record's sequence number, partition key and
     * approximate arrival timestamp.
     */
    private Map<String, String> getDefaultAttributes(KinesisClientRecord kinesisRecord) {
        final String partitionKey = kinesisRecord.partitionKey();
        final String sequenceNumber = kinesisRecord.sequenceNumber();
        final Instant approximateArrivalTimestamp = kinesisRecord.approximateArrivalTimestamp();
        return this.getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
    }
}

