/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Source processor that reads the files of one directory matching a glob
 * pattern and emits one item per line.
 *
 * <p>Instances are created in groups by {@link #metaSupplier}. Each instance
 * receives the group size ({@code parallelism}) and its own index ({@code id})
 * and processes only those files whose path hash, reduced modulo the group
 * size, equals its index. Every line is passed to {@code mapOutputFn} together
 * with the name of the file it came from, and the function's result is emitted.
 *
 * @param <R> type of the items produced by the mapping function
 */
public final class ReadFilesP<R>
extends AbstractProcessor {
    private final Charset charset;
    private final int parallelism;
    private final int id;
    private final DistributedBiFunction<String, String, R> mapOutputFn;
    private final Path directory;
    private final String glob;
    // Open handle on the directory listing; null before init() and after the listing is exhausted.
    private DirectoryStream<Path> directoryStream;
    private Traverser<R> outputTraverser;
    // Lines of the file currently being read; null whenever no file is open.
    private Stream<String> currentFileLines;

    private ReadFilesP(String directory, Charset charset, String glob, int parallelism, int id, DistributedBiFunction<String, String, R> mapOutputFn) {
        this.directory = Paths.get(directory);
        this.glob = glob;
        this.charset = charset;
        this.parallelism = parallelism;
        this.id = id;
        this.mapOutputFn = mapOutputFn;
    }

    /**
     * Opens the directory listing and builds the traverser that lazily walks
     * the matching files, keeps the ones assigned to this instance and
     * flattens their mapped lines into a single output sequence.
     */
    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        directoryStream = Files.newDirectoryStream(directory, glob);
        outputTraverser = Traversers.traverseIterator(directoryStream.iterator())
                .filter(this::shouldProcessEvent)
                .flatMap(this::processFile)
                .onFirstNull(() -> {
                    // The listing is exhausted: release the resources right away.
                    Util.uncheckRun(() -> close(null));
                    directoryStream = null;
                });
    }

    /**
     * Emits items from the output traverser.
     *
     * @return the result of {@code emitFromTraverser} for the output traverser
     */
    @Override
    public boolean complete() {
        return emitFromTraverser(outputTraverser);
    }

    /**
     * Decides whether this instance is responsible for the given path.
     *
     * @return {@code false} for directories; otherwise {@code true} only when
     *         the path's hash maps to this instance's index
     */
    private boolean shouldProcessEvent(Path file) {
        if (Files.isDirectory(file)) {
            return false;
        }
        // Clearing the sign bit keeps the remainder non-negative so that it can match an id.
        int nonNegativeHash = file.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % parallelism == id;
    }

    /**
     * Opens the given file and returns a traverser over its mapped lines. The
     * underlying line stream is closed as soon as the traverser is exhausted.
     */
    private Traverser<R> processFile(Path file) {
        if (getLogger().isFinestEnabled()) {
            getLogger().finest("Processing file " + file);
        }
        try {
            assert (currentFileLines == null) : "currentFileLines != null";
            currentFileLines = Files.lines(file, charset);
            String fileName = file.getFileName().toString();
            return Traversers.traverseStream(currentFileLines)
                    .map(line -> mapOutputFn.apply(fileName, line))
                    .onFirstNull(() -> {
                        currentFileLines.close();
                        currentFileLines = null;
                    });
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    /**
     * Closes the directory listing and the currently open file, if any. Both
     * are always attempted, even when closing the directory listing fails.
     *
     * @param error not used by this implementation
     * @throws IOException if closing the directory listing failed and closing
     *                     the current file did not throw
     */
    @Override
    public void close(@Nullable Throwable error) throws IOException {
        IOException directoryFailure = null;
        if (directoryStream != null) {
            try {
                directoryStream.close();
            }
            catch (IOException e) {
                // Remember the failure but still try to close the open file below.
                directoryFailure = e;
            }
        }
        if (currentFileLines != null) {
            try {
                currentFileLines.close();
            }
            catch (RuntimeException e) {
                // Keep the earlier directory failure visible instead of silently dropping it.
                if (directoryFailure != null) {
                    e.addSuppressed(directoryFailure);
                }
                throw e;
            }
        }
        if (directoryFailure != null) {
            throw directoryFailure;
        }
    }

    /**
     * @return always {@code false}: this processor declares itself non-cooperative
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Creates the meta-supplier for this processor. For a requested count of
     * {@code n} it produces {@code n} instances with indices {@code 0..n-1},
     * all sharing the same directory, charset, glob and mapping function.
     *
     * @param directory   path of the directory to read
     * @param charset     name of the charset used to decode the files
     * @param glob        glob pattern used to filter the directory entries
     * @param mapOutputFn function applied to {@code (fileName, line)} to produce an output item
     * @return the meta-supplier; the trailing {@code 2} is passed straight to
     *         {@code ProcessorMetaSupplier.of} (presumably the preferred local
     *         parallelism; confirm against that API)
     */
    public static ProcessorMetaSupplier metaSupplier(@Nonnull String directory, @Nonnull String charset, @Nonnull String glob, @Nonnull DistributedBiFunction<String, String, ?> mapOutputFn) {
        return ProcessorMetaSupplier.of(
                count -> IntStream.range(0, count)
                        .mapToObj(i -> new ReadFilesP<>(directory, Charset.forName(charset), glob, count, i, mapOutputFn))
                        .collect(Collectors.toList()),
                2);
    }
}

