/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;

@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"tail", "file", "log", "text", "source"})
@CapabilityDescription(value="\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds, rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor does not support ingesting files that have been compressed when 'rolled over'.")
@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. State is stored either local or clustered depend on the <File Location> property.")
@WritesAttribute(attribute="tailfile.original.path", description="Path of the original file the flow file comes from.")
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.READ_FILESYSTEM, explanation="Provides operator the ability to read from any file that NiFi has access to.")})
@DefaultSchedule(strategy=SchedulingStrategy.TIMER_DRIVEN, period="30 sec")
public class TailFile
extends AbstractProcessor {
    static final String MAP_PREFIX = "file.";
    private static final byte[] NEW_LINE_BYTES = "\n".getBytes(StandardCharsets.UTF_8);
    static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "State is stored locally. Each node in a cluster will tail a different file.");
    static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "State is located on a remote resource. This Processor will store state across the cluster so that it can be run on Primary Node Only and a new Primary Node can pick up where the last one left off.");
    static final AllowableValue MODE_SINGLEFILE = new AllowableValue("Single file", "Single file", "In this mode, only the one file indicated in the 'Files to tail' property will be watched by the processor. In this mode, the file may not exist when starting the processor.");
    static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple files", "Multiple files", "In this mode, the 'Files to tail' property accepts a regular expression and the processor will look for files in 'Base directory' to list the files to tail by the processor.");
    static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time", "Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
    static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File", "Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over");
    static final AllowableValue START_CURRENT_TIME = new AllowableValue("Current Time", "Current Time", "Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any data in the File to Tail that has already been written.");
    static final PropertyDescriptor BASE_DIRECTORY = new PropertyDescriptor.Builder().name("Base Directory").description("Base directory used to look for files to tail. This property is required when using Multifile mode.").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).required(false).build();
    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder().name("Tailing Mode").description("Mode to use: single file will tail only one file, multiple file will look for a list of file. In Multiple mode the Base directory is required.").expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).allowableValues(new DescribedValue[]{MODE_SINGLEFILE, MODE_MULTIFILE}).defaultValue((DescribedValue)MODE_SINGLEFILE).build();
    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder().name("Files to Tail").description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files to tail in the base directory. In case recursive is set to true, the regular expression will be used to match the path starting from the base directory (see additional details for examples).").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.createRegexValidator((int)0, (int)Integer.MAX_VALUE, (boolean)true)).required(true).build();
    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder().name("Rolling Filename Pattern").description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. This pattern supports wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the name of the file (without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed. The same glob pattern will be used for all files.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new PropertyDescriptor.Builder().name("Post-Rollover Tail Period").description("When a file is rolled over, the processor will continue tailing the rolled over file until it has not been modified for this amount of time. This allows for another process to rollover a file, and then flush out any buffered data. Note that when this value is set, and the tailed file rolls over, the new file will not be tailed until the old file has not been modified for the configured amount of time. Additionally, when using this capability, in order to avoid data duplication, this period must be set longer than the Processor's Run Schedule, and the Processor must not be stopped after the file being tailed has been rolled over and before the data has been fully consumed. Otherwise, the data may be duplicated, as the entire file may be written out as the contents of a single FlowFile.").required(false).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("0 sec").build();
    static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder().name("State Location").description("Specifies where the state is located either local or cluster so that state can be stored appropriately in order to ensure that all data is consumed without duplicating data upon restart of NiFi").required(true).allowableValues(new DescribedValue[]{LOCATION_LOCAL, LOCATION_REMOTE}).defaultValue((DescribedValue)LOCATION_LOCAL).build();
    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("Initial Start Position").description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from a file, the Processor will continue from the last point from which it has received data.").allowableValues(new DescribedValue[]{START_BEGINNING_OF_TIME, START_CURRENT_FILE, START_CURRENT_TIME}).defaultValue((DescribedValue)START_CURRENT_FILE).required(true).build();
    static final PropertyDescriptor RECURSIVE = new PropertyDescriptor.Builder().name("Recursive Lookup").description("When using Multiple files mode, this property defines if files must be listed recursively or not in the base directory.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    static final PropertyDescriptor LOOKUP_FREQUENCY = new PropertyDescriptor.Builder().name("Lookup Frequency").description("Only used in Multiple files mode. It specifies the minimum duration the processor will wait before listing again the files to tail.").required(false).defaultValue("10 minutes").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder().name("Maximum Age").description("Only used in Multiple files mode. It specifies the necessary minimum duration to consider that no new messages will be appended in a file regarding its last modification date. This should not be set too low to avoid duplication of data in case new messages are appended at a lower frequency.").required(false).defaultValue("24 hours").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    static final PropertyDescriptor REREAD_ON_NUL = new PropertyDescriptor.Builder().name("Reread on NUL Encountered").description("If this option is set to 'true', when a NUL character is read, the processor will yield and try to read the same part again later. (Note: Yielding may delay the processing of other files tailed by this processor, not just the one with the NUL character.) The purpose of this flag is to allow users to handle cases where reading a file may return temporary NUL values. NFS for example may send file contents out of order. In this case the missing parts are temporarily replaced by NUL values. CAUTION! If the file contains legitimate NUL values, setting this flag causes this processor to get stuck indefinitely. For this reason users should refrain from using this feature if they can help it and try to avoid having the target file on a file system where reads are unreliable.").required(false).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    static final PropertyDescriptor LINE_START_PATTERN = new PropertyDescriptor.Builder().name("Line Start Pattern").description("A Regular Expression to match against the start of a log line. If specified, any line that matches the expression, and any following lines, will be buffered until another line matches the Expression. In doing this, we can avoid splitting apart multi-line messages in the file. This assumes that the data is in UTF-8 format.").required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).dependsOn(MODE, new AllowableValue[]{MODE_SINGLEFILE}).build();
    static final PropertyDescriptor MAX_BUFFER_LENGTH = new PropertyDescriptor.Builder().name("Max Buffer Size").description("When using the Line Start Pattern, there may be situations in which the data in the file being tailed never matches the Regular Expression. This would result in the processor buffering all data from the tailed file, which can quickly exhaust the heap. To avoid this, the Processor will buffer only up to this amount of data before flushing the buffer, even if it means ingesting partial data from the file.").required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("64 KB").dependsOn(LINE_START_PATTERN, new AllowableValue[0]).build();
    static final PropertyDescriptor PRE_ALLOCATED_BUFFER_SIZE = new PropertyDescriptor.Builder().name("Preallocated Buffer Size").description("Sets the amount of memory that is preallocated for each tailed file.").required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("65536 B").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(MODE, FILENAME, ROLLING_FILENAME_PATTERN, POST_ROLLOVER_TAIL_PERIOD, BASE_DIRECTORY, START_POSITION, STATE_LOCATION, RECURSIVE, LOOKUP_FREQUENCY, MAXIMUM_AGE, REREAD_ON_NUL, LINE_START_PATTERN, PRE_ALLOCATED_BUFFER_SIZE, MAX_BUFFER_LENGTH);
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to this Relationship.").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
    private volatile Map<String, TailFileObject> states = new HashMap<String, TailFileObject>();
    private final AtomicLong lastLookup = new AtomicLong(0L);
    private final AtomicBoolean isMultiChanging = new AtomicBoolean(false);
    private volatile boolean requireStateLookup = true;
    private final ByteArrayOutputStream linesBuffer = new ByteArrayOutputStream();
    private volatile Pattern lineStartPattern;
    private volatile long maxBufferBytes;
    private volatile int preAllocatedBufferSize;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (this.isConfigurationRestored() && FILENAME.equals((Object)descriptor)) {
            this.states = new HashMap<String, TailFileObject>();
        }
    }

    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>(super.customValidate(context));
        if (context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
            String path = context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue();
            if (path == null) {
                results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false).explanation("Base directory property cannot be empty in Multifile mode.").build());
            } else if (!new File(path).isDirectory()) {
                results.add(new ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false).explanation(path + " is not a directory.").build());
            }
        }
        return results;
    }

    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange() {
        this.requireStateLookup = true;
    }

    private List<String> lookup(ProcessContext context) {
        this.lastLookup.set(new Date().getTime());
        long maxAge = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        ArrayList<String> filesToTail = new ArrayList<String>();
        if (context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
            filesToTail.addAll(this.getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(), context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), context.getProperty(RECURSIVE).asBoolean(), maxAge));
        } else {
            filesToTail.add(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue());
        }
        return filesToTail;
    }

    @OnScheduled
    public void compileRegex(ProcessContext context) {
        if (context.getProperty(MODE).getValue().equals(MODE_SINGLEFILE.getValue())) {
            String patternString = context.getProperty(LINE_START_PATTERN).getValue();
            this.lineStartPattern = patternString == null ? null : Pattern.compile(patternString);
        }
        this.maxBufferBytes = this.lineStartPattern == null ? Long.MAX_VALUE : context.getProperty(MAX_BUFFER_LENGTH).asDataSize(DataUnit.B).longValue();
        this.preAllocatedBufferSize = context.getProperty(PRE_ALLOCATED_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
    }

    @OnScheduled
    public void recoverState(ProcessContext context) throws IOException {
        this.isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()));
        List<String> filesToTail = this.lookup(context);
        Scope scope = this.getStateScope(context);
        StateMap stateMap = context.getStateManager().getState(scope);
        if (stateMap.getStateVersion().isEmpty() || stateMap.toMap().isEmpty()) {
            this.initStates(filesToTail, Collections.emptyMap(), true);
            this.recoverState(filesToTail, Collections.emptyMap());
            return;
        }
        Map statesMap = stateMap.toMap();
        this.initStates(filesToTail, statesMap, false);
        this.recoverState(filesToTail, statesMap);
    }

    private void initStates(List<String> filesToTail, Map<String, String> statesMap, boolean isCleared) {
        int fileIndex = 0;
        if (isCleared) {
            this.states.clear();
        } else {
            if (this.states.isEmpty() && !statesMap.isEmpty()) {
                for (Map.Entry entry : statesMap.entrySet()) {
                    String key = (String)entry.getKey();
                    String value = (String)entry.getValue();
                    if (!key.endsWith("filename") || !filesToTail.contains(value)) continue;
                    int index = Integer.parseInt(key.split("\\.")[1]);
                    this.states.put(value, new TailFileObject(index, statesMap, this.preAllocatedBufferSize));
                }
            }
            ArrayList<String> toBeRemoved = new ArrayList<String>();
            for (Map.Entry<String, TailFileObject> entry : this.states.entrySet()) {
                String filePath = entry.getKey();
                TailFileObject tailFileObject = entry.getValue();
                if (filesToTail.contains(filePath)) continue;
                toBeRemoved.add(filePath);
                this.cleanReader(tailFileObject);
            }
            toBeRemoved.forEach(this.states.keySet()::remove);
            for (TailFileObject tfo : this.states.values()) {
                if (fileIndex > tfo.getFilenameIndex()) continue;
                fileIndex = tfo.getFilenameIndex() + 1;
            }
        }
        for (String string : filesToTail) {
            if (!isCleared && this.states.containsKey(string)) continue;
            TailFileState tailFileState = new TailFileState(string, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(this.preAllocatedBufferSize));
            this.states.put(string, new TailFileObject(fileIndex, tailFileState));
            ++fileIndex;
        }
    }

    private void recoverState(List<String> filesToTail, Map<String, String> map) throws IOException {
        for (String file : filesToTail) {
            this.recoverState(map, file);
        }
    }

    private List<String> getFilesToTail(String baseDir, String fileRegex, boolean isRecursive, long maxAge) {
        Collection files = FileUtils.listFiles((File)new File(baseDir), null, (boolean)isRecursive);
        ArrayList<String> result = new ArrayList<String>();
        String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() - 1) : baseDir;
        String fullRegex = File.separator.equals("/") ? baseDirNoTrailingSeparator + File.separator + fileRegex : baseDirNoTrailingSeparator + Pattern.quote(File.separator) + fileRegex;
        Pattern p = Pattern.compile(fullRegex);
        for (File file : files) {
            String path = file.getPath();
            if (!p.matcher(path).matches()) continue;
            if (this.isMultiChanging.get()) {
                if (new Date().getTime() - file.lastModified() >= maxAge) continue;
                result.add(path);
                continue;
            }
            result.add(path);
        }
        return result;
    }

    private void recoverState(Map<String, String> stateValues, String filePath) throws IOException {
        TailFileObject tailFileObject = this.states.get(filePath);
        String prefix = MAP_PREFIX + tailFileObject.getFilenameIndex() + ".";
        if (!(stateValues.containsKey(prefix + "filename") && stateValues.containsKey(prefix + "position") && stateValues.containsKey(prefix + "timestamp") && stateValues.containsKey(prefix + "length"))) {
            this.resetState(filePath);
            return;
        }
        String checksumValue = stateValues.get(prefix + "checksum");
        boolean checksumPresent = checksumValue != null;
        String storedStateFilename = stateValues.get(prefix + "filename");
        long position = Long.parseLong(stateValues.get(prefix + "position"));
        long timestamp = Long.parseLong(stateValues.get(prefix + "timestamp"));
        long length = Long.parseLong(stateValues.get(prefix + "length"));
        FileChannel reader = null;
        File tailFile = null;
        if (checksumPresent && filePath.equals(storedStateFilename)) {
            tailFileObject.setExpectedRecoveryChecksum(Long.parseLong(checksumValue));
            CRC32 checksum = new CRC32();
            File existingTailFile = new File(storedStateFilename);
            if (existingTailFile.length() >= position) {
                try (InputStream tailFileIs = Files.newInputStream(existingTailFile.toPath(), new OpenOption[0]);
                     CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum);){
                    try {
                        StreamUtils.copy((InputStream)in, (OutputStream)new NullOutputStream(), (long)tailFileObject.getState().getPosition());
                    }
                    catch (EOFException eof) {
                        this.getLogger().debug("When recovering state, file being tailed has less data than was stored in the state. Assuming rollover. Will begin tailing current file from beginning.");
                    }
                    long checksumResult = in.getChecksum().getValue();
                    if (checksumResult == tailFileObject.getExpectedRecoveryChecksum()) {
                        this.getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
                        tailFile = existingTailFile;
                        reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
                        this.getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[]{reader, tailFile});
                        reader.position(position);
                    }
                    this.getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
                }
            } else {
                this.getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[]{existingTailFile.length(), position});
            }
            tailFileObject.setState(new TailFileState(filePath, tailFile, reader, position, timestamp, length, checksum, ByteBuffer.allocate(this.preAllocatedBufferSize)));
        } else {
            this.resetState(filePath);
        }
        this.getLogger().debug("Recovered state {}", new Object[]{tailFileObject.getState()});
    }

    private void resetState(String filePath) {
        this.states.get(filePath).setExpectedRecoveryChecksum(null);
        this.states.get(filePath).setState(new TailFileState(filePath, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(this.preAllocatedBufferSize)));
    }

    @OnStopped
    public void cleanup(ProcessContext context) {
        for (TailFileObject tfo : this.states.values()) {
            this.cleanReader(tfo);
            TailFileState state = tfo.getState();
            tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition() - (long)this.linesBuffer.size(), state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer(), state.isTailingPostRollover()));
            this.persistState(tfo, null, context);
        }
        this.linesBuffer.reset();
    }

    private void cleanReader(TailFileObject tfo) {
        if (tfo.getState() == null) {
            return;
        }
        FileChannel reader = tfo.getState().getReader();
        if (reader == null) {
            return;
        }
        try {
            reader.close();
            this.getLogger().debug("Closed FileChannel {}", new Object[]{reader});
        }
        catch (IOException ioe) {
            this.getLogger().warn("Failed to close file handle during cleanup");
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        long timeSinceLastLookup;
        if (this.isMultiChanging.get() && (timeSinceLastLookup = new Date().getTime() - this.lastLookup.get()) > context.getProperty(LOOKUP_FREQUENCY).asTimePeriod(TimeUnit.MILLISECONDS)) {
            try {
                List<String> filesToTail = this.lookup(context);
                Scope scope = this.getStateScope(context);
                StateMap stateMap = session.getState(scope);
                this.initStates(filesToTail, stateMap.toMap(), false);
            }
            catch (IOException e) {
                this.getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", (Throwable)e);
                context.yield();
                return;
            }
        }
        if (this.requireStateLookup) {
            try {
                this.recoverState(context);
            }
            catch (IOException e) {
                this.getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", (Throwable)e);
                context.yield();
                return;
            }
            this.requireStateLookup = false;
        }
        if (this.states.isEmpty()) {
            context.yield();
            return;
        }
        for (String tailFile : this.states.keySet()) {
            try {
                this.processTailFile(context, session, tailFile);
            }
            catch (NulCharacterEncounteredException e) {
                this.getLogger().warn("NUL character encountered in {} and '{}' is set to 'true', yielding.", new Object[]{tailFile, REREAD_ON_NUL.getDisplayName()});
                context.yield();
                return;
            }
        }
        if (this.lineStartPattern != null && this.linesBuffer.size() > 0) {
            this.cleanup(context);
        }
        try {
            Scope scope = this.getStateScope(context);
            StateMap sessionStateMap = session.getState(scope);
            HashMap<String, String> sessionStates = new HashMap<String, String>(sessionStateMap.toMap());
            List<String> keysToRemove = this.collectKeysToBeRemoved(sessionStates);
            keysToRemove.forEach(sessionStates.keySet()::remove);
            this.getLogger().debug("Removed {} references to nonexistent files from session's state map", new Object[]{keysToRemove.size()});
            session.setState(sessionStates, scope);
        }
        catch (IOException e) {
            this.getLogger().error("Exception raised while attempting to cleanup session's state map", (Throwable)e);
            context.yield();
        }
    }

    public void migrateProperties(PropertyConfiguration config) {
        config.renameProperty("tail-base-directory", BASE_DIRECTORY.getName());
        config.renameProperty("tail-mode", MODE.getName());
        config.renameProperty("File to Tail", FILENAME.getName());
        config.renameProperty("File Location", STATE_LOCATION.getName());
        config.renameProperty("tailfile-recursive-lookup", RECURSIVE.getName());
        config.renameProperty("tailfile-lookup-frequency", LOOKUP_FREQUENCY.getName());
        config.renameProperty("tailfile-maximum-age", MAXIMUM_AGE.getName());
        config.renameProperty("reread-on-nul", REREAD_ON_NUL.getName());
        config.renameProperty("pre-allocated-buffer-size", PRE_ALLOCATED_BUFFER_SIZE.getName());
    }

    private List<String> collectKeysToBeRemoved(Map<String, String> sessionStates) {
        ArrayList<String> keysToRemove = new ArrayList<String>();
        List<String> filesToRemove = sessionStates.entrySet().stream().filter(entry -> ((String)entry.getKey()).endsWith("filename") && !this.states.containsKey(entry.getValue())).map(Map.Entry::getKey).toList();
        for (String key : filesToRemove) {
            String prefix = StringUtils.substringBefore((String)key, (String)"filename");
            keysToRemove.add(prefix + "filename");
            keysToRemove.add(prefix + "length");
            keysToRemove.add(prefix + "position");
            keysToRemove.add(prefix + "checksum");
            keysToRemove.add(prefix + "timestamp");
            keysToRemove.add(prefix + "tailingPostRollover");
        }
        return keysToRemove;
    }

    private void processTailFile(ProcessContext context, ProcessSession session, String tailFile) {
        AtomicReference abort;
        boolean rolloverOccurred;
        TailFileObject tfo = this.states.get(tailFile);
        if (tfo.isTailFileChanged()) {
            rolloverOccurred = false;
            String recoverPosition = context.getProperty(START_POSITION).getValue();
            if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
                this.recoverRolledFiles(context, session, tailFile, tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), tfo.getState().getPosition());
            } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
                this.cleanup(context);
                tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 0L, null, tfo.getState().getBuffer()));
            } else {
                String filename = tailFile;
                File file = new File(filename);
                try {
                    FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
                    this.getLogger().debug("Created FileChannel {} for {}", new Object[]{fileChannel, file});
                    CRC32 checksum = new CRC32();
                    long position = file.length();
                    long timestamp = file.lastModified() + 1L;
                    try (FileInputStream fis = new FileInputStream(file);
                         CheckedInputStream in = new CheckedInputStream(fis, checksum);){
                        StreamUtils.copy((InputStream)in, (OutputStream)new NullOutputStream(), (long)position);
                    }
                    fileChannel.position(position);
                    this.cleanup(context);
                    tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
                }
                catch (IOException ioe) {
                    this.getLogger().error("Attempted to position Reader at current position in file {} but failed to do so", new Object[]{file, ioe});
                    context.yield();
                    return;
                }
            }
            tfo.setTailFileChanged(false);
        } else {
            Long expectedChecksumValue = tfo.getExpectedRecoveryChecksum();
            if (expectedChecksumValue == null) {
                Long l = expectedChecksumValue = tfo.getState().getChecksum() == null ? null : Long.valueOf(tfo.getState().getChecksum().getValue());
            }
            if (rolloverOccurred = this.recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition())) {
                boolean tailAfterRollover;
                boolean bl = tailAfterRollover = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS) > 0L;
                if (tailAfterRollover) {
                    this.getLogger().debug("File {} was rolled over and the Rollover Tail Period is set, so will not consume from new file during this iteration.", new Object[]{tailFile});
                    return;
                }
            }
            tfo.setExpectedRecoveryChecksum(null);
        }
        TailFileState state = tfo.getState();
        File file = state.getFile();
        FileChannel reader = state.getReader();
        Checksum checksum = state.getChecksum();
        if (checksum == null) {
            checksum = new CRC32();
        }
        long position = state.getPosition();
        long timestamp = state.getTimestamp();
        long length = state.getLength();
        if ((file == null || reader == null) && (reader = this.createReader(file = new File(tailFile), position)) == null) {
            context.yield();
            return;
        }
        long startNanos = System.nanoTime();
        boolean rotated = rolloverOccurred;
        if (!rotated) {
            long fileLength = file.length();
            if (length > fileLength) {
                this.getLogger().debug("Rotated = true because TailFileState Length = {}, File Length = {}", new Object[]{length, fileLength});
                rotated = true;
            } else {
                try {
                    long readerSize = reader.size();
                    long readerPosition = reader.position();
                    if (readerSize == readerPosition && readerSize != fileLength) {
                        this.getLogger().debug("Rotated = true because readerSize={}, readerPosition={}, fileLength={}", new Object[]{readerSize, readerPosition, fileLength});
                        rotated = true;
                    }
                }
                catch (IOException e) {
                    this.getLogger().warn("Failed to determined the size or position of the File Channel when determining if the file has rolled over. Will assume that the file being tailed has not rolled over", (Throwable)e);
                }
            }
        }
        if (rotated) {
            try {
                reader.close();
                this.getLogger().debug("Closed FileChannel {}", new Object[]{reader});
            }
            catch (IOException ioe) {
                this.getLogger().warn("Failed to close reader for {}", new Object[]{file, ioe});
            }
            reader = this.createReader(file, 0L);
            position = 0L;
            checksum.reset();
        }
        if (file.length() == position || !file.exists()) {
            this.getLogger().debug("No data to consume; created no FlowFiles");
            tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
            this.persistState(tfo, session, context);
            context.yield();
            return;
        }
        TailFileState currentState = state;
        Checksum chksum = checksum;
        FlowFile flowFile = session.create();
        AtomicLong positionHolder = new AtomicLong(position);
        FileChannel fileReader = reader;
        boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
        if ((flowFile = session.write(flowFile, arg_0 -> this.lambda$processTailFile$1(positionHolder, fileReader, currentState, chksum, reReadOnNul, abort = new AtomicReference(), arg_0))).getSize() == 0L) {
            session.remove(flowFile);
            this.getLogger().debug("No data to consume; removed created FlowFile");
        } else {
            String tailFilename = file.getName();
            String baseName = StringUtils.substringBeforeLast((String)tailFilename, (String)".");
            String flowFileName = baseName.length() < tailFilename.length() ? baseName + "." + position + "-" + positionHolder.get() + "." + StringUtils.substringAfterLast((String)tailFilename, (String)".") : baseName + "." + position + "-" + positionHolder.get();
            flowFile = session.putAllAttributes(flowFile, Map.of(CoreAttributes.FILENAME.key(), flowFileName, CoreAttributes.MIME_TYPE.key(), "text/plain", "tailfile.original.path", tailFile));
            session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            session.transfer(flowFile, REL_SUCCESS);
            this.getLogger().debug("Created {} and routed to success", new Object[]{flowFile});
        }
        if (flowFile.getSize() > 0L || this.linesBuffer.size() > 0) {
            position = positionHolder.get();
            timestamp = Math.max(state.getTimestamp(), file.lastModified());
            length = file.length();
        }
        tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
        this.persistState(tfo, session, context);
        if (abort.get() != null) {
            long newPosition = positionHolder.get();
            try {
                reader.position(newPosition);
            }
            catch (IOException ex) {
                this.getLogger().warn("Couldn't reposition the reader for {} due to {}", new Object[]{file, ex, ex});
                try {
                    reader.close();
                }
                catch (IOException ex2) {
                    this.getLogger().warn("Failed to close reader for {} due to {}", new Object[]{file, ex2, ex2});
                }
            }
            throw (NulCharacterEncounteredException)abort.get();
        }
    }

    private long readLines(FileChannel reader, ByteBuffer buffer, OutputStream out, Checksum checksum, Boolean reReadOnNul) throws IOException {
        return this.readLines(reader, buffer, out, checksum, reReadOnNul, false);
    }

    private long readLines(FileChannel reader, ByteBuffer buffer, OutputStream out, Checksum checksum, Boolean reReadOnNul, boolean readFully) throws IOException {
        this.getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()});
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            int num;
            long pos;
            long rePos = pos = reader.position();
            int linesRead = 0;
            boolean seenCR = false;
            buffer.clear();
            while ((num = reader.read(buffer)) != -1) {
                buffer.flip();
                block11: for (int i = 0; i < num; ++i) {
                    byte ch = buffer.get(i);
                    switch (ch) {
                        case 10: {
                            baos.write(ch);
                            seenCR = false;
                            this.flushByteArrayOutputStream(baos, out, checksum, false);
                            rePos = pos + (long)i + 1L;
                            ++linesRead;
                            continue block11;
                        }
                        case 13: {
                            baos.write(ch);
                            seenCR = true;
                            continue block11;
                        }
                        case 0: {
                            if (reReadOnNul.booleanValue()) {
                                throw new NulCharacterEncounteredException(rePos);
                            }
                        }
                        default: {
                            if (seenCR) {
                                seenCR = false;
                                this.flushByteArrayOutputStream(baos, out, checksum, false);
                                ++linesRead;
                                baos.write(ch);
                                rePos = pos + (long)i;
                                continue block11;
                            }
                            baos.write(ch);
                        }
                    }
                }
                pos = reader.position();
            }
            if (readFully) {
                this.flushByteArrayOutputStream(baos, out, checksum, true);
                rePos = reader.position();
            }
            if (rePos < reader.position()) {
                this.getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[]{linesRead, pos, rePos});
                reader.position(rePos);
            }
            long l = rePos;
            return l;
        }
    }

    private void flushByteArrayOutputStream(ByteArrayOutputStream baos, OutputStream out, Checksum checksum, boolean ignoreRegex) throws IOException {
        String[] lines;
        byte[] baosBuffer = baos.toByteArray();
        baos.reset();
        if (ignoreRegex) {
            this.flushLinesBuffer(out, checksum);
        }
        if (this.lineStartPattern == null) {
            out.write(baosBuffer);
            checksum.update(baosBuffer, 0, baosBuffer.length);
            if (this.getLogger().isTraceEnabled()) {
                this.getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
            }
            return;
        }
        String bufferAsString = new String(baosBuffer, StandardCharsets.UTF_8);
        for (String line : lines = bufferAsString.split("\n")) {
            boolean startsWithRegex = this.lineStartPattern.matcher(line).lookingAt();
            if (startsWithRegex || (long)this.linesBuffer.size() >= this.maxBufferBytes) {
                this.flushLinesBuffer(out, checksum);
            }
            this.linesBuffer.write(line.getBytes(StandardCharsets.UTF_8));
            this.linesBuffer.write(NEW_LINE_BYTES);
        }
    }

    private void flushLinesBuffer(OutputStream out, Checksum checksum) throws IOException {
        this.linesBuffer.writeTo(out);
        checksum.update(this.linesBuffer.toByteArray(), 0, this.linesBuffer.size());
        if (this.getLogger().isTraceEnabled()) {
            this.getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
        }
        this.linesBuffer.reset();
    }

    private List<File> getRolledOffFiles(ProcessContext context, long minTimestamp, String tailFilePath) throws IOException {
        String rollingPattern;
        File tailFile = new File(tailFilePath);
        File directory = tailFile.getParentFile();
        if (directory == null) {
            directory = new File(".");
        }
        if ((rollingPattern = context.getProperty(ROLLING_FILENAME_PATTERN).getValue()) == null) {
            return Collections.emptyList();
        }
        rollingPattern = rollingPattern.replace("${filename}", StringUtils.substringBeforeLast((String)tailFile.getName(), (String)"."));
        ArrayList<File> rolledOffFiles = new ArrayList<File>();
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern);){
            for (Path path : dirStream) {
                File file = path.toFile();
                long lastMod = file.lastModified();
                if (lastMod >= minTimestamp && !file.equals(tailFile)) {
                    rolledOffFiles.add(file);
                    continue;
                }
                this.getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it", new Object[]{file, lastMod, minTimestamp});
            }
        }
        rolledOffFiles.sort(Comparator.comparingLong(File::lastModified).thenComparing(File::getName));
        return rolledOffFiles;
    }

    private Scope getStateScope(ProcessContext context) {
        String location = context.getProperty(STATE_LOCATION).getValue();
        if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) {
            return Scope.CLUSTER;
        }
        return Scope.LOCAL;
    }

    private void persistState(TailFileObject tfo, ProcessSession session, ProcessContext context) {
        this.persistState(tfo.getState().toStateMap(tfo.getFilenameIndex()), session, context);
    }

    private void persistState(Map<String, String> state, ProcessSession session, ProcessContext context) {
        try {
            Scope scope = this.getStateScope(context);
            StateMap oldState = session == null ? context.getStateManager().getState(scope) : session.getState(scope);
            HashMap<String, String> updatedState = new HashMap<String, String>(oldState.toMap());
            updatedState.putAll(state);
            if (session == null) {
                context.getStateManager().setState(updatedState, scope);
            } else {
                session.setState(updatedState, scope);
            }
        }
        catch (IOException e) {
            this.getLogger().warn("Failed to store state due to {}; some data may be duplicated on restart of NiFi", (Throwable)e);
        }
    }

    private FileChannel createReader(File file, long position) {
        FileChannel reader;
        try {
            reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
        }
        catch (IOException ioe) {
            this.getLogger().warn("Unable to open file {}; will attempt to access file again after the configured Yield Duration has elapsed", new Object[]{file, ioe});
            return null;
        }
        this.getLogger().debug("Created FileChannel {} for {}", new Object[]{reader, file});
        try {
            reader.position(position);
        }
        catch (IOException ioe) {
            this.getLogger().error("Failed to read from {}", new Object[]{file, ioe});
            try {
                reader.close();
                this.getLogger().debug("Closed FileChannel {}", new Object[]{reader});
            }
            catch (IOException iOException) {
                // empty catch block
            }
            return null;
        }
        return reader;
    }

    Map<String, TailFileObject> getState() {
        return this.states;
    }

    private boolean recoverRolledFiles(ProcessContext context, ProcessSession session, String tailFile, Long expectedChecksum, long timestamp, long position) {
        try {
            List<File> rolledOffFiles = this.getRolledOffFiles(context, timestamp, tailFile);
            return this.recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, position);
        }
        catch (IOException e) {
            this.getLogger().error("Failed to recover files that have rolled over", (Throwable)e);
            return false;
        }
    }

    private boolean recoverRolledFiles(ProcessContext context, ProcessSession session, String tailFile, List<File> rolledOffFiles, Long expectedChecksum, long position) {
        try {
            boolean tailFirstFile;
            File firstFile;
            boolean rolloverOccurred;
            this.getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});
            TailFileObject tfo = this.states.get(tailFile);
            long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
            boolean tailingPostRollover = tfo.getState().isTailingPostRollover();
            boolean shouldTailPostRollover = postRolloverTailMillis > 0L;
            boolean bl = rolloverOccurred = !rolledOffFiles.isEmpty();
            if (rolloverOccurred) {
                firstFile = rolledOffFiles.getFirst();
                long millisSinceModified = this.getCurrentTimeMs() - firstFile.lastModified();
                boolean fileGrew = firstFile.length() >= position;
                boolean tailRolledFile = postRolloverTailMillis == 0L || millisSinceModified < postRolloverTailMillis;
                tailFirstFile = fileGrew && tailRolledFile && expectedChecksum != null;
            } else {
                tailFirstFile = false;
            }
            if (tailFirstFile) {
                firstFile = rolledOffFiles.getFirst();
                boolean consumed = shouldTailPostRollover ? this.tailRolledFile(context, session, tailFile, expectedChecksum, position, tfo, firstFile, false, true) : this.tailRolledFile(context, session, tailFile, expectedChecksum, position, tfo, firstFile, true, false);
                if (consumed) {
                    rolledOffFiles.removeFirst();
                }
            } else if (tailingPostRollover && shouldTailPostRollover) {
                List<File> allRolledFiles = this.getRolledOffFiles(context, 0L, tailFile);
                allRolledFiles.sort(Comparator.comparing(File::lastModified).reversed());
                File newestFile = allRolledFiles.getFirst();
                long millisSinceModified = this.getCurrentTimeMs() - newestFile.lastModified();
                if (millisSinceModified < postRolloverTailMillis) {
                    this.getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into account. Will do nothing for now.", new Object[]{newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified});
                    return true;
                }
                boolean consumed = this.tailRolledFile(context, session, tailFile, expectedChecksum, position, tfo, newestFile, true, false);
                if (consumed) {
                    this.getLogger().debug("Consumed the final data from {}", new Object[]{newestFile});
                    rolledOffFiles.remove(newestFile);
                } else {
                    this.getLogger().debug("No more data to consume from {} (size={}, lastModified={})", new Object[]{newestFile, newestFile.length(), newestFile.lastModified()});
                }
            }
            for (File file : rolledOffFiles) {
                tfo.setState(this.consumeFileFully(file, context, session, tfo));
            }
            return rolloverOccurred;
        }
        catch (IOException e) {
            this.getLogger().error("Failed to recover files that have rolled over", (Throwable)e);
            return false;
        }
    }

    private boolean tailRolledFile(ProcessContext context, ProcessSession session, String tailFile, Long expectedChecksum, long position, TailFileObject tfo, File fileToTail, boolean readFully, boolean tailingPostRollover) throws IOException {
        Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
        long startNanos = System.nanoTime();
        try (FileInputStream fis = new FileInputStream(fileToTail);){
            CheckedInputStream in;
            block17: {
                boolean bl;
                in = new CheckedInputStream(fis, new CRC32());
                try {
                    StreamUtils.copy((InputStream)in, (OutputStream)new NullOutputStream(), (long)position);
                    long checksumResult = in.getChecksum().getValue();
                    if (checksumResult == expectedChecksum) break block17;
                    this.getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file", new Object[]{fileToTail, checksumResult, expectedChecksum});
                    bl = false;
                }
                catch (Throwable throwable) {
                    try {
                        in.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                    throw throwable;
                }
                in.close();
                return bl;
            }
            this.getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[]{fileToTail, position});
            FlowFile flowFile = session.create();
            TailFileState currentState = tfo.getState();
            Checksum checksum = currentState.getChecksum() == null ? new CRC32() : currentState.getChecksum();
            ByteBuffer buffer = currentState.getBuffer() == null ? ByteBuffer.allocate(this.preAllocatedBufferSize) : currentState.getBuffer();
            FileChannel channel = fis.getChannel();
            long timestamp = fileToTail.lastModified();
            AtomicReference abort = new AtomicReference();
            flowFile = session.write(flowFile, out -> {
                try {
                    this.readLines(channel, buffer, out, checksum, reReadOnNul, readFully);
                }
                catch (NulCharacterEncounteredException ncee) {
                    abort.set(ncee);
                    this.getLogger().info("Encountered NUL character when tailing file {}; will yield", new Object[]{tailFile});
                    context.yield();
                }
            });
            if (flowFile.getSize() == 0L) {
                session.remove(flowFile);
            } else {
                flowFile = session.putAllAttributes(flowFile, Map.of(CoreAttributes.FILENAME.key(), fileToTail.getName(), CoreAttributes.MIME_TYPE.key(), "text/plain", "tailfile.original.path", tailFile));
                session.getProvenanceReporter().receive(flowFile, fileToTail.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                session.transfer(flowFile, REL_SUCCESS);
                this.getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, fileToTail});
            }
            long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
            long millisSinceUpdate = this.getCurrentTimeMs() - timestamp;
            if (tailingPostRollover && postRolloverTailMillis > 0L) {
                this.getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured {} ({} ms), so will continue tailing", new Object[]{fileToTail, millisSinceUpdate, POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis});
                long length = currentState.getLength() + flowFile.getSize();
                long updatedPosition = position + flowFile.getSize();
                TailFileState updatedState = new TailFileState(currentState.getFilename(), currentState.getFile(), channel, updatedPosition, timestamp, length, checksum, buffer, tailingPostRollover);
                tfo.setState(updatedState);
            } else {
                NulCharacterEncounteredException ncee = (NulCharacterEncounteredException)abort.get();
                if (ncee != null) {
                    throw ncee;
                }
                this.getLogger().debug("Completed tailing of file {}; will cleanup state", new Object[]{tailFile});
                this.cleanup(context);
                tfo.setState(new TailFileState(tailFile, null, null, 0L, fileToTail.lastModified() + 1L, fileToTail.length(), null, tfo.getState().getBuffer(), tailingPostRollover));
            }
            this.persistState(tfo, session, context);
            boolean bl = true;
            in.close();
            return bl;
        }
    }

    private TailFileState consumeFileFully(File file, ProcessContext context, ProcessSession session, TailFileObject tfo) throws IOException {
        FlowFile flowFile = session.create();
        try (FileInputStream fis = new FileInputStream(file);){
            flowFile = session.write(flowFile, out -> {
                this.flushLinesBuffer(out, new CRC32());
                StreamUtils.copy((InputStream)fis, (OutputStream)out);
            });
        }
        if (flowFile.getSize() == 0L) {
            session.remove(flowFile);
        } else {
            flowFile = session.putAllAttributes(flowFile, Map.of(CoreAttributes.FILENAME.key(), file.getName(), CoreAttributes.MIME_TYPE.key(), "text/plain", "tailfile.original.path", tfo.getState().getFilename()));
            session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
            session.transfer(flowFile, REL_SUCCESS);
            this.getLogger().debug("Created {} from {} and routed to success", new Object[]{flowFile, file});
            this.cleanup(context);
            tfo.setState(new TailFileState(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(), null, null, 0L, file.lastModified() + 1L, file.length(), null, tfo.getState().getBuffer()));
            this.persistState(tfo, session, context);
        }
        return tfo.getState();
    }

    public long getCurrentTimeMs() {
        return System.currentTimeMillis();
    }

    private /* synthetic */ void lambda$processTailFile$1(AtomicLong positionHolder, FileChannel fileReader, TailFileState currentState, Checksum chksum, boolean reReadOnNul, AtomicReference abort, OutputStream rawOut) throws IOException {
        try (BufferedOutputStream out = new BufferedOutputStream(rawOut);){
            positionHolder.set(this.readLines(fileReader, currentState.getBuffer(), out, chksum, reReadOnNul));
        }
        catch (NulCharacterEncounteredException e) {
            positionHolder.set(e.getRePos());
            abort.set(e);
        }
    }

    static class TailFileState {
        private final String filename;
        private final File file;
        private final FileChannel reader;
        private final long position;
        private final long timestamp;
        private final long length;
        private final Checksum checksum;
        private final ByteBuffer buffer;
        private final boolean tailingPostRollover;

        public TailFileState(String filename, File file, FileChannel reader, long position, long timestamp, long length, Checksum checksum, ByteBuffer buffer) {
            this(filename, file, reader, position, timestamp, length, checksum, buffer, false);
        }

        public TailFileState(String filename, File file, FileChannel reader, long position, long timestamp, long length, Checksum checksum, ByteBuffer buffer, boolean tailingPostRollover) {
            this.filename = filename;
            this.file = file;
            this.reader = reader;
            this.position = position;
            this.length = length;
            this.timestamp = timestamp;
            this.checksum = checksum;
            this.buffer = buffer;
            this.tailingPostRollover = tailingPostRollover;
        }

        public String getFilename() {
            return this.filename;
        }

        public File getFile() {
            return this.file;
        }

        public FileChannel getReader() {
            return this.reader;
        }

        public long getPosition() {
            return this.position;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public long getLength() {
            return this.length;
        }

        public Checksum getChecksum() {
            return this.checksum;
        }

        public ByteBuffer getBuffer() {
            return this.buffer;
        }

        public boolean isTailingPostRollover() {
            return this.tailingPostRollover;
        }

        public String toString() {
            return "TailFileState[filename=" + this.filename + ", position=" + this.position + ", timestamp=" + this.timestamp + ", checksum=" + String.valueOf(this.checksum == null ? "null" : Long.valueOf(this.checksum.getValue())) + ", tailingPostRollover=" + this.tailingPostRollover + "]";
        }

        public Map<String, String> toStateMap(int index) {
            String prefix = TailFile.MAP_PREFIX + index + ".";
            HashMap<String, String> map = HashMap.newHashMap(4);
            map.put(prefix + "filename", this.filename);
            map.put(prefix + "position", String.valueOf(this.position));
            map.put(prefix + "length", String.valueOf(this.length));
            map.put(prefix + "timestamp", String.valueOf(this.timestamp));
            map.put(prefix + "checksum", this.checksum == null ? null : String.valueOf(this.checksum.getValue()));
            map.put(prefix + "tailingPostRollover", String.valueOf(this.tailingPostRollover));
            return map;
        }

        static class StateKeys {
            public static final String FILENAME = "filename";
            public static final String POSITION = "position";
            public static final String TIMESTAMP = "timestamp";
            public static final String CHECKSUM = "checksum";
            public static final String LENGTH = "length";
            public static final String TAILING_POST_ROLLOVER = "tailingPostRollover";

            StateKeys() {
            }
        }
    }

    static class TailFileObject {
        private TailFileState state;
        private Long expectedRecoveryChecksum;
        private final int filenameIndex;
        private boolean tailFileChanged = true;

        public TailFileObject(int index, TailFileState fileState) {
            this.filenameIndex = index;
            this.state = fileState;
        }

        public TailFileObject(int index, Map<String, String> statesMap, int preAllocatedBufferSize) {
            this.filenameIndex = index;
            this.tailFileChanged = false;
            String prefix = TailFile.MAP_PREFIX + index + ".";
            String filename = statesMap.get(prefix + "filename");
            long position = Long.parseLong(statesMap.get(prefix + "position"));
            long timestamp = Long.parseLong(statesMap.get(prefix + "timestamp"));
            long length = Long.parseLong(statesMap.get(prefix + "length"));
            boolean tailingPostRollover = Boolean.parseBoolean(prefix + "tailingPostRollover");
            this.state = new TailFileState(filename, new File(filename), null, position, timestamp, length, null, ByteBuffer.allocate(preAllocatedBufferSize), tailingPostRollover);
        }

        public int getFilenameIndex() {
            return this.filenameIndex;
        }

        public TailFileState getState() {
            return this.state;
        }

        public void setState(TailFileState state) {
            this.state = state;
        }

        public Long getExpectedRecoveryChecksum() {
            return this.expectedRecoveryChecksum;
        }

        public void setExpectedRecoveryChecksum(Long expectedRecoveryChecksum) {
            this.expectedRecoveryChecksum = expectedRecoveryChecksum;
        }

        public boolean isTailFileChanged() {
            return this.tailFileChanged;
        }

        public void setTailFileChanged(boolean tailFileChanged) {
            this.tailFileChanged = tailFileChanged;
        }
    }

    static class NulCharacterEncounteredException
    extends RuntimeException {
        private final long rePos;

        public NulCharacterEncounteredException(long rePos) {
            this.rePos = rePos;
        }

        public long getRePos() {
            return this.rePos;
        }

        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }
}

