/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source.legacy;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source function that periodically lists a path and emits one record per file that is new or
 * has a newer modification time than the one seen on a previous poll.
 *
 * <p>Each emitted record is a {@code Tuple3<String, Long, Long>} of
 * {@code (filePath, startOffset, endOffset)}:
 *
 * <ul>
 *   <li>{@link WatchType#ONLY_NEW_FILES} and {@link WatchType#REPROCESS_WITH_APPENDED} emit
 *       {@code (filePath, 0, -1)}.
 *   <li>{@link WatchType#PROCESS_ONLY_APPENDED} emits
 *       {@code (filePath, previouslyEmittedLength, currentLength)}.
 * </ul>
 *
 * <p>NOTE(review): the end offset {@code -1} presumably tells the consumer to read to the end of
 * the file; confirm against the downstream reader.
 *
 * <p>The bookkeeping maps are plain {@link HashMap}s that are only touched from {@link #run};
 * {@link #cancel()} communicates with the polling loop solely through a volatile flag.
 */
@Internal
public class FileMonitoringFunction
        implements SourceFunction<Tuple3<String, Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);

    /** Path (as a URI string) that is listed on every poll. */
    private final String path;

    /** Pause between two polls, passed unchanged to {@link Thread#sleep(long)}. */
    private final long interval;

    /** Determines which files are reported and which offsets are emitted for them. */
    private final WatchType watchType;

    /** Last end offset emitted per file, keyed by the full file path string. */
    private final Map<String, Long> offsetOfFiles;

    /** Last observed modification time per file, keyed by the bare file name. */
    private final Map<String, Long> modificationTimes;

    /** Cleared by {@link #cancel()} to make the polling loop in {@link #run} terminate. */
    private volatile boolean isRunning = true;

    /**
     * Creates a monitoring source.
     *
     * @param path URI string of the location to list
     * @param interval pause between two polls, handed to {@link Thread#sleep(long)}
     * @param watchType strategy that decides which files are emitted and with which offsets
     */
    public FileMonitoringFunction(String path, long interval, WatchType watchType) {
        this.path = path;
        this.interval = interval;
        this.watchType = watchType;
        this.modificationTimes = new HashMap<String, Long>();
        this.offsetOfFiles = new HashMap<String, Long>();
    }

    /**
     * Polls the configured path until {@link #cancel()} is called, emitting one tuple per file
     * returned by {@link #listNewFiles(FileSystem)} and sleeping {@code interval} between polls.
     *
     * @param ctx context used to emit {@code (filePath, startOffset, endOffset)} records
     * @throws Exception if the path is not a valid URI, a file system call fails, or the thread
     *     is interrupted while sleeping
     */
    @Override
    public void run(SourceFunction.SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI(path));

        while (isRunning) {
            List<String> files = listNewFiles(fileSystem);
            for (String filePath : files) {
                if (watchType == WatchType.ONLY_NEW_FILES
                        || watchType == WatchType.REPROCESS_WITH_APPENDED) {
                    // Whole-file modes always report the file from offset 0 with end offset -1.
                    ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
                    offsetOfFiles.put(filePath, -1L);
                } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
                    long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
                    // Resume from the length reported last time, or from 0 for an unseen file.
                    // The map never stores null values, so a null result means "no entry".
                    Long previousOffset = offsetOfFiles.get(filePath);
                    long offset = previousOffset != null ? previousOffset : 0L;

                    ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
                    offsetOfFiles.put(filePath, fileSize);

                    LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
                }
            }

            Thread.sleep(interval);
        }
    }

    /**
     * Lists the configured path and returns the full path strings of all entries that are not
     * rejected by {@link #isFiltered(String, long)}, recording their modification times.
     *
     * @param fileSystem file system used to list the path
     * @return paths of the files to report in this poll; empty if the listing returned
     *     {@code null}
     * @throws IOException if listing the path fails
     */
    private List<String> listNewFiles(FileSystem fileSystem) throws IOException {
        List<String> files = new ArrayList<String>();

        FileStatus[] statuses = fileSystem.listStatus(new Path(path));

        if (statuses == null) {
            LOG.warn("Path does not exist: {}", path);
        } else {
            for (FileStatus status : statuses) {
                Path filePath = status.getPath();
                String fileName = filePath.getName();
                long modificationTime = status.getModificationTime();

                if (!isFiltered(fileName, modificationTime)) {
                    files.add(filePath.toString());
                    modificationTimes.put(fileName, modificationTime);
                }
            }
        }
        return files;
    }

    /**
     * Decides whether a listed file must be skipped in the current poll.
     *
     * <p>A file is skipped when any of the following holds: the watch type is
     * {@link WatchType#ONLY_NEW_FILES} and the name has been recorded before; the name starts
     * with {@code "."}; the name contains {@code "_COPYING_"}; or the recorded modification time
     * is not older than the given one.
     *
     * @param fileName bare name of the listed file
     * @param modificationTime modification time reported by the current listing
     * @return {@code true} if the file must not be reported
     */
    private boolean isFiltered(String fileName, long modificationTime) {
        boolean alreadyReported =
                watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName);
        if (alreadyReported || fileName.startsWith(".") || fileName.contains("_COPYING_")) {
            return true;
        }
        Long lastModification = modificationTimes.get(fileName);
        return lastModification != null && lastModification >= modificationTime;
    }

    /** Requests termination of the polling loop by clearing the running flag. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /** The watch type of the {@code FileMonitoringFunction}. */
    @Internal
    public enum WatchType {
        /** Report each file name once; names that were already recorded are skipped. */
        ONLY_NEW_FILES,
        /** Report a file from offset 0 again whenever its modification time has advanced. */
        REPROCESS_WITH_APPENDED,
        /** Report only the range between the previously reported length and the current one. */
        PROCESS_ONLY_APPENDED
    }
}

