/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;

public class ReadFilesP
extends AbstractProcessor {
    private final Charset charset;
    private final int parallelism;
    private final int id;
    private final Path directory;
    private final String glob;

    ReadFilesP(String directory, Charset charset, String glob, int parallelism, int id) {
        this.directory = Paths.get(directory, new String[0]);
        this.glob = glob;
        this.charset = charset;
        this.parallelism = parallelism;
        this.id = id;
    }

    @Override
    public boolean complete() {
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(this.directory, this.glob);){
            StreamSupport.stream(directoryStream.spliterator(), false).filter(this::shouldProcessEvent).forEach(this::processFile);
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
        return true;
    }

    private boolean shouldProcessEvent(Path file) {
        if (Files.isDirectory(file, new LinkOption[0])) {
            return false;
        }
        int hashCode = file.hashCode();
        return (hashCode & Integer.MAX_VALUE) % this.parallelism == this.id;
    }

    private void processFile(Path file) {
        if (this.getLogger().isFinestEnabled()) {
            this.getLogger().finest("Processing file " + file);
        }
        try (BufferedReader reader = Files.newBufferedReader(file, this.charset);){
            String line;
            while ((line = reader.readLine()) != null) {
                this.emit(line);
            }
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    public static ProcessorSupplier supplier(final @Nonnull String directory, final @Nonnull String charset, final @Nonnull String glob) {
        return new ProcessorSupplier(){
            static final long serialVersionUID = 1L;

            @Override
            @Nonnull
            public Collection<? extends Processor> get(int count) {
                Charset charsetObj = Charset.forName(charset);
                return IntStream.range(0, count).mapToObj(i -> new ReadFilesP(directory, charsetObj, glob, count, i)).collect(Collectors.toList());
            }
        };
    }
}

