/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.CompressionType;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.HadoopValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.processors.transfer.ResourceTransferProperties;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.processors.transfer.ResourceTransferUtils;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"hadoop", "HCFS", "HDFS", "put", "copy", "filesystem"})
@CapabilityDescription(value="Write FlowFile data to Hadoop Distributed File System (HDFS)")
@ReadsAttribute(attribute="filename", description="The name of the file written to HDFS comes from the value of this attribute.")
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The name of the file written to HDFS is stored in this attribute."), @WritesAttribute(attribute="absolute.hdfs.path", description="The absolute path to the file on HDFS is stored in this attribute."), @WritesAttribute(attribute="hadoop.file.url", description="The hadoop url for the file is stored in this attribute."), @WritesAttribute(attribute="target.dir.created", description="The result(true/false) indicates if the folder is created by the processor.")})
@SeeAlso(value={GetHDFS.class})
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM, explanation="Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")})
public class PutHDFS
extends AbstractHadoopProcessor {
    /** Hadoop configuration key that {@code getBufferSize} reads when the "IO Buffer Size" property is not set. */
    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
    /** Default buffer size; {@code getBufferSize} falls back to this same value (4096) when the configuration has no entry for {@link #BUFFER_SIZE_KEY}. */
    protected static final int BUFFER_SIZE_DEFAULT = 4096;
    /** Cache of directory ACL status keyed by directory path; (re)built in {@code onScheduled} and invalidated in {@code onStopped}. */
    private Cache<Path, AclStatus> aclCache;
    // Relationships a FlowFile can be routed to.
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Files that have been successfully written to HDFS are transferred to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Files that could not be written to HDFS for some reason are transferred to this relationship").build();
    // Allowable values of the "Append Mode" property.
    public static final String DEFAULT_APPEND_MODE = "DEFAULT";
    public static final String AVRO_APPEND_MODE = "AVRO";
    // Raw values of the "Conflict Resolution Strategy" property.
    protected static final String REPLACE_RESOLUTION = "replace";
    protected static final String IGNORE_RESOLUTION = "ignore";
    protected static final String FAIL_RESOLUTION = "fail";
    protected static final String APPEND_RESOLUTION = "append";
    // Raw values of the "Writing Strategy" property.
    protected static final String WRITE_AND_RENAME = "writeAndRename";
    protected static final String SIMPLE_WRITE = "simpleWrite";
    // Allowable-value wrappers (value, display name, description) for the raw strings above.
    protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue("replace", "replace", "Replaces the existing file if any.");
    protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue("ignore", "ignore", "Ignores the flow file and routes it to success.");
    protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue("fail", "fail", "Penalizes the flow file and routes it to failure.");
    protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue("append", "append", "Appends to the existing file if any, creates a new file otherwise.");
    protected static final AllowableValue WRITE_AND_RENAME_AV = new AllowableValue("writeAndRename", "Write and rename", "The processor writes FlowFile data into a temporary file and renames it after completion. This prevents other processes from reading partially written files.");
    protected static final AllowableValue SIMPLE_WRITE_AV = new AllowableValue("simpleWrite", "Simple write", "The processor writes FlowFile data directly to the destination file. In some cases this might cause reading partially written files.");
    /** Required; defaults to "fail". Decides what happens when the destination file already exists. */
    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder().name("Conflict Resolution Strategy").description("Indicates what should happen when a file with the same name already exists in the output directory").required(true).defaultValue((DescribedValue)FAIL_RESOLUTION_AV).allowableValues(new DescribedValue[]{REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV}).build();
    /** Required; defaults to "writeAndRename". Chooses between writing to a dot-prefixed temporary file first or writing directly to the destination. */
    protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder().name("writing-strategy").displayName("Writing Strategy").description("Defines the approach for writing the FlowFile data.").required(true).defaultValue((DescribedValue)WRITE_AND_RENAME_AV).allowableValues(new DescribedValue[]{WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV}).build();
    /** Only applicable when the conflict resolution strategy is "append"; defaults to "DEFAULT". */
    public static final PropertyDescriptor APPEND_MODE = new PropertyDescriptor.Builder().name("Append Mode").description("Defines the append strategy to use when the Conflict Resolution Strategy is set to 'append'.").allowableValues(new String[]{"DEFAULT", "AVRO"}).defaultValue("DEFAULT").dependsOn(CONFLICT_RESOLUTION, "append", new String[0]).required(true).build();
    // Optional overrides; each corresponding getter falls back to a file system or configuration default when the property is unset.
    public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder().name("Block Size").description("Size of each block as written to HDFS. This overrides the Hadoop Configuration").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder().name("IO Buffer Size").description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder().name("Replication").description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration").addValidator(HadoopValidators.POSITIVE_SHORT_VALIDATOR).build();
    /** Octal umask; parsed with radix 8 in {@code preProcessConfiguration}. */
    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder().name("Permissions umask").description("A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop property \"fs.permissions.umask-mode\". If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used. If the PutHDFS target folder has a default ACL defined, the umask property is ignored by HDFS.").addValidator(HadoopValidators.UMASK_VALIDATOR).build();
    // Owner and group are evaluated against FlowFile attributes; an empty result is treated as "not set" by getOwner/getGroup.
    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner").description("Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group").description("Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Optional boolean, defaults to "false"; when true the IGNORE_CLIENT_LOCALITY create flag is added on file creation. */
    public static final PropertyDescriptor IGNORE_LOCALITY = new PropertyDescriptor.Builder().name("Ignore Locality").description("Directs the HDFS system to ignore locality rules so that data is distributed randomly throughout the cluster").required(false).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    /** Immutable set returned by {@code getRelationships}. */
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    /** Common Hadoop processor properties followed by the PutHDFS-specific ones; DIRECTORY is re-described for the write use case. */
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(PutHDFS.getCommonPropertyDescriptors().stream(), Stream.of(new PropertyDescriptor.Builder().fromPropertyDescriptor(DIRECTORY).description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.").build(), CONFLICT_RESOLUTION, APPEND_MODE, WRITING_STRATEGY, BLOCK_SIZE, BUFFER_SIZE, REPLICATION_FACTOR, UMASK, REMOTE_OWNER, REMOTE_GROUP, COMPRESSION_CODEC, IGNORE_LOCALITY, ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE, ResourceTransferProperties.FILE_RESOURCE_SERVICE)).toList();

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return the shared immutable set containing the success and failure relationships
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Returns the property descriptors supported by this processor.
     *
     * @return the shared list of common Hadoop properties followed by the PutHDFS-specific ones
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Adds a PutHDFS-specific check on top of the inherited validation: a compression codec
     * (anything other than NONE) must not be combined with conflict resolution "append" in
     * "AVRO" append mode.
     *
     * @param validationContext the context giving access to the configured property values
     * @return the inherited validation results, plus one invalid result when the forbidden
     *         combination is configured
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));

        final PropertyValue codecProperty = validationContext.getProperty(COMPRESSION_CODEC);
        final boolean compressionEnabled = codecProperty.isSet() && !CompressionType.NONE.name().equals(codecProperty.getValue());
        if (!compressionEnabled) {
            return results;
        }

        final String conflictResolution = validationContext.getProperty(CONFLICT_RESOLUTION).getValue();
        if (!APPEND_RESOLUTION.equals(conflictResolution)) {
            return results;
        }

        final String appendMode = validationContext.getProperty(APPEND_MODE).getValue();
        if (AVRO_APPEND_MODE.equals(appendMode)) {
            results.add(new ValidationResult.Builder()
                    .subject("Codec")
                    .valid(false)
                    .explanation("Compression codec cannot be set when used in 'append avro' mode")
                    .build());
        }
        return results;
    }

    /**
     * Applies the umask to the Hadoop configuration before it is used.
     * When the "Permissions umask" property is set its value is parsed as an octal number;
     * otherwise the umask already present in the configuration is written back unchanged.
     *
     * @param config the Hadoop configuration to update
     * @param context the process context supplying the umask property
     */
    protected void preProcessConfiguration(Configuration config, ProcessContext context) {
        final PropertyValue umaskProperty = context.getProperty(UMASK);
        final short umask;
        if (umaskProperty.isSet()) {
            // The property holds an octal string such as "022".
            umask = Short.parseShort(umaskProperty.getValue(), 8);
        } else {
            umask = FsPermission.getUMask(config).toShort();
        }
        FsPermission.setUMask(config, new FsPermission(umask));
    }

    /**
     * Creates a fresh ACL cache each time the processor is scheduled.
     * The cache holds at most 20 entries and drops an entry one hour after it was written.
     *
     * @param context the process context (not used here)
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        final Duration entryLifetime = Duration.ofHours(1L);
        aclCache = Caffeine.newBuilder()
                .maximumSize(20L)
                .expireAfterWrite(entryLifetime)
                .build();
    }

    /**
     * Clears every cached ACL status when the processor stops.
     * Does nothing if the cache was never created.
     */
    @OnStopped
    public void onStopped() {
        if (aclCache == null) {
            return;
        }
        aclCache.invalidateAll();
    }

    /**
     * Writes the content of one incoming FlowFile to HDFS, running as the processor's Hadoop user.
     *
     * <p>Steps performed inside the privileged action:</p>
     * <ol>
     *   <li>Resolve the target directory, file name (codec extension appended when a codec is
     *       configured) and the block size / buffer size / replication settings.</li>
     *   <li>Make sure the target directory exists (creating it and changing its owner when it is
     *       missing) and reject the combination of a default ACL with an explicit umask.</li>
     *   <li>If the destination file exists, apply the conflict resolution strategy:
     *       replace deletes it, ignore routes to success, fail penalizes and routes to failure,
     *       append continues and appends.</li>
     *   <li>Stream the content (from the configured file resource, or the FlowFile itself) into
     *       either the dot-prefixed temporary file or the destination, depending on the writing
     *       strategy; in AVRO append mode the records are appended with the Avro writer instead.</li>
     *   <li>For "write and rename" on a newly created file, rename the temporary file to its final
     *       name (up to 10 attempts, 200 ms apart) and change its owner.</li>
     *   <li>Set the output attributes, emit a provenance SEND event and route to success.</li>
     * </ol>
     *
     * <p>Any failure is logged, the FlowFile is penalized and routed to failure, and the processor
     * yields, unless {@code handleAuthErrors} reports that it already handled the error.</p>
     *
     * @param context the process context supplying property values
     * @param session the session providing the FlowFile and used for routing
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final FileSystem hdfs = getFileSystem();
        final Configuration configuration = getConfiguration();
        final UserGroupInformation ugi = getUserGroupInformation();

        if (configuration == null || hdfs == null || ugi == null) {
            getLogger().error("HDFS not configured properly");
            session.transfer(flowFile, getFailureRelationship());
            context.yield();
            return;
        }

        ugi.doAs(new PrivilegedAction<Object>() {

            @Override
            public Object run() {
                // Set only after the content has been written; used by the catch block for cleanup.
                Path tempDotCopyFile = null;
                FlowFile putFlowFile = flowFile;
                try {
                    final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue();
                    final Path dirPath = getNormalizedPath(context, AbstractHadoopProcessor.DIRECTORY, putFlowFile);
                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
                    final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
                    final int bufferSize = getBufferSize(context, session, putFlowFile);
                    final short replication = getReplication(context, session, putFlowFile, dirPath);
                    final CompressionCodec codec = getCompressionCodec(context, configuration);

                    final String baseName = putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
                    final String filename = codec != null ? baseName + codec.getDefaultExtension() : baseName;

                    final Path tempCopyFile = new Path(dirPath, "." + filename);
                    final Path copyFile = new Path(dirPath, filename);

                    final boolean writeAndRename = WRITE_AND_RENAME.equals(writingStrategy);
                    final boolean appendResolution = APPEND_RESOLUTION.equals(conflictResponse);
                    // Depending on the writing strategy, write to the temporary file or directly to the destination.
                    final Path actualCopyFile = writeAndRename ? tempCopyFile : copyFile;

                    // Verify the target directory, creating it when it does not exist yet.
                    boolean targetDirCreated = false;
                    try {
                        final FileStatus dirStatus = hdfs.getFileStatus(dirPath);
                        if (!dirStatus.isDirectory()) {
                            throw new IOException(dirPath.toString() + " already exists and is not a directory");
                        }
                        if (dirStatus.hasAcl()) {
                            checkAclStatus(getAclStatus(dirPath));
                        }
                    } catch (final FileNotFoundException fe) {
                        targetDirCreated = hdfs.mkdirs(dirPath);
                        if (!targetDirCreated) {
                            throw new IOException(dirPath.toString() + " could not be created");
                        }
                        final FileStatus createdDirStatus = hdfs.getFileStatus(dirPath);
                        if (createdDirStatus.hasAcl()) {
                            checkAclStatus(getAclStatus(dirPath));
                        }
                        changeOwner(context, hdfs, dirPath, flowFile);
                    }

                    final boolean destinationExists = hdfs.exists(copyFile);

                    // Apply the conflict resolution strategy when the destination already exists.
                    if (destinationExists) {
                        switch (conflictResponse) {
                            case REPLACE_RESOLUTION:
                                if (hdfs.delete(copyFile, false)) {
                                    getLogger().info("deleted {} in order to replace with the contents of {}", copyFile, putFlowFile);
                                }
                                break;
                            case IGNORE_RESOLUTION:
                                session.transfer(putFlowFile, getSuccessRelationship());
                                getLogger().info("transferring {} to success because file with same name already exists", putFlowFile);
                                return null;
                            case FAIL_RESOLUTION:
                                session.transfer(session.penalize(putFlowFile), getFailureRelationship());
                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists", putFlowFile);
                                return null;
                            default:
                                // "append" (or any other value) falls through to the write below.
                                break;
                        }
                    }

                    // Write the content to HDFS.
                    final StopWatch stopWatch = new StopWatch(true);
                    final ResourceTransferSource resourceTransferSource = context.getProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
                    try (InputStream in = ResourceTransferUtils.getFileResource(resourceTransferSource, context, flowFile.getAttributes())
                            .map(FileResource::getInputStream)
                            .orElseGet(() -> session.read(flowFile))) {
                        OutputStream fos = null;
                        Path createdFile = null;
                        try {
                            if (appendResolution && destinationExists) {
                                fos = hdfs.append(copyFile, bufferSize);
                            } else {
                                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
                                if (shouldIgnoreLocality(context, session)) {
                                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                }
                                fos = hdfs.create(actualCopyFile,
                                        FsCreateModes.applyUMask(FsPermission.getFileDefault(), FsPermission.getUMask(hdfs.getConf())),
                                        cflags, bufferSize, replication, blockSize, null, null);
                            }

                            if (codec != null) {
                                fos = codec.createOutputStream(fos);
                            }
                            createdFile = actualCopyFile;

                            if (appendResolution
                                    && AVRO_APPEND_MODE.equals(context.getProperty(APPEND_MODE).getValue())
                                    && destinationExists) {
                                getLogger().info("Appending avro record to existing avro file");
                                try (DataFileStream<Object> reader = new DataFileStream<>(in, new GenericDatumReader<>());
                                     DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
                                    // Reopen the existing file for reading so the Avro writer can continue from it.
                                    writer.appendTo(new FsInput(copyFile, configuration), fos);
                                    writer.appendAllFrom(reader, false);
                                    writer.flush();
                                    getLogger().info("Successfully appended avro record");
                                } catch (final Exception e) {
                                    getLogger().error("Error occurred during appending to existing avro file", e);
                                    throw new ProcessException(e);
                                }
                            } else {
                                final BufferedInputStream bis = new BufferedInputStream(in);
                                StreamUtils.copy(bis, fos);
                                fos.flush();
                            }
                        } finally {
                            try {
                                if (fos != null) {
                                    fos.close();
                                }
                            } catch (final Throwable t) {
                                // If close() fails, delete the file that was being written (best effort)
                                // and rethrow the close failure.
                                if (createdFile != null) {
                                    try {
                                        hdfs.delete(createdFile, false);
                                    } catch (final Throwable ignored) {
                                        // Deliberately ignored: the close failure rethrown below is the error to report.
                                    }
                                }
                                throw t;
                            }
                        }
                    }
                    stopWatch.stop();
                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                    tempDotCopyFile = tempCopyFile;

                    // A rename is needed only when a new file was written to the temporary location,
                    // i.e. not when content was appended to an existing destination.
                    if (writeAndRename && (!appendResolution || !destinationExists)) {
                        boolean renamed = false;
                        for (int i = 0; i < 10; i++) { // try to rename multiple times
                            if (hdfs.rename(tempCopyFile, copyFile)) {
                                renamed = true;
                                break;
                            }
                            Thread.sleep(200L); // pause before the next attempt
                        }
                        if (!renamed) {
                            hdfs.delete(tempCopyFile, false);
                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile + " to its final filename");
                        }
                        changeOwner(context, hdfs, copyFile, flowFile);
                    }

                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", putFlowFile, copyFile, millis, dataRate);

                    final String newFilename = copyFile.getName();
                    final String hdfsPath = copyFile.getParent().toString();
                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
                    putFlowFile = session.putAttribute(putFlowFile, "absolute.hdfs.path", hdfsPath);
                    putFlowFile = session.putAttribute(putFlowFile, "target.dir.created", String.valueOf(targetDirCreated));
                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
                    putFlowFile = session.putAttribute(putFlowFile, "hadoop.file.url", qualifiedPath.toString());
                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());

                    session.transfer(putFlowFile, getSuccessRelationship());
                } catch (final Throwable t) {
                    if (handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
                        return null;
                    }
                    if (tempDotCopyFile != null) {
                        try {
                            hdfs.delete(tempDotCopyFile, false);
                        } catch (final Exception e) {
                            getLogger().error("Unable to remove temporary file {}", tempDotCopyFile, e);
                        }
                    }
                    getLogger().error("Failed to write to HDFS", t);
                    session.transfer(session.penalize(putFlowFile), getFailureRelationship());
                    context.yield();
                    if (t instanceof InterruptedException) {
                        // Restore the interrupt flag swallowed by catching Throwable; done last so the
                        // cleanup and routing above are not run with the flag already set.
                        Thread.currentThread().interrupt();
                    }
                }
                return null;
            }

            /**
             * Rejects the write when the directory has a default-scope ACL entry and an explicit
             * umask is configured on the processor.
             *
             * @param aclStatus ACL status of the target directory
             * @throws IOException when both a default ACL and the umask property are present
             */
            private void checkAclStatus(final AclStatus aclStatus) throws IOException {
                final boolean hasDefaultAcl = aclStatus.getEntries().stream()
                        .anyMatch(aclEntry -> AclEntryScope.DEFAULT.equals(aclEntry.getScope()));
                final boolean umaskConfigured = context.getProperty(UMASK).isSet();
                if (hasDefaultAcl && umaskConfigured) {
                    throw new IOException("PutHDFS umask setting is ignored by HDFS when HDFS default ACL is set.");
                }
            }

            /**
             * Returns the ACL status of the directory, querying HDFS only when it is not cached.
             *
             * @param dirPath the directory whose ACL status is needed
             * @return the cached or freshly loaded ACL status
             * @throws UncheckedIOException when the ACL query fails
             */
            private AclStatus getAclStatus(final Path dirPath) {
                return aclCache.get(dirPath, path -> {
                    try {
                        return hdfs.getAclStatus(path);
                    } catch (final IOException e) {
                        throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", path), e);
                    }
                });
            }
        });
    }

    /**
     * Returns the relationship used for FlowFiles that were written (or ignored) successfully.
     *
     * @return {@link #REL_SUCCESS}
     */
    protected Relationship getSuccessRelationship() {
        return REL_SUCCESS;
    }

    /**
     * Returns the relationship used for FlowFiles that could not be written.
     *
     * @return {@link #REL_FAILURE}
     */
    protected Relationship getFailureRelationship() {
        return REL_FAILURE;
    }

    /**
     * Determines the block size to use for a newly created file.
     *
     * @param context the process context supplying the "Block Size" property
     * @param session the current session (not used here)
     * @param flowFile the FlowFile being written (not used here)
     * @param dirPath the target directory, used to look up the file system default
     * @return the configured size in bytes, or the file system's default block size for
     *         {@code dirPath} when the property is not set
     */
    protected long getBlockSize(ProcessContext context, ProcessSession session, FlowFile flowFile, Path dirPath) {
        final Double configuredBytes = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
        if (configuredBytes == null) {
            return getFileSystem().getDefaultBlockSize(dirPath);
        }
        return configuredBytes.longValue();
    }

    /**
     * Determines the IO buffer size to use when writing.
     *
     * @param context the process context supplying the "IO Buffer Size" property
     * @param session the current session (not used here)
     * @param flowFile the FlowFile being written (not used here)
     * @return the configured size in bytes, or the Hadoop configuration value for
     *         {@code io.file.buffer.size} (default 4096) when the property is not set
     */
    protected int getBufferSize(ProcessContext context, ProcessSession session, FlowFile flowFile) {
        final Double configuredBytes = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
        if (configuredBytes == null) {
            return getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
        }
        return configuredBytes.intValue();
    }

    /**
     * Determines the replication factor to use for a newly created file.
     *
     * @param context the process context supplying the "Replication" property
     * @param session the current session (not used here)
     * @param flowFile the FlowFile being written (not used here)
     * @param dirPath the target directory, used to look up the file system default
     * @return the configured replication narrowed to {@code short}, or the file system's
     *         default replication for {@code dirPath} when the property is not set
     */
    protected short getReplication(ProcessContext context, ProcessSession session, FlowFile flowFile, Path dirPath) {
        final Integer configuredReplication = context.getProperty(REPLICATION_FACTOR).asInteger();
        if (configuredReplication == null) {
            return getFileSystem().getDefaultReplication(dirPath);
        }
        return configuredReplication.shortValue();
    }

    /**
     * Reports whether the "Ignore Locality" property is enabled.
     *
     * @param context the process context supplying the property
     * @param session the current session (not used here)
     * @return the property's boolean value (the property declares a default of "false")
     */
    protected boolean shouldIgnoreLocality(ProcessContext context, ProcessSession session) {
        final Boolean ignoreLocality = context.getProperty(IGNORE_LOCALITY).asBoolean();
        // Unboxed on return, exactly as a direct return of asBoolean() would be.
        return ignoreLocality;
    }

    /**
     * Resolves the "Remote Owner" property against the FlowFile's attributes.
     *
     * @param context the process context supplying the property
     * @param flowFile the FlowFile whose attributes are used for expression evaluation
     * @return the evaluated owner, or {@code null} when it is unset or evaluates to an empty string
     */
    protected String getOwner(ProcessContext context, FlowFile flowFile) {
        final String evaluatedOwner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
        if (evaluatedOwner == null || evaluatedOwner.isEmpty()) {
            return null;
        }
        return evaluatedOwner;
    }

    /**
     * Resolves the "Remote Group" property against the FlowFile's attributes.
     *
     * @param context the process context supplying the property
     * @param flowFile the FlowFile whose attributes are used for expression evaluation
     * @return the evaluated group, or {@code null} when it is unset or evaluates to an empty string
     */
    protected String getGroup(ProcessContext context, FlowFile flowFile) {
        final String evaluatedGroup = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
        if (evaluatedGroup == null || evaluatedGroup.isEmpty()) {
            return null;
        }
        return evaluatedGroup;
    }

    /**
     * Changes the owner and/or group of an HDFS path when either is configured.
     * This is best effort: any exception is logged as a warning and not propagated.
     *
     * @param context the process context supplying the owner and group properties
     * @param hdfs the file system on which to change ownership
     * @param name the path whose owner/group should change
     * @param flowFile the FlowFile whose attributes are used for expression evaluation
     */
    protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name, FlowFile flowFile) {
        try {
            final String newOwner = getOwner(context, flowFile);
            final String newGroup = getGroup(context, flowFile);
            if (newOwner == null && newGroup == null) {
                // Nothing configured, so leave ownership untouched.
                return;
            }
            hdfs.setOwner(name, newOwner, newGroup);
        } catch (final Exception e) {
            getLogger().warn("Could not change owner or group of {} on HDFS", name, e);
        }
    }
}

