/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.sun.nio.file.SensitivityWatchEventModifier;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;

public class ReadFileStreamP
extends AbstractProcessor
implements Closeable {
    private final WatchType watchType;
    private final int parallelism;
    private final int id;
    private final Path watchedDirectory;
    private final Map<Path, Long> fileOffsets;
    private WatchService watcher;

    ReadFileStreamP(String folder, WatchType watchType, int parallelism, int id) {
        this.watchType = watchType;
        this.parallelism = parallelism;
        this.id = id;
        this.watchedDirectory = Paths.get(folder, new String[0]);
        this.fileOffsets = watchType == WatchType.APPENDED_ONLY ? new HashMap<Path, Long>() : null;
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        this.watcher = FileSystems.getDefault().newWatchService();
        this.watchedDirectory.register(this.watcher, new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE}, SensitivityWatchEventModifier.HIGH);
        this.getLogger().info("Started to watch the directory: " + this.watchedDirectory);
    }

    @Override
    public boolean complete() {
        try {
            boolean isDone = this.tryComplete();
            if (isDone) {
                this.close();
            }
            return isDone;
        }
        catch (InterruptedException ignored) {
            return true;
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private boolean tryComplete() throws InterruptedException, IOException {
        WatchKey key = this.watcher.take();
        for (WatchEvent<?> event : key.pollEvents()) {
            WatchEvent.Kind<?> kind = event.kind();
            Path filePath = this.watchedDirectory.resolve((Path)event.context());
            if (kind == StandardWatchEventKinds.OVERFLOW) {
                this.getLogger().warning("Detected OVERFLOW in " + this.watchedDirectory);
                continue;
            }
            if (kind == StandardWatchEventKinds.ENTRY_DELETE && filePath.equals(this.watchedDirectory)) {
                this.getLogger().info("Directory " + this.watchedDirectory + " deleted, stopped watching");
                return true;
            }
            if (!this.shouldProcessEvent(filePath)) continue;
            this.processEvent(kind, filePath);
        }
        if (!key.reset()) {
            this.getLogger().info("Watch key is invalid. Stopping watcher.");
            return true;
        }
        return false;
    }

    private void processEvent(WatchEvent.Kind<?> kind, Path file) throws IOException {
        if (Files.isDirectory(file, new LinkOption[0])) {
            return;
        }
        if (kind == StandardWatchEventKinds.ENTRY_DELETE && this.watchType == WatchType.APPENDED_ONLY) {
            if (this.getLogger().isFineEnabled()) {
                this.getLogger().fine("Detected deleted file: " + file);
            }
            if (this.fileOffsets != null) {
                this.fileOffsets.remove(file);
            }
        } else if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
            if (this.getLogger().isFineEnabled()) {
                this.getLogger().fine("Detected new file: " + file);
            }
            long newOffset = this.processFile(file, 0L);
            if (this.fileOffsets != null) {
                this.fileOffsets.put(file, newOffset);
            }
        } else if (kind == StandardWatchEventKinds.ENTRY_MODIFY && this.watchType != WatchType.NEW) {
            if (this.getLogger().isFineEnabled()) {
                this.getLogger().fine("Detected modified file: " + file);
            }
            long previousOffset = this.fileOffsets != null ? this.fileOffsets.getOrDefault(file, 0L) : 0L;
            long newOffset = this.processFile(file, previousOffset);
            if (this.fileOffsets != null) {
                this.fileOffsets.put(file, newOffset);
            }
        }
    }

    private boolean shouldProcessEvent(Path file) {
        int hashCode = file.hashCode();
        return (hashCode & Integer.MAX_VALUE) % this.parallelism == this.id;
    }

    /*
     * Exception decompiling
     */
    private long processFile(Path file, long offset) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    @Override
    public void close() throws IOException {
        if (this.watcher != null) {
            this.getLogger().info("Closing watcher");
            this.watcher.close();
        }
    }

    public static ProcessorSupplier supplier(String folderPath, WatchType watchType) {
        return new Supplier(folderPath, watchType);
    }

    private static class Supplier
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String folderPath;
        private final WatchType watchType;
        private transient ArrayList<ReadFileStreamP> readers;

        Supplier(String folderPath, WatchType watchType) {
            this.folderPath = folderPath;
            this.watchType = watchType;
        }

        @Nonnull
        public List<ReadFileStreamP> get(int count) {
            this.readers = new ArrayList();
            for (int i = 0; i < count; ++i) {
                this.readers.add(new ReadFileStreamP(this.folderPath, this.watchType, count, i));
            }
            return this.readers;
        }

        @Override
        public void complete(Throwable error) {
            this.readers.forEach(r -> Util.uncheckRun(r::close));
        }
    }

    public static enum WatchType {
        NEW,
        REPROCESS,
        APPENDED_ONLY;

    }
}

