/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.kinesis.stream.record.converter;

import java.time.Instant;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processors.aws.kinesis.stream.record.converter.RecordConverter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
 * {@link RecordConverter} that wraps the incoming value record in an outer
 * record with two fields: {@code metadata}, holding details of the Kinesis
 * record it came from, and {@code value}, holding the value record itself.
 */
public class RecordConverterWrapper implements RecordConverter {
    // Names of the two top-level fields of the wrapper record.
    private static final String VALUE = "value";
    private static final String METADATA = "metadata";

    // Names of the fields nested inside the metadata record.
    private static final String STREAM = "stream";
    private static final String SHARD_ID = "shardId";
    private static final String SEQUENCE_NUMBER = "sequenceNumber";
    private static final String PARTITION_KEY = "partitionKey";
    private static final String APPROX_ARRIVAL_TIMESTAMP = "approximateArrival";

    private static final RecordField FIELD_STREAM =
            new RecordField(STREAM, RecordFieldType.STRING.getDataType());
    private static final RecordField FIELD_SHARD_ID =
            new RecordField(SHARD_ID, RecordFieldType.STRING.getDataType());
    private static final RecordField FIELD_SEQUENCE_NUMBER =
            new RecordField(SEQUENCE_NUMBER, RecordFieldType.STRING.getDataType());
    private static final RecordField FIELD_PARTITION_KEY =
            new RecordField(PARTITION_KEY, RecordFieldType.STRING.getDataType());
    private static final RecordField FIELD_APPROX_ARRIVAL_TIMESTAMP =
            new RecordField(APPROX_ARRIVAL_TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType());

    /** Schema of the nested metadata record; the fields are listed in this order. */
    private static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(List.of(
            FIELD_STREAM,
            FIELD_SHARD_ID,
            FIELD_SEQUENCE_NUMBER,
            FIELD_PARTITION_KEY,
            FIELD_APPROX_ARRIVAL_TIMESTAMP));

    /** The {@code metadata} field of the wrapper schema, typed as a record of {@link #SCHEMA_METADATA}. */
    public static final RecordField FIELD_METADATA =
            new RecordField(METADATA, RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));

    /**
     * Builds the wrapper record for a single Kinesis record.
     *
     * @param valueRecord   the record to place under the {@code value} field; its schema
     *                      becomes the type of that field
     * @param kinesisRecord source of the sequence number, partition key and approximate
     *                      arrival timestamp
     * @param streamName    value stored in the {@code stream} metadata field
     * @param shardId       value stored in the {@code shardId} metadata field
     * @return a record containing the {@code metadata} and {@code value} fields
     */
    @Override
    public Record convert(Record valueRecord, KinesisClientRecord kinesisRecord, String streamName, String shardId) {
        Record metadataRecord = buildMetadataRecord(kinesisRecord, streamName, shardId);
        RecordSchema wrapperSchema = wrapperSchemaFor(valueRecord.getSchema());
        Map<String, Object> wrapperValues = Map.of(METADATA, metadataRecord, VALUE, valueRecord);
        return new MapRecord(wrapperSchema, wrapperValues);
    }

    /** Collects the Kinesis details into a record that uses {@link #SCHEMA_METADATA}. */
    private Record buildMetadataRecord(KinesisClientRecord kinesisRecord, String streamName, String shardId) {
        Map<String, Object> fieldValues = new LinkedHashMap<>();
        fieldValues.put(STREAM, streamName);
        fieldValues.put(SHARD_ID, shardId);
        fieldValues.put(SEQUENCE_NUMBER, kinesisRecord.sequenceNumber());
        fieldValues.put(PARTITION_KEY, kinesisRecord.partitionKey());

        // The timestamp is stored as epoch milliseconds. When the Kinesis record
        // carries none, the key is still added, mapped to null.
        Instant arrival = kinesisRecord.approximateArrivalTimestamp();
        Long arrivalMillis = null;
        if (arrival != null) {
            arrivalMillis = arrival.toEpochMilli();
        }
        fieldValues.put(APPROX_ARRIVAL_TIMESTAMP, arrivalMillis);

        return new MapRecord(SCHEMA_METADATA, fieldValues);
    }

    /** Creates the two-field wrapper schema whose {@code value} field is a record of {@code readerSchema}. */
    private RecordSchema wrapperSchemaFor(RecordSchema readerSchema) {
        RecordField valueField = new RecordField(VALUE, RecordFieldType.RECORD.getRecordDataType(readerSchema));
        return new SimpleRecordSchema(List.of(FIELD_METADATA, valueField));
    }
}

