/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.kinesis.stream.record;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.property.SchemaDifferenceHandlingStrategy;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.kinesis.stream.pause.RecordProcessorBlocker;
import org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor;
import org.apache.nifi.processors.aws.kinesis.stream.record.StateHandlerStrategy;
import org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class KinesisRecordProcessorRecord
extends AbstractKinesisRecordProcessor {
    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;
    private final Map<String, String> schemaRetrievalVariables;
    private final RecordConverter recordConverter;
    private final StateHandlerStrategy stateHandlerStrategy;

    public KinesisRecordProcessorRecord(ProcessSessionFactory sessionFactory, ComponentLog log, String streamName, String endpointPrefix, String kinesisEndpoint, long checkpointIntervalMillis, long retryWaitMillis, int numRetries, DateTimeFormatter dateTimeFormatter, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, RecordConverter recordConverter, RecordProcessorBlocker recordProcessorBlocker, SchemaDifferenceHandlingStrategy schemaDifferenceHandlingStrategy) {
        super(sessionFactory, log, streamName, endpointPrefix, kinesisEndpoint, checkpointIntervalMillis, retryWaitMillis, numRetries, dateTimeFormatter, recordProcessorBlocker);
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.schemaRetrievalVariables = Collections.singletonMap("kinesis.name", streamName);
        this.recordConverter = recordConverter;
        this.stateHandlerStrategy = new StateHandlerStrategy(schemaDifferenceHandlingStrategy, this::initializeFlowFileState, this::completeFlowFileState);
    }

    @Override
    void startProcessingRecords() {
        FlowFileState flowFileState;
        super.startProcessingRecords();
        while ((flowFileState = this.stateHandlerStrategy.pop()) != null) {
            this.closeSafe(flowFileState.asClosable(), "FlowFile State");
        }
    }

    @Override
    void finishProcessingRecords(AbstractKinesisRecordProcessor.BatchProcessingContext batchProcessingContext) {
        FlowFileState flowFileState;
        super.finishProcessingRecords(batchProcessingContext);
        List<FlowFile> flowFiles = batchProcessingContext.flowFiles();
        while ((flowFileState = this.stateHandlerStrategy.pop()) != null) {
            if (!flowFiles.contains(flowFileState.flowFile)) {
                throw new IllegalStateException("%s is not available in provided FlowFiles [%d]".formatted(flowFileState.flowFile, flowFiles.size()));
            }
            try {
                this.completeFlowFileState(flowFileState, batchProcessingContext);
            }
            catch (FlowFileCompletionException e) {
                this.handleFlowFileCompletionException(e, batchProcessingContext);
            }
        }
    }

    @Override
    void processRecord(KinesisClientRecord kinesisRecord, AbstractKinesisRecordProcessor.BatchProcessingContext batchProcessingContext) {
        byte[] data = KinesisRecordProcessorRecord.getData(kinesisRecord);
        try (ByteArrayInputStream in = new ByteArrayInputStream(data);
             RecordReader reader = this.readerFactory.createRecordReader(this.schemaRetrievalVariables, (InputStream)in, (long)data.length, this.getLogger());){
            Record intermediateRecord;
            PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
            while ((intermediateRecord = recordSet.next()) != null) {
                FlowFileState flowFileState;
                Record outputRecord = this.recordConverter.convert(intermediateRecord, kinesisRecord, this.getStreamName(), this.getKinesisShardId());
                try {
                    flowFileState = this.stateHandlerStrategy.getOrCreate(outputRecord, batchProcessingContext);
                }
                catch (FlowFileCompletionException e) {
                    this.stateHandlerStrategy.drop(e.flowFileState.recordSchema);
                    this.handleFlowFileCompletionException(e, batchProcessingContext);
                    flowFileState = this.stateHandlerStrategy.create(outputRecord, batchProcessingContext);
                }
                flowFileState.write(outputRecord, kinesisRecord);
            }
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}", new Object[]{e.getLocalizedMessage(), e});
            this.outputRawRecordOnException(batchProcessingContext.session(), data, kinesisRecord, (Exception)e);
        }
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug("Sequence No: {}, Partition Key: {}, Data: {}", new Object[]{kinesisRecord.sequenceNumber(), kinesisRecord.partitionKey(), BASE_64_ENCODER.encodeToString(data)});
        }
    }

    private void handleFlowFileCompletionException(FlowFileCompletionException e, AbstractKinesisRecordProcessor.BatchProcessingContext batchProcessingContext) {
        if (!e.flowFileState.containsDataFromExactlyOneKinesisRecord()) {
            throw new AbstractKinesisRecordProcessor.KinesisBatchUnrecoverableException("Not all KinesisClientRecords contained in FlowFile contents can be routed to Failure relationship", e);
        }
        this.dropFlowFileState(e.flowFileState, batchProcessingContext);
        KinesisClientRecord failedKinesisRecord = e.flowFileState.lastSuccessfulWriteInfo.kinesisRecord;
        byte[] failedRecordData = KinesisRecordProcessorRecord.getData(failedKinesisRecord);
        this.outputRawRecordOnException(batchProcessingContext.session(), failedRecordData, failedKinesisRecord, e);
    }

    private static byte[] getData(KinesisClientRecord kinesisRecord) {
        byte[] data;
        ByteBuffer dataBuffer = kinesisRecord.data();
        byte[] byArray = data = dataBuffer != null ? new byte[dataBuffer.remaining()] : new byte[]{};
        if (dataBuffer != null) {
            dataBuffer.get(data);
        }
        return data;
    }

    private FlowFileState initializeFlowFileState(Record record, AbstractKinesisRecordProcessor.BatchProcessingContext batchProcessingContext) throws IOException, SchemaNotFoundException {
        ProcessSession session = batchProcessingContext.session();
        FlowFile flowFile = null;
        OutputStream outputStream = null;
        RecordSetWriter writer = null;
        try {
            flowFile = session.create();
            RecordSchema newReadSchema = record.getSchema();
            RecordSchema writeSchema = this.writerFactory.getSchema(this.schemaRetrievalVariables, newReadSchema);
            outputStream = session.write(flowFile);
            writer = this.writerFactory.createWriter(this.getLogger(), writeSchema, outputStream, flowFile);
            writer.beginRecordSet();
            batchProcessingContext.flowFiles().add(flowFile);
        }
        catch (Exception e) {
            if (flowFile != null) {
                session.remove(flowFile);
            }
            this.closeSafe((Closeable)writer, "Record Writer");
            this.closeSafe(outputStream, "Output Stream");
            throw e;
        }
        return new FlowFileState(flowFile, writer, outputStream, record.getSchema(), this.getLogger());
    }

    private void completeFlowFileState(FlowFileState flowFileState, AbstractKinesisRecordProcessor.BatchProcessingContext batchProcessingContext) throws FlowFileCompletionException {
        ProcessSession session = batchProcessingContext.session();
        List<FlowFile> flowFiles = batchProcessingContext.flowFiles();
        if (flowFileState.isFlowFileEmpty()) {
            this.dropFlowFileState(flowFileState, batchProcessingContext);
            return;
        }
        try {
            flowFileState.writer.finishRecordSet();
            this.closeSafe(flowFileState.asClosable(), "FlowFile State");
            this.reportProvenance(session, flowFileState.flowFile, null, null, batchProcessingContext.stopWatch());
            Map<String, String> attributes = this.getDefaultAttributes(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
            attributes.put("record.count", String.valueOf(flowFileState.lastSuccessfulWriteInfo.writeResult.getRecordCount()));
            attributes.put(CoreAttributes.MIME_TYPE.key(), flowFileState.writer.getMimeType());
            attributes.putAll(flowFileState.lastSuccessfulWriteInfo.writeResult.getAttributes());
            int flowFileIndex = flowFiles.indexOf(flowFileState.flowFile);
            flowFiles.set(flowFileIndex, session.putAllAttributes(flowFileState.flowFile, attributes));
        }
        catch (IOException e) {
            this.dropFlowFileState(flowFileState, batchProcessingContext);
            String message = "Failed to complete a FlowFile containing records from Stream Name: %s, Shard Id: %s, Sequence/Subsequence No range: [%s/%d, %s/%d)".formatted(this.getStreamName(), this.getKinesisShardId(), flowFileState.firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(), flowFileState.firstSuccessfulWriteInfo.kinesisRecord.subSequenceNumber(), flowFileState.lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(), flowFileState.lastSuccessfulWriteInfo.kinesisRecord.subSequenceNumber());
            throw new FlowFileCompletionException(message, e, flowFileState);
        }
    }

    private void dropFlowFileState(FlowFileState flowFileState, AbstractKinesisRecordProcessor.BatchProcessingContext batchProcessingContext) {
        this.closeSafe(flowFileState.asClosable(), "FlowFile State");
        batchProcessingContext.session().remove(flowFileState.flowFile);
        batchProcessingContext.flowFiles().remove(flowFileState.flowFile);
    }

    private void outputRawRecordOnException(ProcessSession session, byte[] data, KinesisClientRecord kinesisRecord, Exception e) {
        FlowFile failed = session.create();
        session.write(failed, o -> o.write(data));
        Map<String, String> attributes = this.getDefaultAttributes(kinesisRecord);
        Throwable c = e.getCause() != null ? e.getCause() : e;
        attributes.put("record.error.message", (String)(c.getLocalizedMessage() != null ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown"));
        failed = session.putAllAttributes(failed, attributes);
        this.transferTo(ConsumeKinesisStream.REL_PARSE_FAILURE, session, 0, 0, Collections.singletonList(failed));
    }

    private Map<String, String> getDefaultAttributes(KinesisClientRecord kinesisRecord) {
        String partitionKey = kinesisRecord.partitionKey();
        String sequenceNumber = kinesisRecord.sequenceNumber();
        Instant approximateArrivalTimestamp = kinesisRecord.approximateArrivalTimestamp();
        return this.getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
    }

    private void closeSafe(Closeable closeable, String closeableName) {
        KinesisRecordProcessorRecord.closeSafe(closeable, closeableName, this.getLogger());
    }

    private static void closeSafe(Closeable closeable, String closeableName, ComponentLog logger) {
        if (closeable != null) {
            try {
                closeable.close();
            }
            catch (IOException e) {
                logger.warn("Failed to close {}", new Object[]{closeableName, e});
            }
        }
    }

    static class FlowFileState {
        private final FlowFile flowFile;
        private final RecordSetWriter writer;
        private final OutputStream outputStream;
        private final RecordSchema recordSchema;
        private final ComponentLog componentLog;
        private SuccessfulWriteInfo firstSuccessfulWriteInfo;
        private SuccessfulWriteInfo lastSuccessfulWriteInfo;

        private FlowFileState(FlowFile flowFile, RecordSetWriter writer, OutputStream outputStream, RecordSchema recordSchema, ComponentLog componentLog) {
            this.flowFile = flowFile;
            this.writer = writer;
            this.outputStream = outputStream;
            this.recordSchema = recordSchema;
            this.componentLog = componentLog;
        }

        private boolean isFlowFileEmpty() {
            return this.lastSuccessfulWriteInfo == null;
        }

        private boolean containsDataFromExactlyOneKinesisRecord() {
            return !this.isFlowFileEmpty() && this.firstSuccessfulWriteInfo.kinesisRecord == this.lastSuccessfulWriteInfo.kinesisRecord;
        }

        private void write(Record outputRecord, KinesisClientRecord kinesisRecord) throws IOException {
            WriteResult writeResult = this.writer.write(outputRecord);
            this.firstSuccessfulWriteInfo = this.firstSuccessfulWriteInfo == null ? new SuccessfulWriteInfo(kinesisRecord, writeResult) : this.firstSuccessfulWriteInfo;
            this.lastSuccessfulWriteInfo = new SuccessfulWriteInfo(kinesisRecord, writeResult);
        }

        private Closeable asClosable() {
            return () -> {
                KinesisRecordProcessorRecord.closeSafe((Closeable)this.writer, "Record Writer", this.componentLog);
                KinesisRecordProcessorRecord.closeSafe(this.outputStream, "Output Stream", this.componentLog);
            };
        }
    }

    static class FlowFileCompletionException
    extends Exception {
        private final FlowFileState flowFileState;

        private FlowFileCompletionException(String message, Throwable cause, FlowFileState flowFileState) {
            super(message, cause);
            this.flowFileState = flowFileState;
        }
    }

    record SuccessfulWriteInfo(KinesisClientRecord kinesisRecord, WriteResult writeResult) {
    }
}

