/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.ReflectionUtils;
import com.hazelcast.logging.ILogger;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class StreamFilesP<R>
extends AbstractProcessor {
    private static final int LINES_IN_ONE_BATCH = 64;
    private static final String SENSITIVITY_MODIFIER_CLASS_NAME = "com.sun.nio.file.SensitivityWatchEventModifier";
    private static final WatchEvent.Kind[] WATCH_EVENT_KINDS = new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE};
    private static final WatchEvent.Modifier[] WATCH_EVENT_MODIFIERS = StreamFilesP.getHighSensitivityModifiers();
    final Map<Path, FileOffset> fileOffsets = new HashMap<Path, FileOffset>();
    private final Path watchedDirectory;
    private final Charset charset;
    private final PathMatcher glob;
    private final int parallelism;
    private final int id;
    private final DistributedBiFunction<String, String, R> mapOutputFn;
    private final Queue<Path> eventQueue = new ArrayDeque<Path>();
    private WatchService watcher;
    private StringBuilder lineBuilder = new StringBuilder();
    private R pendingLine;
    private Path currentFile;
    private String currentFileName;
    private FileInputStream currentInputStream;
    private Reader currentReader;

    StreamFilesP(@Nonnull String watchedDirectory, @Nonnull Charset charset, @Nonnull String glob, int parallelism, int id, @Nonnull DistributedBiFunction<String, String, R> mapOutputFn) {
        this.watchedDirectory = Paths.get(watchedDirectory, new String[0]);
        this.charset = charset;
        this.glob = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        this.parallelism = parallelism;
        this.id = id;
        this.mapOutputFn = mapOutputFn;
        this.setCooperative(false);
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(this.watchedDirectory);){
            for (Path file : directoryStream) {
                if (!Files.isRegularFile(file, new LinkOption[0])) continue;
                this.fileOffsets.put(file, new FileOffset(-Files.size(file), ""));
            }
        }
        this.watcher = FileSystems.getDefault().newWatchService();
        this.watchedDirectory.register(this.watcher, WATCH_EVENT_KINDS, WATCH_EVENT_MODIFIERS);
        this.getLogger().info("Started to watch directory: " + this.watchedDirectory);
    }

    @Override
    public void close(@Nullable Throwable error) {
        try {
            this.closeCurrentFile();
            if (this.isClosed()) {
                return;
            }
            this.getLogger().fine("Closing StreamFilesP");
            this.watcher.close();
        }
        catch (IOException e) {
            this.getLogger().severe("Failed to close StreamFilesP", e);
        }
        finally {
            this.watcher = null;
        }
    }

    @Override
    public boolean complete() {
        if (this.isClosed()) {
            return true;
        }
        try {
            this.drainWatcherEvents();
            if (this.currentFile == null) {
                this.currentFile = this.eventQueue.poll();
                String string = this.currentFileName = this.currentFile != null ? String.valueOf(this.currentFile.getFileName()) : null;
            }
            if (this.currentFile != null) {
                this.processFile();
            }
            return false;
        }
        catch (InterruptedException e) {
            this.close(e);
            return true;
        }
    }

    private void drainWatcherEvents() throws InterruptedException {
        WatchKey key;
        ILogger logger = this.getLogger();
        WatchKey watchKey = key = this.currentFile == null && this.eventQueue.isEmpty() ? this.watcher.poll(1L, TimeUnit.SECONDS) : this.watcher.poll();
        if (key == null) {
            if (!Files.exists(this.watchedDirectory, new LinkOption[0])) {
                logger.info("Directory " + this.watchedDirectory + " does not exist, stopped watching");
                this.close(null);
            }
            return;
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            WatchEvent.Kind<?> kind = event.kind();
            Path fileName = (Path)event.context();
            Path filePath = this.watchedDirectory.resolve(fileName);
            if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
                if (!this.glob.matches(fileName) || !this.belongsToThisProcessor(fileName) || Files.isDirectory(filePath, new LinkOption[0])) continue;
                LoggingUtil.logFine(logger, "Will open file to read new content: %s", filePath);
                this.eventQueue.add(filePath);
                continue;
            }
            if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
                LoggingUtil.logFinest(logger, "File was deleted: %s", filePath);
                this.fileOffsets.remove(filePath);
                continue;
            }
            if (kind == StandardWatchEventKinds.OVERFLOW) {
                logger.warning("Detected OVERFLOW in " + this.watchedDirectory);
                continue;
            }
            throw new JetException("Unknown kind of WatchEvent: " + kind);
        }
        if (!key.reset()) {
            logger.info("Watch key is invalid. Stopping watcher.");
            this.close(null);
        }
    }

    private boolean belongsToThisProcessor(Path path) {
        return (path.hashCode() & Integer.MAX_VALUE) % this.parallelism == this.id;
    }

    private void processFile() {
        try {
            if (!this.ensureFileOpen()) {
                return;
            }
            for (int i = 0; i < 64; ++i) {
                if (this.pendingLine == null) {
                    String line = this.readCompleteLine(this.currentReader);
                    this.pendingLine = line != null ? this.mapOutputFn.apply(this.currentFileName, line) : null;
                    Object v0 = this.pendingLine;
                }
                if (this.pendingLine == null) {
                    this.fileOffsets.put(this.currentFile, new FileOffset(this.currentInputStream.getChannel().position(), this.lineBuilder.toString()));
                    this.lineBuilder.setLength(0);
                    this.closeCurrentFile();
                } else if (this.tryEmit(this.pendingLine)) {
                    this.pendingLine = null;
                    continue;
                }
                break;
            }
        }
        catch (IOException e) {
            this.close(e);
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private boolean ensureFileOpen() throws IOException {
        if (this.currentReader != null) {
            return true;
        }
        FileOffset offset = this.fileOffsets.getOrDefault(this.currentFile, FileOffset.ZERO);
        LoggingUtil.logFine(this.getLogger(), "Processing file %s, previous offset: %s", this.currentFile, offset);
        try {
            FileInputStream fis = new FileInputStream(this.currentFile.toFile());
            fis.getChannel().position(offset.positiveOffset());
            BufferedReader r = new BufferedReader(new InputStreamReader((InputStream)fis, this.charset));
            if (offset.offset < 0L && !this.findEndOfLine(r)) {
                this.closeCurrentFile();
                return false;
            }
            this.currentReader = r;
            this.currentInputStream = fis;
            this.lineBuilder.append(offset.pendingLine);
            return true;
        }
        catch (FileNotFoundException ignored) {
            this.closeCurrentFile();
            return false;
        }
    }

    private boolean findEndOfLine(Reader in) throws IOException {
        int ch;
        do {
            if ((ch = in.read()) >= 0) continue;
            return false;
        } while (ch != 10 && ch != 13);
        StreamFilesP.maybeSkipLF(in, ch);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    String readCompleteLine(Reader reader) throws IOException {
        int ch;
        while ((ch = reader.read()) >= 0) {
            if (ch == 13 || ch == 10) {
                StreamFilesP.maybeSkipLF(reader, ch);
                try {
                    String string = this.lineBuilder.toString();
                    return string;
                }
                finally {
                    this.lineBuilder.setLength(0);
                }
            }
            this.lineBuilder.append((char)ch);
        }
        return null;
    }

    private static void maybeSkipLF(Reader reader, int ch) throws IOException {
        if (ch == 13) {
            reader.mark(1);
            int ch2 = reader.read();
            if (ch2 != 10) {
                reader.reset();
            }
        }
    }

    private void closeCurrentFile() {
        if (this.currentReader != null) {
            try {
                this.currentReader.close();
            }
            catch (IOException e) {
                throw ExceptionUtil.sneakyThrow(e);
            }
        }
        this.currentFile = null;
        this.currentFileName = null;
        this.currentReader = null;
        this.currentInputStream = null;
    }

    private boolean isClosed() {
        return this.watcher == null;
    }

    @Nonnull
    public static ProcessorMetaSupplier metaSupplier(@Nonnull String watchedDirectory, @Nonnull String charset, @Nonnull String glob, @Nonnull DistributedBiFunction<String, String, ?> mapOutputFn) {
        return ProcessorMetaSupplier.of(count -> IntStream.range(0, count).mapToObj(i -> new StreamFilesP(watchedDirectory, Charset.forName(charset), glob, count, i, mapOutputFn)).collect(Collectors.toList()), 2);
    }

    private static WatchEvent.Modifier[] getHighSensitivityModifiers() {
        Object modifier = ReflectionUtils.readStaticFieldOrNull(SENSITIVITY_MODIFIER_CLASS_NAME, "HIGH");
        if (modifier instanceof WatchEvent.Modifier) {
            return new WatchEvent.Modifier[]{(WatchEvent.Modifier)modifier};
        }
        return new WatchEvent.Modifier[0];
    }

    private static final class FileOffset {
        private static final FileOffset ZERO = new FileOffset(0L, "");
        private final long offset;
        private final String pendingLine;

        private FileOffset(long offset, @Nonnull String pendingLine) {
            this.offset = offset;
            this.pendingLine = pendingLine;
        }

        private long positiveOffset() {
            return this.offset >= 0L ? this.offset : -this.offset - 1L;
        }

        public String toString() {
            return "FileOffset{offset=" + this.offset + ", pendingLine='" + this.pendingLine + '\'' + '}';
        }
    }
}

