/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.documentation.UseCases;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.bin.Bin;
import org.apache.nifi.processor.util.bin.BinFiles;
import org.apache.nifi.processor.util.bin.BinManager;
import org.apache.nifi.processor.util.bin.BinProcessingResult;
import org.apache.nifi.processor.util.bin.InsertionLocation;
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.processors.standard.SegmentContent;
import org.apache.nifi.processors.standard.merge.AttributeStrategy;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.stream.io.NonCloseableOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;

@SideEffectFree
@TriggerWhenEmpty
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
@CapabilityDescription(value="Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate. NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.")
@ReadsAttributes(value={@ReadsAttribute(attribute="fragment.identifier", description="Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together."), @ReadsAttribute(attribute="fragment.index", description="Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. This attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the \"fragment.identifier\" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the \"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the Bin."), @ReadsAttribute(attribute="fragment.count", description="Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates how many FlowFiles should be expected in the given bundle. At least one FlowFile must have this attribute in the bundle. If multiple FlowFiles contain the \"fragment.count\" attribute in a given bundle, all must have the same value."), @ReadsAttribute(attribute="segment.original.filename", description="Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile."), @ReadsAttribute(attribute="tar.permissions", description="Applicable only if the <Merge Format> property is set to TAR. 
The value of this attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used")})
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="When more than 1 file is merged, the filename comes from the segment.original.filename attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching system time. Then a filename extension may be applied:if Merge Format is TAR, then the filename will be appended with .tar, if Merge Format is ZIP, then the filename will be appended with .zip, if Merge Format is FlowFileStream, then the filename will be appended with .pkg"), @WritesAttribute(attribute="merge.count", description="The number of FlowFiles that were merged into this bundle"), @WritesAttribute(attribute="merge.bin.age", description="The age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"), @WritesAttribute(attribute="merge.uuid", description="UUID of the merged flow file that will be added to the original flow files attributes."), @WritesAttribute(attribute="merge.reason", description="This processor allows for several thresholds to be configured for merging FlowFiles. This attribute indicates which of the Thresholds resulted in the FlowFiles being merged. For an explanation of each of the possible values and their meanings, see the Processor's Usage / documentation and see the 'Additional Details' page.")})
@SeeAlso(value={SegmentContent.class, MergeRecord.class})
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="While content is not stored in memory, the FlowFiles' attributes are. The configuration of MergeContent (maximum bin size, maximum group size, maximum bin age, max number of entries) will influence how much memory is used. If merging together many small FlowFiles, a two-stage approach may be necessary in order to avoid excessive use of memory.")
@UseCases(value={@UseCase(description="Concatenate FlowFiles with textual content together in order to create fewer, larger FlowFiles.", keywords={"concatenate", "bundle", "aggregate", "bin", "merge", "combine", "smash"}, configuration="\"Merge Strategy\" = \"Bin Packing Algorithm\"\n\"Merge Format\" = \"Binary Concatenation\"\n\"Delimiter Strategy\" = \"Text\"\n\"Demarcator\" = \"\\n\" (a newline can be inserted by pressing Shift + Enter)\n\"Minimum Number of Entries\" = \"1\"\n\"Maximum Number of Entries\" = \"500000000\"\n\"Minimum Group Size\" = the minimum amount of data to write to an output FlowFile. A reasonable value might be \"128 MB\"\n\"Maximum Group Size\" = the maximum amount of data to write to an output FlowFile. A reasonable value might be \"256 MB\"\n\"Max Bin Age\" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller than the Max Bin Age. A reasonable value might be \"5 mins\"\n"), @UseCase(description="Concatenate FlowFiles with binary content together in order to create fewer, larger FlowFiles.", notes="Not all binary data can be concatenated together. Whether or not this configuration is valid depends on the type of your data.", keywords={"concatenate", "bundle", "aggregate", "bin", "merge", "combine", "smash"}, configuration="\"Merge Strategy\" = \"Bin Packing Algorithm\"\n\"Merge Format\" = \"Binary Concatenation\"\n\"Delimiter Strategy\" = \"Text\"\n\"Minimum Number of Entries\" = \"1\"\n\"Maximum Number of Entries\" = \"500000000\"\n\"Minimum Group Size\" = the minimum amount of data to write to an output FlowFile. A reasonable value might be \"128 MB\"\n\"Maximum Group Size\" = the maximum amount of data to write to an output FlowFile. A reasonable value might be \"256 MB\"\n\"Max Bin Age\" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller than the Max Bin Age. 
A reasonable value might be \"5 mins\"\n"), @UseCase(description="Reassemble a FlowFile that was previously split apart into smaller FlowFiles by a processor such as SplitText, UnpackContext, SplitRecord, etc.", keywords={"reassemble", "repack", "merge", "recombine"}, configuration="\"Merge Strategy\" = \"Defragment\"\n\"Merge Format\" = the value of Merge Format depends on the desired output format. If the file was previously zipped together and was split apart by UnpackContent,\n    a Merge Format of \"ZIP\" makes sense. If it was previously a .tar file, a Merge Format of \"TAR\" makes sense. If the data is textual, \"Binary Concatenation\" can be\n    used to combine the text into a single document.\n\"Delimiter Strategy\" = \"Text\"\n\"Max Bin Age\" = the maximum amount of time to wait for incoming data before timing out and transferring the fragments to 'failure'. A reasonable value might be \"5 mins\"\n\nFor textual data, \"Demarcator\" should be set to a newline (\\n), set by pressing Shift+Enter in the UI. For binary data, \"Demarcator\" should be left blank.\n")})
public class MergeContent
extends BinFiles {
    // Names of the fragment-related FlowFile attributes; the key strings are supplied by FragmentAttributes.
    public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
    public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
    public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
    // Attribute described in the class annotations as carrying the permissions for a FlowFile's TAR entry.
    public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
    // Attributes that processBin places on the merged bundle (count, bin age, merge reason).
    public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
    public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
    // Attribute that processBin records in the BinProcessingResult with the merged bundle's UUID.
    public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid";
    public static final String REASON_FOR_MERGING = "merge.reason";
    // Required; chooses between the MergeStrategy values and defaults to BIN_PACK.
    public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder().name("Merge Strategy").description("Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles").required(true).allowableValues(MergeStrategy.class).defaultValue((DescribedValue)MergeStrategy.BIN_PACK).build();
    // Required; chooses the output format from the MergeFormat values and defaults to CONCAT.
    public static final PropertyDescriptor MERGE_FORMAT = new PropertyDescriptor.Builder().required(true).name("Merge Format").description("Determines the format that will be used to merge the content.").allowableValues(MergeFormat.class).defaultValue((DescribedValue)MergeFormat.CONCAT).build();
    // Only applicable when Merge Format is AVRO; defaults to DO_NOT_MERGE.
    public static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder().required(true).name("mergecontent-metadata-strategy").displayName("Metadata Strategy").description("For FlowFiles whose input format supports metadata (Avro, e.g.), this property determines which metadata should be added to the bundle. If 'Use First Metadata' is selected, the metadata keys/values from the first FlowFile to be bundled will be used. If 'Keep Only Common Metadata' is selected, only the metadata that exists on all FlowFiles in the bundle, with the same value, will be preserved. If 'Ignore Metadata' is selected, no metadata is transferred to the outgoing bundled FlowFile. If 'Do Not Merge Uncommon Metadata' is selected, any FlowFile whose metadata values do not match those of the first bundled FlowFile will not be merged.").allowableValues(MetadataStrategy.class).defaultValue((DescribedValue)MetadataStrategy.DO_NOT_MERGE).dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.AVRO, new DescribedValue[0]).build();
    // Optional; only applicable when Merge Strategy is BIN_PACK. getGroupId uses the evaluated value as an attribute name.
    public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("Correlation Attribute Name").description("If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR).dependsOn(MERGE_STRATEGY, (DescribedValue)MergeStrategy.BIN_PACK, new DescribedValue[0]).build();
    // Only applicable when Merge Format is CONCAT; defaults to NONE.
    public static final PropertyDescriptor DELIMITER_STRATEGY = new PropertyDescriptor.Builder().required(true).name("Delimiter Strategy").description("Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if the values of the properties should be used as the content.").allowableValues(DelimiterStrategy.class).defaultValue((DescribedValue)DelimiterStrategy.NONE).dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.CONCAT, new DescribedValue[0]).build();
    // Header, Footer and Demarcator share one shape: optional, CONCAT-only, and shown for the FILENAME or TEXT delimiter strategies.
    public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder().name("Header File").displayName("Header").description("Filename or text specifying the header to use. If not specified, no header is supplied.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(DELIMITER_STRATEGY, (DescribedValue)DelimiterStrategy.FILENAME, new DescribedValue[]{DelimiterStrategy.TEXT}).dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.CONCAT, new DescribedValue[0]).identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, new ResourceType[]{ResourceType.TEXT}).build();
    public static final PropertyDescriptor FOOTER = new PropertyDescriptor.Builder().name("Footer File").displayName("Footer").description("Filename or text specifying the footer to use. If not specified, no footer is supplied.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(DELIMITER_STRATEGY, (DescribedValue)DelimiterStrategy.FILENAME, new DescribedValue[]{DelimiterStrategy.TEXT}).dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.CONCAT, new DescribedValue[0]).identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, new ResourceType[]{ResourceType.TEXT}).build();
    public static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder().name("Demarcator File").displayName("Demarcator").description("Filename or text specifying the demarcator to use. If not specified, no demarcator is supplied.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(DELIMITER_STRATEGY, (DescribedValue)DelimiterStrategy.FILENAME, new DescribedValue[]{DelimiterStrategy.TEXT}).dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.CONCAT, new DescribedValue[0]).identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, new ResourceType[]{ResourceType.TEXT}).build();
    // Only applicable when Merge Format is ZIP; one of "0".."9", default "1". processBin passes it to ZipMerge.
    public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder().name("Compression Level").description("Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is ignored").required(true).allowableValues(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}).defaultValue("1").dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.ZIP, new DescribedValue[0]).build();
    // Only applicable when Merge Format is TAR or ZIP; "true"/"false", default "false".
    public static final PropertyDescriptor KEEP_PATH = new PropertyDescriptor.Builder().name("Keep Path").description("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.TAR, new DescribedValue[]{MergeFormat.ZIP}).build();
    // Only applicable when Merge Format is TAR; optional expression defaulting to ${file.lastModifiedTime}.
    public static final PropertyDescriptor TAR_MODIFIED_TIME = new PropertyDescriptor.Builder().name("Tar Modified Time").description("If using the Tar Merge Format, specifies if the Tar entry should store the modified timestamp either by expression (e.g. ${file.lastModifiedTime} or static value, both of which must match the ISO8601 format 'yyyy-MM-dd'T'HH:mm:ssZ'.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("${file.lastModifiedTime}").dependsOn(MERGE_FORMAT, (DescribedValue)MergeFormat.TAR, new DescribedValue[0]).build();
    // Optional boolean expression, only applicable when Merge Strategy is BIN_PACK; setUpBinManager turns it into the bin-termination predicate.
    public static final PropertyDescriptor BIN_TERMINATION_CHECK = new PropertyDescriptor.Builder().name("Bin Termination Check").description("Specifies an Expression Language Expression that is to be evaluated against each FlowFile. If the result of the expression is 'true', the\nbin that the FlowFile corresponds to will be terminated, even if the bin has not met the minimum number of entries or minimum size.\nNote that if the FlowFile that triggers the termination of the bin is itself larger than the Maximum Bin Size, it will be placed into its\nown bin without triggering the termination of any other bin. When using this property, it is recommended to use Prioritizers in the flow's\nconnections to ensure that the ordering is as desired.\n").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.BOOLEAN, (boolean)false)).dependsOn(MERGE_STRATEGY, (DescribedValue)MergeStrategy.BIN_PACK, new DescribedValue[0]).build();
    // Depends on Bin Termination Check (no specific values listed); defaults to LAST_IN_BIN.
    public static final PropertyDescriptor FLOWFILE_INSERTION_STRATEGY = new PropertyDescriptor.Builder().name("FlowFile Insertion Strategy").description("If a given FlowFile terminates the bin based on the <Bin Termination Check> property, specifies where the FlowFile should be included in the bin.").required(true).dependsOn(BIN_TERMINATION_CHECK, new AllowableValue[0]).defaultValue((DescribedValue)InsertionLocation.LAST_IN_BIN).allowableValues(InsertionLocation.class).build();
    // Full, ordered property list returned by getSupportedPropertyDescriptors. MIN_ENTRIES, MAX_ENTRIES, MIN_SIZE, MAX_SIZE,
    // MAX_BIN_AGE and MAX_BIN_COUNT are not declared in this section - presumably inherited from BinFiles (confirm).
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(MERGE_STRATEGY, MERGE_FORMAT, AttributeStrategyUtil.ATTRIBUTE_STRATEGY, CORRELATION_ATTRIBUTE_NAME, METADATA_STRATEGY, MergeContent.addBinPackingDependency(MIN_ENTRIES), MergeContent.addBinPackingDependency(MAX_ENTRIES), MergeContent.addBinPackingDependency(MIN_SIZE), MergeContent.addBinPackingDependency(MAX_SIZE), BIN_TERMINATION_CHECK, FLOWFILE_INSERTION_STRATEGY, MAX_BIN_AGE, MAX_BIN_COUNT, DELIMITER_STRATEGY, HEADER, FOOTER, DEMARCATOR, COMPRESSION_LEVEL, KEEP_PATH, TAR_MODIFIED_TIME);
    // Relationship that receives the merged bundle produced by processBin.
    public static final Relationship REL_MERGED = new Relationship.Builder().name("merged").description("The FlowFile containing the merged content").build();
    // Relationships exposed by getRelationships; REL_ORIGINAL and REL_FAILURE are not declared in this section
    // (presumably inherited from BinFiles - confirm).
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_ORIGINAL, REL_FAILURE, REL_MERGED);
    // Matches strings consisting solely of one or more digits; used by isNumber.
    public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");

    /**
     * Derives a descriptor from {@code original} that additionally depends on
     * {@link #MERGE_STRATEGY} being set to {@code MergeStrategy.BIN_PACK}.
     *
     * @param original the descriptor to copy
     * @return a newly built descriptor carrying the extra dependency
     */
    private static PropertyDescriptor addBinPackingDependency(PropertyDescriptor original) {
        PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder().fromPropertyDescriptor(original);
        builder = builder.dependsOn(MERGE_STRATEGY, MergeStrategy.BIN_PACK);
        return builder.build();
    }

    /**
     * Returns the fixed set of relationships held in {@code RELATIONSHIPS}.
     *
     * @return the shared {@code RELATIONSHIPS} set
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Returns the ordered list of properties held in {@code PROPERTY_DESCRIPTORS}.
     *
     * @return the shared {@code PROPERTY_DESCRIPTORS} list
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Validates the Header, Footer and Demarcator properties when they are expected to
     * name files, i.e. when the Merge Format is {@code CONCAT} and the Delimiter Strategy
     * is {@code FILENAME}. Each property that has a value is checked with
     * {@code StandardValidators.FILE_EXISTS_VALIDATOR}; unset properties are skipped.
     *
     * @param context the validation context supplying the configured property values
     * @return the validation results, in Header, Footer, Demarcator order; empty when
     *         the check does not apply or none of the three properties is set
     */
    protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>();

        // The delimiter strategy is consulted only after the format check passes.
        if (context.getProperty(MERGE_FORMAT).asAllowableValue(MergeFormat.class) != MergeFormat.CONCAT) {
            return problems;
        }
        if (context.getProperty(DELIMITER_STRATEGY).asAllowableValue(DelimiterStrategy.class) != DelimiterStrategy.FILENAME) {
            return problems;
        }

        for (PropertyDescriptor delimiterProperty : List.of(HEADER, FOOTER, DEMARCATOR)) {
            String configuredFile = context.getProperty(delimiterProperty).getValue();
            if (configuredFile == null) {
                continue;
            }
            problems.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(delimiterProperty.getName(), configuredFile, context));
        }
        return problems;
    }

    /**
     * Reads the entire file at the given path into memory.
     *
     * @param filename path of the file to read
     * @return the complete contents of the file
     * @throws IOException if the file cannot be read
     */
    private byte[] readContent(String filename) throws IOException {
        Path source = Paths.get(filename);
        return Files.readAllBytes(source);
    }

    /**
     * Pre-processing hook; this implementation performs no work and hands the
     * FlowFile back unchanged.
     *
     * @param context unused
     * @param session unused
     * @param flowFile the incoming FlowFile
     * @return the same {@code flowFile} instance
     */
    protected FlowFile preprocessFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile) {
        return flowFile;
    }

    /**
     * Determines the identifier of the group that the given FlowFile should be binned with.
     * <ul>
     *   <li>{@code BIN_PACK}: the Correlation Attribute Name property is evaluated against the
     *       FlowFile; the result names the attribute whose value is the group id. If the
     *       property evaluates to {@code null}, {@code null} is returned.</li>
     *   <li>{@code DEFRAGMENT}: the value of the fragment identifier attribute is the group id.</li>
     * </ul>
     *
     * @param context the process context supplying the configured properties
     * @param flowFile the FlowFile being binned
     * @param session unused
     * @return the group id, or {@code null} when no correlation attribute is configured or
     *         the FlowFile lacks the relevant attribute
     */
    protected String getGroupId(ProcessContext context, FlowFile flowFile, ProcessSession session) {
        // Switch on the enum constants themselves rather than their ordinals so that the
        // mapping cannot silently change if the enum's declaration order ever does.
        MergeStrategy strategy = context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class);
        return switch (strategy) {
            case BIN_PACK -> {
                String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).evaluateAttributeExpressions(flowFile).getValue();
                yield correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
            }
            case DEFRAGMENT -> flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
        };
    }

    /**
     * Configures the bin manager according to the selected Merge Strategy.
     * <ul>
     *   <li>{@code BIN_PACK}: clears the file-count attribute and, when the Bin Termination Check
     *       property is set, installs a termination predicate that evaluates that expression
     *       against each FlowFile, together with the configured insertion location.</li>
     *   <li>{@code DEFRAGMENT}: uses the fragment count attribute as the file-count attribute.</li>
     * </ul>
     *
     * @param binManager the bin manager to configure
     * @param context the process context supplying the configured properties
     */
    protected void setUpBinManager(BinManager binManager, ProcessContext context) {
        // Switch on the enum constants themselves rather than their ordinals so that the
        // mapping cannot silently change if the enum's declaration order ever does.
        MergeStrategy strategy = context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class);
        switch (strategy) {
            case BIN_PACK -> {
                binManager.setFileCountAttribute(null);
                PropertyValue terminationCheck = context.getProperty(BIN_TERMINATION_CHECK);
                if (terminationCheck.isSet()) {
                    InsertionLocation insertionLocation = context.getProperty(FLOWFILE_INSERTION_STRATEGY).asAllowableValue(InsertionLocation.class);
                    Predicate<FlowFile> predicate = flowFile -> terminationCheck.evaluateAttributeExpressions(flowFile).asBoolean();
                    binManager.setBinTermination(predicate, insertionLocation);
                }
            }
            case DEFRAGMENT -> binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
        }
    }

    /**
     * Merges the contents of a bin into a single bundle FlowFile and routes it to
     * {@code REL_MERGED}.
     * <p>
     * When the Merge Strategy is {@code DEFRAGMENT}, the bin is first validated with
     * {@code getDefragmentValidationError}; on a validation error all FlowFiles in the bin are
     * transferred to {@code REL_FAILURE}, the bin's session is committed asynchronously and the
     * method returns early. Otherwise the contents are sorted with {@code FragmentComparator}
     * before merging.
     *
     * @param bin the bin whose contents are to be merged
     * @param context the process context supplying the configured properties
     * @return the processing result; it was constructed with {@code true} on the early-failure
     *         path and has {@code setCommitted(false)} applied on the normal path
     * @throws ProcessException declared by the signature; not thrown directly by this method
     */
    protected BinProcessingResult processBin(Bin bin, ProcessContext context) throws ProcessException {
        BinProcessingResult binProcessingResult = new BinProcessingResult(true);
        // NOTE(review): switching on ordinal() looks like a decompiler artifact of an enum switch;
        // the case numbers depend on MergeFormat's declaration order, which is not visible here.
        MergeBin merger = switch (((MergeFormat)context.getProperty(MERGE_FORMAT).asAllowableValue(MergeFormat.class)).ordinal()) {
            default -> throw new MatchException(null, null);
            case 0 -> new TarMerge();
            case 1 -> new ZipMerge(context.getProperty(COMPRESSION_LEVEL).asInteger());
            case 2 -> new FlowFileStreamMerger((FlowFilePackager)new FlowFilePackagerV3(), StandardFlowFileMediaType.VERSION_3.getMediaType());
            case 3 -> new FlowFileStreamMerger((FlowFilePackager)new FlowFilePackagerV2(), StandardFlowFileMediaType.VERSION_2.getMediaType());
            case 4 -> new FlowFileStreamMerger((FlowFilePackager)new FlowFilePackagerV1(), StandardFlowFileMediaType.VERSION_1.getMediaType());
            case 5 -> new BinaryConcatenationMerge();
            case 6 -> new AvroMerge();
        };
        AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
        List contents = bin.getContents();
        ProcessSession binSession = bin.getSession();
        if (context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class) == MergeStrategy.DEFRAGMENT) {
            String error = this.getDefragmentValidationError(bin.getContents());
            if (error != null) {
                // Small bins are logged in full; larger ones are summarised by their size.
                Object binDescription = contents.size() <= 10 ? contents.toString() : contents.size() + " FlowFiles";
                this.getLogger().error("{}; routing {} to failure", new Object[]{error, binDescription});
                binSession.transfer((Collection)contents, REL_FAILURE);
                binSession.commitAsync();
                return binProcessingResult;
            }
            contents.sort(new FragmentComparator());
        }
        FlowFile bundle = merger.merge(bin, context);
        // Capture the filename from the merged bundle before computing the merged attributes,
        // then put it back below so the attribute strategy's result does not replace it.
        String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
        Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(contents);
        bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), merger.getMergedContentType());
        bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
        bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(contents.size()));
        bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(bin.getBinAge()));
        bundleAttributes.put(REASON_FOR_MERGING, bin.getEvictionReason().name());
        bundle = binSession.putAllAttributes(bundle, bundleAttributes);
        // NOTE(review): this threshold is "< 10" whereas the failure path above uses "<= 10".
        Object inputDescription = contents.size() < 10 ? contents.toString() : contents.size() + " FlowFiles";
        this.getLogger().info("Merged {} into {}. Reason for merging: {}", new Object[]{inputDescription, bundle, bin.getEvictionReason()});
        binSession.transfer(bundle, REL_MERGED);
        binProcessingResult.getAttributes().put(MERGE_UUID_ATTRIBUTE, bundle.getAttribute(CoreAttributes.UUID.key()));
        // FlowFiles the merger reports as unmerged are cloned, and the clones are sent to failure.
        for (FlowFile unmerged : merger.getUnmergedFlowFiles()) {
            FlowFile unmergedCopy = binSession.clone(unmerged);
            binSession.transfer(unmergedCopy, REL_FAILURE);
        }
        // No commit is performed on this path; presumably the caller commits the session (confirm in BinFiles).
        binProcessingResult.setCommitted(false);
        return binProcessingResult;
    }

    /**
     * Returns the minimum number of entries a bin must hold.
     * For {@code BIN_PACK} the superclass value is used; for {@code DEFRAGMENT} the value is
     * {@link Integer#MAX_VALUE} (presumably so that the entry count alone never satisfies the
     * minimum - confirm against BinFiles).
     *
     * @param context the property context supplying the configured Merge Strategy
     * @return the minimum entry count for the selected strategy
     */
    protected int getMinEntries(PropertyContext context) {
        // Switch on the enum constants rather than ordinals so the mapping survives reordering.
        return switch (context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class)) {
            case BIN_PACK -> super.getMinEntries(context);
            case DEFRAGMENT -> Integer.MAX_VALUE;
        };
    }

    /**
     * Returns the maximum number of entries a bin may hold.
     * For {@code BIN_PACK} the superclass value is used; for {@code DEFRAGMENT} the value is
     * {@link Integer#MAX_VALUE}.
     *
     * @param context the property context supplying the configured Merge Strategy
     * @return the maximum entry count for the selected strategy
     */
    protected int getMaxEntries(PropertyContext context) {
        // Switch on the enum constants rather than ordinals so the mapping survives reordering.
        return switch (context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class)) {
            case BIN_PACK -> super.getMaxEntries(context);
            case DEFRAGMENT -> Integer.MAX_VALUE;
        };
    }

    /**
     * Returns the minimum number of bytes a bin must hold.
     * For {@code BIN_PACK} the superclass value is used; for {@code DEFRAGMENT} the value is
     * {@code 0}.
     *
     * @param context the property context supplying the configured Merge Strategy
     * @return the minimum byte count for the selected strategy
     */
    protected long getMinBytes(PropertyContext context) {
        // Switch on the enum constants rather than ordinals so the mapping survives reordering.
        return switch (context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class)) {
            case BIN_PACK -> super.getMinBytes(context);
            case DEFRAGMENT -> 0L;
        };
    }

    /**
     * Returns the maximum number of bytes a bin may hold.
     * For {@code BIN_PACK} the superclass value is used; for {@code DEFRAGMENT} the value is
     * {@link Long#MAX_VALUE}.
     *
     * @param context the property context supplying the configured Merge Strategy
     * @return the maximum byte count for the selected strategy
     */
    protected long getMaxBytes(PropertyContext context) {
        // Switch on the enum constants rather than ordinals so the mapping survives reordering.
        return switch (context.getProperty(MERGE_STRATEGY).asAllowableValue(MergeStrategy.class)) {
            case BIN_PACK -> super.getMaxBytes(context);
            case DEFRAGMENT -> Long.MAX_VALUE;
        };
    }

    /**
     * Checks whether the FlowFiles in a bin can be defragmented.
     * <p>
     * Every FlowFile must carry an all-digit fragment index. The fragment count attribute is
     * optional per FlowFile, but where present it must be all digits and identical across the
     * bin, and at least one FlowFile must supply it. Finally, the number of FlowFiles in the
     * bin must equal that count.
     *
     * @param binContents the FlowFiles collected in the bin
     * @return a description of the first problem found, or {@code null} if the bin is empty
     *         or passes every check
     */
    private String getDefragmentValidationError(List<FlowFile> binContents) {
        if (binContents.isEmpty()) {
            return null;
        }

        String agreedCount = null;
        String fragmentId = null;
        for (FlowFile candidate : binContents) {
            if (!this.isNumber(candidate.getAttribute(FRAGMENT_INDEX_ATTRIBUTE))) {
                return "Cannot Defragment " + candidate + " because it does not have an integer value for the " + FRAGMENT_INDEX_ATTRIBUTE + " attribute";
            }
            // Remember the identifier of the most recently inspected FlowFile for the group-level messages.
            fragmentId = candidate.getAttribute(FRAGMENT_ID_ATTRIBUTE);

            String declaredCount = candidate.getAttribute(FRAGMENT_COUNT_ATTRIBUTE);
            if (declaredCount != null) {
                if (!this.isNumber(declaredCount)) {
                    return "Cannot Defragment " + candidate + " because it does not have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
                }
                if (agreedCount == null) {
                    agreedCount = declaredCount;
                } else if (!agreedCount.equals(declaredCount)) {
                    return "Cannot Defragment " + candidate + " because it is grouped with another FlowFile, and the two have differing values for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + agreedCount + " and " + declaredCount;
                }
            }
        }

        String groupPrefix = "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentId;
        if (agreedCount == null) {
            return groupPrefix + " because no FlowFile arrived with the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute and the expected number of fragments is unknown";
        }

        int expectedFragments;
        try {
            expectedFragments = Integer.parseInt(agreedCount);
        }
        catch (NumberFormatException nfe) {
            // An all-digit value can still be too large for an int.
            return groupPrefix + " because the " + FRAGMENT_COUNT_ATTRIBUTE + " has a non-integer value of " + agreedCount;
        }

        int actualFragments = binContents.size();
        String countPrefix = groupPrefix + " because the expected number of fragments is " + agreedCount;
        if (actualFragments < expectedFragments) {
            return countPrefix + " but found only " + actualFragments + " fragments";
        }
        if (actualFragments > expectedFragments) {
            return countPrefix + " but found " + actualFragments + " fragments for this identifier";
        }
        return null;
    }

    /**
     * Tells whether the given text matches {@code NUMBER_PATTERN} in full.
     *
     * @param value the text to test; may be {@code null}
     * @return {@code false} for {@code null}, otherwise whether the whole value matches the pattern
     */
    private boolean isNumber(String value) {
        return value != null && NUMBER_PATTERN.matcher(value).matches();
    }

    /**
     * Removes the given FlowFile from the session, logging instead of propagating any failure.
     *
     * @param session  the session that owns the FlowFile
     * @param flowFile the FlowFile to remove
     * @param context  the process context, consulted for the Merge Format only when removal fails
     */
    private void removeFlowFileFromSession(ProcessSession session, FlowFile flowFile, ProcessContext context) {
        try {
            session.remove(flowFile);
        } catch (Exception removalFailure) {
            // The format is looked up only on the failure path, purely to enrich the log message.
            final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
            final String message = "Failed to remove merged FlowFile from the session after merge failure during \"" + mergeFormat + "\" merge.";
            this.getLogger().error(message, (Throwable) removalFailure);
        }
    }

    /**
     * Derives the directory prefix for an archive entry from the FlowFile's path attribute.
     *
     * <p>A single leading {@code .} element is stripped. A non-empty result always ends with
     * {@code /}.</p>
     *
     * @param flowFile the FlowFile whose path attribute is read
     * @return the directory prefix, or an empty string when the path has no name elements or
     *         consists solely of {@code .}
     */
    private String getPath(FlowFile flowFile) {
        Path directory = Paths.get(flowFile.getAttribute(CoreAttributes.PATH.key()));
        final int elementCount = directory.getNameCount();
        if (elementCount == 0) {
            return "";
        }

        final boolean startsWithCurrentDir = ".".equals(directory.getName(0).toString());
        if (startsWithCurrentDir) {
            if (elementCount == 1) {
                // The path was just "." — nothing remains once it is stripped.
                return "";
            }
            directory = directory.subpath(1, elementCount);
        }
        return directory + "/";
    }

    /**
     * Picks the base filename for a merged bundle.
     *
     * @param flowFiles the FlowFiles being merged; the first element is the one consulted
     * @return the filename attribute of the only FlowFile when there is exactly one; otherwise
     *         the first FlowFile's segment-original-filename attribute when present, falling
     *         back to the current {@link System#nanoTime()} value as text
     */
    private String createFilename(List<FlowFile> flowFiles) {
        final boolean singleFlowFile = flowFiles.size() == 1;
        final FlowFile leading = flowFiles.getFirst();
        if (singleFlowFile) {
            return leading.getAttribute(CoreAttributes.FILENAME.key());
        }

        final String segmentOriginalName = leading.getAttribute(SEGMENT_ORIGINAL_FILENAME);
        return segmentOriginalName != null ? segmentOriginalName : String.valueOf(System.nanoTime());
    }

    public static enum MergeStrategy implements DescribedValue
    {
        BIN_PACK("Bin-Packing Algorithm", "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally their attributes (if the <Correlation Attribute> property is set)"),
        DEFRAGMENT("Defragment", "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index>. All FlowFiles with the same value for \"fragment.identifier\" will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute.");

        private final String value;
        private final String description;

        private MergeStrategy(String value, String description) {
            this.value = value;
            this.description = description;
        }

        public String getValue() {
            return this.value;
        }

        public String getDisplayName() {
            return this.value;
        }

        public String getDescription() {
            return this.description;
        }
    }

    public static enum MergeFormat implements DescribedValue
    {
        TAR("TAR", "A bin of FlowFiles will be combined into a single TAR file. The FlowFiles' <path> attribute will be used to create a directory in the TAR file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the TAR file. If a FlowFile has an attribute named <tar.permissions> that is 3 characters, each between 0-7, that attribute will be used as the TAR entry's 'mode'."),
        ZIP("ZIP", "A bin of FlowFiles will be combined into a single ZIP file. The FlowFiles' <path> attribute will be used to create a directory in the ZIP file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the ZIP file. The <Compression Level> property indicates the ZIP compression to use."),
        FLOWFILE_STREAM_V3("FlowFile Stream, v3", "A bin of FlowFiles will be combined into a single Version 3 FlowFile Stream"),
        FLOWFILE_STREAM_V2("FlowFile Stream, v2", "A bin of FlowFiles will be combined into a single Version 2 FlowFile Stream"),
        FLOWFILE_TAR_V1("FlowFile Tar, v1", "A bin of FlowFiles will be combined into a single Version 1 FlowFile Package"),
        CONCAT("Binary Concatenation", "The contents of all FlowFiles will be concatenated together into a single FlowFile"),
        AVRO("Avro", "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile");

        private final String value;
        private final String description;

        private MergeFormat(String value, String description) {
            this.value = value;
            this.description = description;
        }

        public String getValue() {
            return this.value;
        }

        public String getDisplayName() {
            return this.value;
        }

        public String getDescription() {
            return this.description;
        }
    }

    public static enum DelimiterStrategy implements DescribedValue
    {
        FILENAME("Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file"),
        TEXT("Text", "The values of Header, Footer, and Demarcator will be specified as property values"),
        NONE("Do Not Use Delimiters", "No Header, Footer, or Demarcator will be used");

        private final String value;
        private final String description;

        private DelimiterStrategy(String value, String description) {
            this.value = value;
            this.description = description;
        }

        public String getValue() {
            return this.value;
        }

        public String getDisplayName() {
            return this.value;
        }

        public String getDescription() {
            return this.description;
        }
    }

    /**
     * Merges the FlowFiles of a bin into a single TAR archive, one entry per FlowFile.
     */
    private class TarMerge
    implements MergeBin {
        /**
         * Entry-size threshold (octal 77777777777) at or above which big-number headers are
         * enabled. This is the same value as Commons Compress {@code TarConstants.MAXSIZE}.
         */
        private static final long MAX_STANDARD_ENTRY_SIZE = 0x1FFFFFFFFL;

        private TarMerge() {
        }

        /**
         * Writes every FlowFile of the bin as an entry of a new TAR bundle.
         *
         * <p>Entry names are the FlowFile filename, prefixed with the FlowFile path when the
         * Keep Path property is true. The entry mode and modification time are applied when
         * the corresponding attribute / property value can be parsed; unparseable values are
         * logged at debug level and ignored.</p>
         *
         * @param bin     the bin whose contents are archived
         * @param context the process context supplying the property values
         * @return the bundle FlowFile containing the archive
         */
        @Override
        public FlowFile merge(Bin bin, ProcessContext context) {
            final List<FlowFile> contents = bin.getContents();
            final ProcessSession session = bin.getSession();
            final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
            // NOTE(review): unlike the other mergers this bundle is created without parents;
            // lineage is recorded only through the provenance join below — confirm this is intended.
            FlowFile bundle = session.create();
            try {
                bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), MergeContent.this.createFilename(contents) + ".tar");
                bundle = session.write(bundle, rawOut -> {
                    try (BufferedOutputStream bufferedOut = new BufferedOutputStream(rawOut);
                         TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
                        out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
                        if (this.getMaxEntrySize(contents) >= MAX_STANDARD_ENTRY_SIZE) {
                            out.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
                        }
                        for (final FlowFile flowFile : contents) {
                            final String path = keepPath ? MergeContent.this.getPath(flowFile) : "";
                            final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
                            final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName);
                            tarEntry.setSize(flowFile.getSize());

                            final String permissionsVal = flowFile.getAttribute(MergeContent.TAR_PERMISSIONS_ATTRIBUTE);
                            if (permissionsVal != null) {
                                try {
                                    // The attribute is documented as digits 0-7, i.e. an octal mode.
                                    // Parsing it as decimal would turn "644" into mode 01204.
                                    tarEntry.setMode(Integer.parseInt(permissionsVal, 8));
                                }
                                catch (Exception e) {
                                    MergeContent.this.getLogger().debug("Attribute {} of {} is set to {}; expected 3 digits between 0-7, so ignoring", MergeContent.TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal);
                                }
                            }

                            final String modTime = context.getProperty(TAR_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue();
                            if (StringUtils.isNotBlank(modTime)) {
                                try {
                                    tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
                                }
                                catch (Exception e) {
                                    MergeContent.this.getLogger().debug("Attribute {} of {} is set to {}; expected ISO8601 format, so ignoring", TAR_MODIFIED_TIME, flowFile, modTime);
                                }
                            }

                            out.putArchiveEntry(tarEntry);
                            bin.getSession().exportTo(flowFile, out);
                            out.closeArchiveEntry();
                        }
                    }
                });
            }
            catch (Exception e) {
                // Drop the partially written bundle before letting the failure propagate.
                MergeContent.this.removeFlowFileFromSession(session, bundle, context);
                throw e;
            }
            bin.getSession().getProvenanceReporter().join(contents, bundle);
            return bundle;
        }

        /**
         * Finds the size of the largest FlowFile in the bin.
         *
         * @param contents the FlowFiles to inspect
         * @return the largest size, or {@code 0} when the list is empty
         */
        private long getMaxEntrySize(List<FlowFile> contents) {
            // A sequential scan is sufficient; reading a size per element gains nothing from parallelism.
            return contents.stream().mapToLong(FlowFile::getSize).max().orElse(0L);
        }

        @Override
        public String getMergedContentType() {
            return "application/tar";
        }

        /** @return always an empty list; this merger never reports unmerged FlowFiles */
        @Override
        public List<FlowFile> getUnmergedFlowFiles() {
            return Collections.emptyList();
        }
    }

    private class ZipMerge
    implements MergeBin {
        private final int compressionLevel;
        private final List<FlowFile> unmerged = new ArrayList<FlowFile>();

        public ZipMerge(int compressionLevel) {
            this.compressionLevel = compressionLevel;
        }

        @Override
        public FlowFile merge(Bin bin, ProcessContext context) {
            boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
            ProcessSession session = bin.getSession();
            List contents = bin.getContents();
            this.unmerged.addAll(contents);
            FlowFile bundle = session.create((Collection)contents);
            try {
                bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), MergeContent.this.createFilename(contents) + ".zip");
                bundle = session.write(bundle, rawOut -> {
                    try (BufferedOutputStream bufferedOut = new BufferedOutputStream(rawOut);
                         ZipOutputStream out = new ZipOutputStream(bufferedOut);){
                        out.setLevel(this.compressionLevel);
                        for (FlowFile flowFile : contents) {
                            String path = keepPath ? MergeContent.this.getPath(flowFile) : "";
                            String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
                            ZipEntry zipEntry = new ZipEntry(entryName);
                            zipEntry.setSize(flowFile.getSize());
                            try {
                                out.putNextEntry(zipEntry);
                                bin.getSession().exportTo(flowFile, (OutputStream)out);
                                out.closeEntry();
                                this.unmerged.remove(flowFile);
                            }
                            catch (ZipException e) {
                                MergeContent.this.getLogger().error("Encountered exception merging {}", new Object[]{flowFile, e});
                            }
                        }
                        out.finish();
                        out.flush();
                    }
                });
            }
            catch (Exception e) {
                MergeContent.this.removeFlowFileFromSession(session, bundle, context);
                throw e;
            }
            session.getProvenanceReporter().join((Collection)contents, bundle);
            return bundle;
        }

        @Override
        public String getMergedContentType() {
            return "application/zip";
        }

        @Override
        public List<FlowFile> getUnmergedFlowFiles() {
            return this.unmerged;
        }
    }

    /**
     * Merges the FlowFiles of a bin by handing each one, with its attributes, to a
     * {@code FlowFilePackager} that writes into one shared output stream.
     */
    private class FlowFileStreamMerger
    implements MergeBin {
        private final FlowFilePackager packager;
        private final String mimeType;

        /**
         * @param packager the packager that serializes each FlowFile
         * @param mimeType the content type reported for the merged bundle
         */
        public FlowFileStreamMerger(FlowFilePackager packager, String mimeType) {
            this.packager = packager;
            this.mimeType = mimeType;
        }

        /**
         * Packages every FlowFile of the bin into a new bundle and names it with a
         * {@code .pkg} suffix.
         *
         * @param bin     the bin whose contents are packaged
         * @param context the process context, used only when cleaning up after a failure
         * @return the bundle FlowFile containing the packaged stream
         */
        @Override
        public FlowFile merge(Bin bin, ProcessContext context) {
            final ProcessSession session = bin.getSession();
            final List<FlowFile> contents = bin.getContents();
            FlowFile bundle = session.create(contents);
            try {
                bundle = session.write(bundle, rawOut -> {
                    try (BufferedOutputStream bufferedOut = new BufferedOutputStream(rawOut)) {
                        // The same stream is reused for every FlowFile; the wrapper presumably
                        // keeps the packager from closing it between FlowFiles — TODO confirm.
                        final OutputStream out = new NonCloseableOutputStream(bufferedOut);
                        for (final FlowFile flowFile : contents) {
                            bin.getSession().read(flowFile, rawIn -> {
                                try (InputStream in = new BufferedInputStream(rawIn)) {
                                    // Copy so the packager never sees the FlowFile's own attribute map.
                                    final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
                                    this.packager.packageFlowFile(in, out, attributes, flowFile.getSize());
                                }
                            });
                        }
                    }
                });
            }
            catch (Exception e) {
                // Drop the partially written bundle before letting the failure propagate.
                MergeContent.this.removeFlowFileFromSession(session, bundle, context);
                throw e;
            }
            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), MergeContent.this.createFilename(contents) + ".pkg");
            session.getProvenanceReporter().join(contents, bundle);
            return bundle;
        }

        @Override
        public String getMergedContentType() {
            return this.mimeType;
        }

        /** @return always an empty list; this merger never reports unmerged FlowFiles */
        @Override
        public List<FlowFile> getUnmergedFlowFiles() {
            return Collections.emptyList();
        }
    }

    /**
     * Merges the FlowFiles of a bin by concatenating their content, optionally surrounded by a
     * header and footer and separated by a demarcator.
     */
    private class BinaryConcatenationMerge
    implements MergeBin {
        /** Replaced by the FlowFiles' common MIME type when all of them share one. */
        private String mimeType = "application/octet-stream";

        /**
         * Concatenates the content of every FlowFile in the bin into a new bundle.
         *
         * <p>The demarcator is written between FlowFiles only, never after the last one. The
         * reported content type becomes the FlowFiles' MIME type when every FlowFile carries the
         * same non-null value.</p>
         *
         * @param bin     the bin whose contents are concatenated
         * @param context the process context supplying the delimiter properties
         * @return the bundle FlowFile containing the concatenated content
         */
        @Override
        public FlowFile merge(Bin bin, ProcessContext context) {
            final List<FlowFile> contents = bin.getContents();
            final ProcessSession session = bin.getSession();
            FlowFile bundle = session.create(bin.getContents());
            // Holds the MIME type shared so far; reset to null as soon as two FlowFiles disagree.
            final AtomicReference<String> bundleMimeTypeRef = new AtomicReference<>();
            try {
                bundle = session.write(bundle, out -> {
                    final byte[] header = this.getDelimiterContent(context, contents, HEADER);
                    if (header != null) {
                        out.write(header);
                    }

                    final byte[] demarcator = this.getDelimiterContent(context, contents, DEMARCATOR);
                    boolean isFirst = true;
                    final Iterator<FlowFile> itr = contents.iterator();
                    while (itr.hasNext()) {
                        final FlowFile flowFile = itr.next();
                        bin.getSession().read(flowFile, in -> StreamUtils.copy(in, out));
                        if (itr.hasNext() && demarcator != null) {
                            out.write(demarcator);
                        }

                        final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
                        if (isFirst) {
                            bundleMimeTypeRef.set(flowFileMimeType);
                            isFirst = false;
                        } else if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType)) {
                            bundleMimeTypeRef.set(null);
                        }
                    }

                    final byte[] footer = this.getDelimiterContent(context, contents, FOOTER);
                    if (footer != null) {
                        out.write(footer);
                    }
                });
            }
            catch (Exception e) {
                // Drop the partially written bundle before letting the failure propagate.
                MergeContent.this.removeFlowFileFromSession(session, bundle, context);
                throw e;
            }
            session.getProvenanceReporter().join(contents, bundle);
            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), MergeContent.this.createFilename(contents));
            final String commonMimeType = bundleMimeTypeRef.get();
            if (commonMimeType != null) {
                this.mimeType = commonMimeType;
            }
            return bundle;
        }

        /**
         * Resolves the bytes of a header, footer, or demarcator according to the configured
         * Delimiter Strategy.
         *
         * @param context    the process context supplying the property values
         * @param wrappers   the FlowFiles of the bin; the first one is used for expression evaluation
         * @param descriptor the delimiter property to resolve
         * @return the delimiter bytes, or {@code null} when none applies
         * @throws IOException if the delimiter file cannot be read
         */
        private byte[] getDelimiterContent(ProcessContext context, List<FlowFile> wrappers, PropertyDescriptor descriptor) throws IOException {
            final DelimiterStrategy strategy = context.getProperty(DELIMITER_STRATEGY).asAllowableValue(DelimiterStrategy.class);
            // Branch on the constant itself rather than on ordinal() so the mapping survives enum reordering.
            return switch (strategy) {
                case FILENAME -> this.getDelimiterFileContent(context, wrappers, descriptor);
                case TEXT -> this.getDelimiterTextContent(context, wrappers, descriptor);
                case NONE -> null;
            };
        }

        /** Treats the evaluated property value as a file location and returns that file's content. */
        private byte[] getDelimiterFileContent(ProcessContext context, List<FlowFile> flowFiles, PropertyDescriptor descriptor) throws IOException {
            final String value = this.evaluateDelimiterValue(context, flowFiles, descriptor);
            return value == null ? null : MergeContent.this.readContent(value);
        }

        /** Returns the evaluated property value itself, encoded as UTF-8. */
        private byte[] getDelimiterTextContent(ProcessContext context, List<FlowFile> flowFiles, PropertyDescriptor descriptor) {
            final String value = this.evaluateDelimiterValue(context, flowFiles, descriptor);
            return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
        }

        /**
         * Evaluates the delimiter property against the first FlowFile of the bin.
         *
         * @return the evaluated value, or {@code null} when there is no first FlowFile or the
         *         property has no value
         */
        private String evaluateDelimiterValue(ProcessContext context, List<FlowFile> flowFiles, PropertyDescriptor descriptor) {
            if (flowFiles == null || flowFiles.isEmpty()) {
                return null;
            }
            final FlowFile flowFile = flowFiles.getFirst();
            if (flowFile == null) {
                return null;
            }
            return context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
        }

        @Override
        public String getMergedContentType() {
            return this.mimeType;
        }

        /** @return always an empty list; this merger never reports unmerged FlowFiles */
        @Override
        public List<FlowFile> getUnmergedFlowFiles() {
            return Collections.emptyList();
        }
    }

    /**
     * Merges Avro data files by appending their blocks to one writer.
     *
     * <p>The first FlowFile of the bin fixes the schema, codec and (unless metadata is ignored)
     * the user metadata of the output. Later FlowFiles that do not match are left out and
     * reported through {@link #getUnmergedFlowFiles()}.</p>
     */
    private class AvroMerge
    implements MergeBin {
        /** FlowFiles that were skipped; each skipped FlowFile is recorded exactly once. */
        private final List<FlowFile> unmerged = new ArrayList<>();

        private AvroMerge() {
        }

        /**
         * Appends the Avro content of every compatible FlowFile in the bin to a new bundle.
         *
         * @param bin     the bin whose contents are merged
         * @param context the process context supplying the Metadata Strategy
         * @return the bundle FlowFile containing the merged Avro data
         */
        @Override
        public FlowFile merge(Bin bin, ProcessContext context) {
            final ProcessSession session = bin.getSession();
            final List<FlowFile> contents = bin.getContents();
            final MetadataStrategy metadataStrategy = context.getProperty(METADATA_STRATEGY).asAllowableValue(MetadataStrategy.class);
            final Map<String, byte[]> metadata = new TreeMap<>();
            final AtomicReference<Schema> schema = new AtomicReference<>();
            final AtomicReference<String> inputCodec = new AtomicReference<>();
            final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
            FlowFile bundle = session.create(contents);
            try {
                bundle = session.write(bundle, rawOut -> {
                    try (BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
                        for (final FlowFile flowFile : contents) {
                            bin.getSession().read(flowFile, in -> {
                                try (DataFileStream<Object> reader = new DataFileStream<>(in, new GenericDatumReader<>())) {
                                    final boolean canMerge;
                                    if (schema.get() == null) {
                                        // First file: it defines schema, metadata and codec of the output.
                                        schema.set(reader.getSchema());
                                        if (metadataStrategy != MetadataStrategy.IGNORE) {
                                            for (final String key : reader.getMetaKeys()) {
                                                if (DataFileWriter.isReservedMeta(key)) {
                                                    continue;
                                                }
                                                final byte[] metadatum = reader.getMeta(key);
                                                metadata.put(key, metadatum);
                                                writer.setMeta(key, metadatum);
                                            }
                                        }
                                        final String firstCodec = reader.getMetaString("avro.codec");
                                        inputCodec.set(firstCodec == null ? "null" : firstCodec);
                                        writer.setCodec(CodecFactory.fromString(inputCodec.get()));
                                        writer.create(schema.get(), out);
                                        canMerge = true;
                                    } else {
                                        canMerge = this.isCompatible(flowFile, reader, schema.get(), metadata, inputCodec.get(), metadataStrategy);
                                    }

                                    if (canMerge) {
                                        writer.appendAllFrom(reader, false);
                                    } else {
                                        // Record the FlowFile once, however many checks it failed, so the
                                        // unmerged list never contains duplicates.
                                        this.unmerged.add(flowFile);
                                    }
                                }
                            });
                        }
                        writer.flush();
                    }
                    finally {
                        writer.close();
                    }
                });
            }
            catch (Exception e) {
                // Drop the partially written bundle before letting the failure propagate.
                MergeContent.this.removeFlowFileFromSession(session, bundle, context);
                throw e;
            }

            final Collection<FlowFile> parents;
            if (this.unmerged.isEmpty()) {
                parents = contents;
            } else {
                // Only FlowFiles that actually contributed content are reported as parents.
                final Set<FlowFile> merged = new HashSet<>(contents);
                merged.removeAll(this.unmerged);
                parents = merged;
            }
            session.getProvenanceReporter().join(parents, bundle);
            return bundle;
        }

        /**
         * Checks a non-first Avro file against the schema, metadata and codec fixed by the first
         * file, logging every mismatch at debug level.
         *
         * @return {@code true} when the file may be appended to the output
         */
        private boolean isCompatible(FlowFile flowFile, DataFileStream<Object> reader, Schema expectedSchema,
                                     Map<String, byte[]> expectedMetadata, String expectedCodec, MetadataStrategy metadataStrategy) {
            boolean compatible = true;

            if (!expectedSchema.equals(reader.getSchema())) {
                MergeContent.this.getLogger().debug("Input file {} has different schema - {}, not merging", flowFile.getId(), reader.getSchema().getName());
                compatible = false;
            }

            if (metadataStrategy == MetadataStrategy.DO_NOT_MERGE || metadataStrategy == MetadataStrategy.ALL_COMMON) {
                for (final String key : reader.getMetaKeys()) {
                    if (DataFileWriter.isReservedMeta(key)) {
                        continue;
                    }
                    final byte[] metadatum = reader.getMeta(key);
                    final byte[] writersMetadatum = expectedMetadata.get(key);
                    if (Arrays.equals(metadatum, writersMetadatum)) {
                        continue;
                    }
                    // Under ALL_COMMON a key the first file did not have is tolerated.
                    if (metadataStrategy == MetadataStrategy.ALL_COMMON && writersMetadatum == null) {
                        continue;
                    }
                    MergeContent.this.getLogger().debug("Input file {} has different non-reserved metadata, not merging", flowFile.getId());
                    compatible = false;
                }
            }

            String thisCodec = reader.getMetaString("avro.codec");
            if (thisCodec == null) {
                thisCodec = "null";
            }
            if (!expectedCodec.equals(thisCodec)) {
                MergeContent.this.getLogger().debug("Input file {} has different codec, not merging", flowFile.getId());
                compatible = false;
            }
            return compatible;
        }

        @Override
        public String getMergedContentType() {
            return "application/avro-binary";
        }

        /** @return the FlowFiles that were not appended to the merged output */
        @Override
        public List<FlowFile> getUnmergedFlowFiles() {
            return this.unmerged;
        }
    }

    /**
     * Orders FlowFiles by the integer value of their fragment index attribute, ascending.
     */
    private static class FragmentComparator
    implements Comparator<FlowFile> {
        private FragmentComparator() {
        }

        /**
         * @throws NumberFormatException if either FlowFile's fragment index is not an integer
         */
        @Override
        public int compare(FlowFile o1, FlowFile o2) {
            return Integer.compare(fragmentIndexOf(o1), fragmentIndexOf(o2));
        }

        /** Reads and parses the fragment index attribute of the given FlowFile. */
        private static int fragmentIndexOf(FlowFile flowFile) {
            return Integer.parseInt(flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE));
        }
    }

    private static interface MergeBin {
        public FlowFile merge(Bin var1, ProcessContext var2);

        public String getMergedContentType();

        public List<FlowFile> getUnmergedFlowFiles();
    }

    public static enum MetadataStrategy implements DescribedValue
    {
        USE_FIRST("Use First Metadata", "For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile."),
        ALL_COMMON("Keep Only Common Metadata", "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values match those of the first FlowFile, any additional metadata will be dropped but the FlowFile will be merged. Any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged."),
        IGNORE("Ignore Metadata", "Ignores (does not transfer, compare, etc.) any metadata from a FlowFile whose content supports embedded metadata."),
        DO_NOT_MERGE("Do Not Merge Uncommon Metadata", "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged.");

        private final String value;
        private final String description;

        private MetadataStrategy(String value, String description) {
            this.value = value;
            this.description = description;
        }

        public String getValue() {
            return this.value;
        }

        public String getDisplayName() {
            return this.value;
        }

        public String getDescription() {
            return this.description;
        }
    }
}

