/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.FlowFileStreamUnpackerSequenceFileWriter;
import org.apache.nifi.processors.hadoop.PutHDFS;
import org.apache.nifi.processors.hadoop.SequenceFileWriterImpl;
import org.apache.nifi.processors.hadoop.TarUnpackerSequenceFileWriter;
import org.apache.nifi.processors.hadoop.ZipUnpackerSequenceFileWriter;
import org.apache.nifi.util.StopWatch;

@DeprecationNotice(reason="NIFI-14846: Uses custom file format specific to Apache NiFi and minimal maintenance since initial implementation")
@SideEffectFree
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"hadoop", "sequence file", "create", "sequencefile"})
@CapabilityDescription(value="Creates Hadoop Sequence Files from incoming flow files")
@SeeAlso(value={PutHDFS.class})
public class CreateHadoopSequenceFile
extends AbstractHadoopProcessor {
    public static final String TAR_FORMAT = "tar";
    public static final String ZIP_FORMAT = "zip";
    public static final String FLOWFILE_STREAM_FORMAT_V3 = "flowfile-stream-v3";
    private static final String NOT_PACKAGED = "not packaged";
    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("success").description("Generated Sequence Files are sent to this relationship").build();
    public static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder().name("failure").description("Incoming files that failed to generate a Sequence File are sent to this relationship").build();
    static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder().name("Compression Type").description("Type of compression to use when creating Sequence File").allowableValues((Enum[])SequenceFile.CompressionType.values()).build();
    public static final String DEFAULT_COMPRESSION_TYPE = "NONE";
    private static final Set<Relationship> RELATIONSHIPS = Set.of(RELATIONSHIP_SUCCESS, RELATIONSHIP_FAILURE);
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(CreateHadoopSequenceFile.getCommonPropertyDescriptors().stream(), Stream.of(COMPRESSION_TYPE, COMPRESSION_CODEC)).toList();

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
        String packagingFormat = NOT_PACKAGED;
        if (null != mimeType) {
            if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(mimeType)) {
                packagingFormat = FLOWFILE_STREAM_FORMAT_V3;
            } else {
                switch (mimeType.toLowerCase()) {
                    case "application/tar": {
                        packagingFormat = TAR_FORMAT;
                        break;
                    }
                    case "application/zip": {
                        packagingFormat = ZIP_FORMAT;
                        break;
                    }
                    default: {
                        this.getLogger().warn("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked", new Object[]{flowFile, mimeType});
                    }
                }
            }
        }
        SequenceFileWriterImpl sequenceFileWriter = switch (packagingFormat) {
            case TAR_FORMAT -> new TarUnpackerSequenceFileWriter();
            case ZIP_FORMAT -> new ZipUnpackerSequenceFileWriter();
            case FLOWFILE_STREAM_FORMAT_V3 -> new FlowFileStreamUnpackerSequenceFileWriter();
            default -> new SequenceFileWriterImpl();
        };
        Configuration configuration = this.getConfiguration();
        if (configuration == null) {
            this.getLogger().error("HDFS not configured properly");
            session.transfer(flowFile, RELATIONSHIP_FAILURE);
            context.yield();
            return;
        }
        CompressionCodec codec = this.getCompressionCodec(context, configuration);
        String value = context.getProperty(COMPRESSION_TYPE).getValue();
        SequenceFile.CompressionType compressionType = value == null ? SequenceFile.CompressionType.valueOf((String)DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf((String)value);
        String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
        try {
            StopWatch stopWatch = new StopWatch(true);
            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, configuration, compressionType, codec);
            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, RELATIONSHIP_SUCCESS);
            this.getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
        }
        catch (ProcessException e) {
            this.getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile, e});
            session.transfer(flowFile, RELATIONSHIP_FAILURE);
        }
    }

    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        config.renameProperty("compression type", COMPRESSION_TYPE.getName());
    }
}

