/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.sun.nio.file.SensitivityWatchEventModifier;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

public class StreamFilesP
extends AbstractProcessor
implements Closeable {
    private static final int LINES_IN_ONE_BATCH = 64;
    final Map<Path, Long> fileOffsets = new HashMap<Path, Long>();
    private final Path watchedDirectory;
    private final Charset charset;
    private final PathMatcher glob;
    private final int parallelism;
    private final int id;
    private final Queue<Path> eventQueue = new ArrayDeque<Path>();
    private WatchService watcher;
    private StringBuilder lineBuilder = new StringBuilder();
    private Path currentFile;
    private FileInputStream currentInputStream;
    private Reader currentReader;

    StreamFilesP(@Nonnull String watchedDirectory, @Nonnull Charset charset, @Nonnull String glob, int parallelism, int id) {
        this.watchedDirectory = Paths.get(watchedDirectory, new String[0]);
        this.charset = charset;
        this.glob = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        this.parallelism = parallelism;
        this.id = id;
        this.setCooperative(false);
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        for (Path file : Files.newDirectoryStream(this.watchedDirectory)) {
            if (!Files.isRegularFile(file, new LinkOption[0])) continue;
            this.fileOffsets.put(file, -Files.size(file));
        }
        this.watcher = FileSystems.getDefault().newWatchService();
        this.watchedDirectory.register(this.watcher, new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE}, SensitivityWatchEventModifier.HIGH);
        this.getLogger().info("Started to watch directory: " + this.watchedDirectory);
    }

    @Override
    public void close() {
        try {
            this.closeCurrentFile();
            if (this.isClosed()) {
                return;
            }
            this.getLogger().info("Closing StreamFilesP. Any pending watch events will be processed.");
            this.watcher.close();
        }
        catch (IOException e) {
            this.getLogger().severe("Failed to close StreamFilesP", e);
        }
        finally {
            this.watcher = null;
        }
    }

    @Override
    public boolean complete() {
        try {
            if (!this.isClosed()) {
                this.drainWatcherEvents();
            } else if (this.eventQueue.isEmpty()) {
                return true;
            }
            if (this.currentFile == null) {
                this.currentFile = this.eventQueue.poll();
            }
            if (this.currentFile != null) {
                this.processFile();
            }
            return false;
        }
        catch (InterruptedException e) {
            this.close();
            return true;
        }
    }

    private void drainWatcherEvents() throws InterruptedException {
        WatchKey key;
        ILogger logger = this.getLogger();
        WatchKey watchKey = key = this.currentFile == null && this.eventQueue.isEmpty() ? this.watcher.poll(1L, TimeUnit.SECONDS) : this.watcher.poll();
        if (key == null) {
            if (!Files.exists(this.watchedDirectory, new LinkOption[0])) {
                logger.info("Directory " + this.watchedDirectory + " does not exist, stopped watching");
                this.close();
            }
            return;
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            WatchEvent.Kind<?> kind = event.kind();
            Path fileName = (Path)event.context();
            Path filePath = this.watchedDirectory.resolve(fileName);
            if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
                if (!this.glob.matches(fileName) || !this.belongsToThisProcessor(fileName) || Files.isDirectory(filePath, new LinkOption[0])) continue;
                LoggingUtil.logFine(logger, "Will open file to read new content: %s", filePath);
                this.eventQueue.add(filePath);
                continue;
            }
            if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
                LoggingUtil.logFinest(logger, "File was deleted: %s", filePath);
                this.fileOffsets.remove(filePath);
                continue;
            }
            if (kind == StandardWatchEventKinds.OVERFLOW) {
                logger.warning("Detected OVERFLOW in " + this.watchedDirectory);
                continue;
            }
            throw new JetException("Unknown kind of WatchEvent: " + kind);
        }
        if (!key.reset()) {
            logger.info("Watch key is invalid. Stopping watcher.");
            this.close();
        }
    }

    private boolean belongsToThisProcessor(Path path) {
        return (path.hashCode() & Integer.MAX_VALUE) % this.parallelism == this.id;
    }

    private void processFile() {
        try {
            if (!this.ensureFileOpen()) {
                return;
            }
            for (int i = 0; i < 64; ++i) {
                String line = this.readCompleteLine(this.currentReader);
                if (line == null) {
                    this.fileOffsets.put(this.currentFile, this.currentInputStream.getChannel().position());
                    this.closeCurrentFile();
                    break;
                }
                this.emit(line);
            }
        }
        catch (IOException e) {
            this.close();
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private boolean ensureFileOpen() throws IOException {
        if (this.currentReader != null) {
            return true;
        }
        long offset = this.fileOffsets.getOrDefault(this.currentFile, 0L);
        LoggingUtil.logFinest(this.getLogger(), "Processing file %s, previous offset: %,d", this.currentFile, offset);
        try {
            FileInputStream fis = new FileInputStream(this.currentFile.toFile());
            fis.getChannel().position(offset >= 0L ? offset : -offset - 1L);
            BufferedReader r = new BufferedReader(new InputStreamReader((InputStream)fis, this.charset));
            if (offset < 0L && !this.findNextLine(r, offset)) {
                this.closeCurrentFile();
                return false;
            }
            this.currentReader = r;
            this.currentInputStream = fis;
            return true;
        }
        catch (FileNotFoundException ignored) {
            this.closeCurrentFile();
            return false;
        }
    }

    private boolean findNextLine(Reader in, long offset) throws IOException {
        int ch;
        do {
            if ((ch = in.read()) >= 0) continue;
            this.fileOffsets.put(this.currentFile, offset);
            return false;
        } while (ch != 10 && ch != 13);
        StreamFilesP.maybeSkipLF(in, ch);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    String readCompleteLine(Reader reader) throws IOException {
        int ch;
        while ((ch = reader.read()) >= 0 && ch >= 0) {
            if (ch == 13 || ch == 10) {
                StreamFilesP.maybeSkipLF(reader, ch);
                try {
                    String string = this.lineBuilder.toString();
                    return string;
                }
                finally {
                    this.lineBuilder.setLength(0);
                }
            }
            this.lineBuilder.append((char)ch);
        }
        return null;
    }

    private static void maybeSkipLF(Reader reader, int ch) throws IOException {
        if (ch == 13) {
            reader.mark(1);
            int ch2 = reader.read();
            if (ch2 != 10) {
                reader.reset();
            }
        }
    }

    private void closeCurrentFile() {
        if (this.currentReader != null) {
            try {
                this.currentReader.close();
            }
            catch (IOException e) {
                throw ExceptionUtil.sneakyThrow(e);
            }
        }
        this.currentFile = null;
        this.currentReader = null;
        this.currentInputStream = null;
    }

    private boolean isClosed() {
        return this.watcher == null;
    }

    public static ProcessorSupplier supplier(@Nonnull String watchedDirectory, @Nonnull String charset, @Nonnull String glob) {
        return new Supplier(watchedDirectory, charset, glob);
    }

    private static final class Supplier
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String watchedDirectory;
        private final String charset;
        private final String glob;
        private transient ArrayList<StreamFilesP> processors;

        private Supplier(String watchedDirectory, String charset, String glob) {
            this.watchedDirectory = watchedDirectory;
            this.charset = charset;
            this.glob = glob;
        }

        @Nonnull
        public List<StreamFilesP> get(int count) {
            this.processors = new ArrayList(count);
            Charset charsetObj = Charset.forName(this.charset);
            for (int i = 0; i < count; ++i) {
                this.processors.add(new StreamFilesP(this.watchedDirectory, charsetObj, this.glob, count, i));
            }
            return this.processors;
        }

        @Override
        public void complete(Throwable error) {
            this.processors.forEach(r -> Util.uncheckRun(r::close));
        }
    }
}

