/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.avro;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@SupportsBatching
@Tags(value={"avro", "schema", "metadata"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Extracts metadata from the header of an Avro datafile.")
@WritesAttributes(value={@WritesAttribute(attribute="schema.type", description="The type of the schema (i.e. record, enum, etc.)."), @WritesAttribute(attribute="schema.name", description="Contains the name when the type is a record, enum or fixed, otherwise contains the name of the primitive type."), @WritesAttribute(attribute="schema.fingerprint", description="The result of the Fingerprint Algorithm as a Hex string."), @WritesAttribute(attribute="item.count", description="The total number of items in the datafile, only written if Count Items is set to true.")})
public class ExtractAvroMetadata
extends AbstractProcessor {
    static final AllowableValue CRC_64_AVRO = new AllowableValue("CRC-64-AVRO");
    static final AllowableValue MD5 = new AllowableValue("MD5");
    static final AllowableValue SHA_256 = new AllowableValue("SHA-256");
    static final PropertyDescriptor FINGERPRINT_ALGORITHM = new PropertyDescriptor.Builder().name("Fingerprint Algorithm").description("The algorithm used to generate the schema fingerprint. Available choices are based on the Avro recommended practices for fingerprint generation.").allowableValues(new DescribedValue[]{CRC_64_AVRO, MD5, SHA_256}).defaultValue(CRC_64_AVRO.getValue()).required(true).build();
    static final PropertyDescriptor METADATA_KEYS = new PropertyDescriptor.Builder().name("Metadata Keys").description("A comma-separated list of keys indicating key/value pairs to extract from the Avro file header. The key 'avro.schema' can be used to extract the full schema in JSON format, and 'avro.codec' can be used to extract the codec name if one exists.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(false).build();
    static final PropertyDescriptor COUNT_ITEMS = new PropertyDescriptor.Builder().name("Count Items").description("If true the number of items in the datafile will be counted and stored in a FlowFile attribute 'item.count'. The counting is done by reading blocks and getting the number of items for each block, thus avoiding de-serializing. The items being counted will be the top-level items in the datafile. For example, with a schema of type record the items will be the records, and for a schema of type Array the items will be the arrays (not the number of entries in each array).").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after metadata has been extracted.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or metadata cannot be extracted for any reason").build();
    static final String SCHEMA_TYPE_ATTR = "schema.type";
    static final String SCHEMA_NAME_ATTR = "schema.name";
    static final String SCHEMA_FINGERPRINT_ATTR = "schema.fingerprint";
    static final String ITEM_COUNT_ATTR = "item.count";
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(FINGERPRINT_ALGORITHM, METADATA_KEYS, COUNT_ITEMS);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

    protected void init(ProcessorInitializationContext context) {
        super.init(context);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        HashMap avroMetadata = new HashMap();
        HashSet<String> requestedMetadataKeys = new HashSet<String>();
        boolean countRecords = context.getProperty(COUNT_ITEMS).asBoolean();
        String fingerprintAlgorithm = context.getProperty(FINGERPRINT_ALGORITHM).getValue();
        String metadataKeysValue = context.getProperty(METADATA_KEYS).getValue();
        if (!StringUtils.isEmpty((CharSequence)metadataKeysValue)) {
            String[] keys;
            for (String key : keys = metadataKeysValue.split("\\s*,\\s*")) {
                requestedMetadataKeys.add(key.trim());
            }
        }
        try {
            session.read(flowFile, rawIn -> {
                block17: {
                    try (BufferedInputStream in = new BufferedInputStream(rawIn);
                         DataFileStream reader = new DataFileStream((InputStream)in, (DatumReader)new GenericDatumReader());){
                        Schema schema = reader.getSchema();
                        if (schema == null) {
                            throw new ProcessException("Avro schema was null");
                        }
                        for (String key : reader.getMetaKeys()) {
                            if (!requestedMetadataKeys.contains(key)) continue;
                            avroMetadata.put(key, reader.getMetaString(key));
                        }
                        try {
                            byte[] rawFingerprint = SchemaNormalization.parsingFingerprint((String)fingerprintAlgorithm, (Schema)schema);
                            avroMetadata.put(SCHEMA_FINGERPRINT_ATTR, Hex.encodeHexString((byte[])rawFingerprint));
                            avroMetadata.put(SCHEMA_TYPE_ATTR, schema.getType().getName());
                            avroMetadata.put(SCHEMA_NAME_ATTR, schema.getName());
                        }
                        catch (NoSuchAlgorithmException e) {
                            throw new ProcessException((Throwable)e);
                        }
                        if (!countRecords) break block17;
                        long recordCount = reader.getBlockCount();
                        try {
                            while (reader.nextBlock() != null) {
                                recordCount += reader.getBlockCount();
                            }
                        }
                        catch (NoSuchElementException noSuchElementException) {
                            // empty catch block
                        }
                        avroMetadata.put(ITEM_COUNT_ATTR, String.valueOf(recordCount));
                    }
                }
            });
        }
        catch (ProcessException pe) {
            this.getLogger().error("Transferring to failure since failed to extract Avro metadata for {}", new Object[]{flowFile, pe});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        flowFile = session.putAllAttributes(flowFile, avroMetadata);
        session.transfer(flowFile, REL_SUCCESS);
    }
}

