/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.MemoryManager;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateHolder;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.PathElementComparator;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SimpleNodeStateHolder;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SortStrategy;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.TraverseAndSortTask;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultithreadedTraverseWithSortStrategy
implements SortStrategy {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final Charset charset = Charsets.UTF_8;
    private final boolean compressionEnabled;
    private final File storeDir;
    private final Comparator<NodeStateHolder> comparator;
    private final ConcurrentLinkedQueue<File> sortedFiles;
    private final ConcurrentLinkedQueue<Throwable> throwables;
    private final BlockingQueue<Callable<List<File>>> taskQueue;
    private final Phaser phaser;
    private static final Callable<List<File>> POISON_PILL = () -> null;
    private final MemoryManager memoryManager;

    /**
     * Sets up the shared queues, the ordering comparator and the phaser, then queues the
     * initial traverse-and-sort tasks.
     *
     * @param nodeStateEntryTraverserFactory factory handed to every created task
     * @param lastModifiedBreakPoints range boundaries used to create tasks when there is no
     *                                existing data dump to resume from
     * @param pathComparator comparator applied to the path elements of two entries
     * @param blobStore blob store handed to every created task
     * @param storeDir directory in which work directories and the merged file are created
     * @param existingDataDumpDirs directories of earlier runs to resume from; may be null or empty
     * @param compressionEnabled whether written files are compressed
     * @param memoryManager memory manager handed to every created task
     * @throws IOException if the initial tasks cannot be created
     */
    MultithreadedTraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, List<Long> lastModifiedBreakPoints, PathElementComparator pathComparator, BlobStore blobStore, File storeDir, Iterable<File> existingDataDumpDirs, boolean compressionEnabled, MemoryManager memoryManager) throws IOException {
        this.memoryManager = memoryManager;
        this.compressionEnabled = compressionEnabled;
        this.storeDir = storeDir;
        // Entries are ordered purely by their path elements.
        comparator = (left, right) -> pathComparator.compare(left.getPathElements(), right.getPathElements());
        sortedFiles = new ConcurrentLinkedQueue<>();
        throwables = new ConcurrentLinkedQueue<>();
        taskQueue = new LinkedBlockingQueue<>();
        phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Terminate only once the result-collection phase ends with no parties left;
                // the earlier phase must not terminate the phaser even with zero parties.
                boolean resultPhase = phase == Phases.WAITING_FOR_RESULTS.value;
                return resultPhase && registeredParties == 0;
            }
        };
        createInitialTasks(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, blobStore, existingDataDumpDirs);
    }

    /**
     * Queues the first batch of tasks.
     * <p>
     * When existing data dump directories are supplied, every sub-directory found in them is
     * inspected: its data files are added to {@link #sortedFiles}, and for sub-directories not
     * marked as completed the already-covered last-modified range is recorded so that the
     * download can resume from there. Otherwise one task is queued per break point, running
     * up to the next break point (or {@code Long.MAX_VALUE} for the last one).
     *
     * @param nodeStateEntryTraverserFactory factory handed to every created task
     * @param lastModifiedBreakPoints range boundaries used when there is nothing to resume from
     * @param blobStore blob store handed to every created task
     * @param existingDataDumpDirs directories of earlier runs; may be null or empty
     * @throws IOException if an existing data dump directory cannot be listed, if a status file
     *                     cannot be read, or if a task cannot be created
     */
    private void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, List<Long> lastModifiedBreakPoints, BlobStore blobStore, Iterable<File> existingDataDumpDirs) throws IOException {
        ConcurrentLinkedQueue<String> completedTasks = new ConcurrentLinkedQueue<>();
        if (existingDataDumpDirs != null && existingDataDumpDirs.iterator().hasNext()) {
            List<LastModifiedRange> previousState = new ArrayList<>();
            for (File existingDataDumpDir : existingDataDumpDirs) {
                // File.listFiles() returns null when the path is not a directory or an I/O error
                // occurs. Fail with a clear message instead of a NullPointerException; silently
                // skipping the directory would drop previously downloaded data.
                File[] existingSortWorkDirs = existingDataDumpDir.listFiles();
                if (existingSortWorkDirs == null) {
                    throw new IOException("Could not list contents of existing data dump directory " + existingDataDumpDir.getAbsolutePath());
                }
                for (File existingSortWorkDir : existingSortWorkDirs) {
                    if (!existingSortWorkDir.isDirectory()) {
                        this.log.info("Not a directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
                        continue;
                    }
                    boolean downloadCompleted = DirectoryHelper.hasCompleted(existingSortWorkDir);
                    if (!downloadCompleted) {
                        long start = DirectoryHelper.getLastModifiedTimeFromDirName(existingSortWorkDir);
                        long end = DirectoryHelper.getLastModifiedOfLastDownloadedDocument(existingSortWorkDir);
                        // -1 means no progress was saved; the covered range is then empty.
                        previousState.add(new LastModifiedRange(start, end != -1L ? end + 1L : start));
                    }
                    this.log.info("Including existing sorted files from directory {} (hasCompleted={})", existingSortWorkDir.getAbsolutePath(), downloadCompleted);
                    DirectoryHelper.getDataFiles(existingSortWorkDir).forEach(file -> {
                        this.log.debug("Including existing sorted file {}", file.getName());
                        this.sortedFiles.add(file);
                    });
                }
            }
            this.resumeFromPreviousState(previousState, nodeStateEntryTraverserFactory, blobStore, completedTasks);
        } else {
            int breakPointCount = lastModifiedBreakPoints.size();
            for (int i = 0; i < breakPointCount; ++i) {
                long start = lastModifiedBreakPoints.get(i);
                long end = i < breakPointCount - 1 ? lastModifiedBreakPoints.get(i + 1) : Long.MAX_VALUE;
                this.addTask(start, end, nodeStateEntryTraverserFactory, blobStore, completedTasks);
            }
        }
    }

    /**
     * Queues one task per gap left by a previous, incomplete run.
     * <p>
     * The recorded ranges are sorted by their lower bound. Each run of consecutive overlapping
     * ranges is merged into a single range; a task is then queued starting one below the upper
     * bound of that (merged) range and ending at the lower bound of the next non-overlapping
     * range, or at {@code Long.MAX_VALUE} if there is none.
     *
     * @param previousState ranges already covered by earlier work directories; sorted in place
     * @param nodeStateEntryTraverserFactory factory handed to every created task
     * @param blobStore blob store handed to every created task
     * @param completedTasks shared queue handed to every created task
     * @throws IOException if a task cannot be created
     */
    private void resumeFromPreviousState(List<LastModifiedRange> previousState, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore, ConcurrentLinkedQueue<String> completedTasks) throws IOException {
        previousState.sort(Comparator.comparing(LastModifiedRange::getLastModifiedFrom));
        int i = 0;
        while (i < previousState.size()) {
            LastModifiedRange currentRange = previousState.get(i);
            int next = i + 1;
            // Keep merging while the following range overlaps the accumulated one. Merging only
            // a single neighbour would leave a chain of three or more overlapping ranges
            // partially merged, and the task would get a start beyond its end.
            while (next < previousState.size() && currentRange.checkOverlap(previousState.get(next))) {
                LastModifiedRange overlapping = previousState.get(next);
                LastModifiedRange merged = currentRange.mergeWith(overlapping);
                this.log.info("Range overlap between {} and {}. Using merged range {}", currentRange, overlapping, merged);
                currentRange = merged;
                ++next;
            }
            // NOTE(review): starting one below the upper bound presumably re-reads the last
            // saved timestamp so no document at that timestamp is missed - confirm.
            long start = currentRange.getLastModifiedTo() - 1L;
            long end = next < previousState.size() ? previousState.get(next).getLastModifiedFrom() : Long.MAX_VALUE;
            this.addTask(start, end, nodeStateEntryTraverserFactory, blobStore, completedTasks);
            i = next;
        }
    }

    /**
     * Builds a {@link TraverseAndSortTask} for the given last-modified range and appends it to
     * the task queue.
     *
     * @param start lower bound of the range
     * @param end upper bound of the range
     * @param nodeStateEntryTraverserFactory factory handed to the task
     * @param blobStore blob store handed to the task
     * @param completedTasks shared queue handed to the task
     * @throws IOException declared for the task construction
     */
    private void addTask(long start, long end, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore, ConcurrentLinkedQueue<String> completedTasks) throws IOException {
        TraverseAndSortTask task = new TraverseAndSortTask(new LastModifiedRange(start, end), comparator, blobStore, storeDir, compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager);
        taskQueue.add(task);
    }

    /**
     * Runs the queued tasks on a background "watcher" thread, waits for both phaser phases to
     * pass, and then merges all collected sorted files into one.
     *
     * @return the merged sorted store file
     * @throws IOException if merging fails
     * @throws CompositeException if any failure was collected while gathering task results;
     *                            the collected failures are attached as suppressed exceptions
     */
    @Override
    public File createSortedStoreFile() throws IOException, CompositeException {
        final String watcherThreadName = "watcher";
        new Thread(new TaskRunner(), watcherThreadName).start();

        // First phase: wait until the task-splitting phase of the phaser has advanced.
        phaser.awaitAdvance(Phases.WAITING_FOR_TASK_SPLITS.value);
        log.debug("All tasks completed. Signalling {} to proceed to result collection.", watcherThreadName);
        // The poison pill tells the watcher to stop taking tasks and start collecting results.
        taskQueue.add(POISON_PILL);

        // Second phase: wait until the watcher has gone through all results.
        phaser.awaitAdvance(Phases.WAITING_FOR_RESULTS.value);
        if (throwables.isEmpty()) {
            log.debug("Result collection complete. Proceeding to merge.");
            return sortStoreFile();
        }
        CompositeException failure = new CompositeException();
        throwables.forEach(failure::addSuppressed);
        throw failure;
    }

    /**
     * Reports the number of entries.
     *
     * @return always {@code 0}; this class keeps no entry counter
     */
    @Override
    public long getEntryCount() {
        return 0;
    }

    /**
     * Merges a snapshot of the collected sorted files into a single file inside the store
     * directory, using the entry comparator and dropping duplicates.
     *
     * @return the merged file
     * @throws IOException if the output cannot be written or the merge fails
     */
    private File sortStoreFile() throws IOException {
        log.info("Proceeding to perform merge of {} sorted files", sortedFiles.size());
        Stopwatch mergeTimer = Stopwatch.createStarted();
        File mergedOutput = new File(storeDir, FlatFileStoreUtils.getSortedStoreFileName(compressionEnabled));
        // Work on a copy so the merge operates on a fixed list of files.
        List<File> filesToMerge = new ArrayList<>(sortedFiles);

        // Null-tolerant conversions between a raw line and its holder.
        Function<String, NodeStateHolder> lineToHolder = line -> {
            if (line == null) {
                return null;
            }
            return new SimpleNodeStateHolder(line);
        };
        Function<NodeStateHolder, String> holderToLine = holder -> {
            if (holder == null) {
                return null;
            }
            return holder.getLine();
        };

        try (BufferedWriter out = FlatFileStoreUtils.createWriter(mergedOutput, compressionEnabled)) {
            ExternalSort.mergeSortedFiles(filesToMerge, out, comparator, charset, true, compressionEnabled, holderToLine, lineToHolder);
        }
        log.info("Merging of sorted files completed in {}", mergedTimerValue(mergeTimer));
        return mergedOutput;
    }

    /** Returns the stopwatch itself so it is rendered via {@code toString()} by the logger. */
    private static Object mergedTimerValue(Stopwatch timer) {
        return timer;
    }

    /**
     * Static helpers for the layout of a sort work directory: its name encodes a task id and a
     * last-modified lower bound, and it may contain a status file ("last-saved") and a
     * completion marker file ("completed") next to the data files.
     */
    static class DirectoryHelper {
        private static final String PREFIX = "sort-work-dir-";
        private static final String LAST_MODIFIED_TIME_DELIMITER = "-from-";
        private static final String STATUS_FILE_NAME = "last-saved";
        private static final String COMPLETION_MARKER_FILE_NAME = "completed";
        private static final Logger log = LoggerFactory.getLogger(DirectoryHelper.class);

        DirectoryHelper() {
        }

        /**
         * Creates (if necessary) the work directory for a task.
         *
         * @param storeDir parent directory
         * @param taskID task identifier embedded in the directory name
         * @param lastModifiedLowerBound lower bound embedded after the "-from-" delimiter
         * @return the created directory
         * @throws IOException if the directory cannot be created
         */
        static File createdSortWorkDir(File storeDir, String taskID, long lastModifiedLowerBound) throws IOException {
            File sortedFileDir = new File(storeDir, PREFIX + taskID + LAST_MODIFIED_TIME_DELIMITER + lastModifiedLowerBound);
            FileUtils.forceMkdir(sortedFileDir);
            return sortedFileDir;
        }

        /**
         * Extracts the last-modified lower bound from a work directory name.
         *
         * @param dir the work directory
         * @return the number following the last "-from-" in the directory name
         * @throws IllegalArgumentException if {@code dir} is not a directory, its name does not
         *                                  contain the delimiter, or the suffix is not a number
         */
        static long getLastModifiedTimeFromDirName(File dir) {
            if (!dir.isDirectory()) {
                throw new IllegalArgumentException(dir.getAbsolutePath() + " is not a directory");
            }
            String name = dir.getName();
            int delimiterIndex = name.lastIndexOf(LAST_MODIFIED_TIME_DELIMITER);
            // Without this check a missing delimiter (-1) would make the code parse an
            // arbitrary tail of the name and possibly return a bogus value.
            if (delimiterIndex < 0) {
                throw new IllegalArgumentException(dir.getAbsolutePath() + " is not a sort work directory: name does not contain '" + LAST_MODIFIED_TIME_DELIMITER + "'");
            }
            return Long.parseLong(name.substring(delimiterIndex + LAST_MODIFIED_TIME_DELIMITER.length()));
        }

        /**
         * Writes the completion marker file into the directory. Failures are logged, not thrown.
         *
         * @param sortWorkDir the work directory to mark
         */
        static void markCompleted(File sortWorkDir) {
            try {
                Files.write(Paths.get(sortWorkDir.getAbsolutePath(), COMPLETION_MARKER_FILE_NAME), COMPLETION_MARKER_FILE_NAME.getBytes(Charsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
            }
            catch (IOException e) {
                log.warn("Resuming download will not be accurate. Could not mark the directory {} completed.", sortWorkDir.getAbsolutePath(), e);
            }
        }

        /**
         * @param sortWorkDir the work directory to check
         * @return true if the completion marker file exists in the directory
         */
        static boolean hasCompleted(File sortWorkDir) {
            return new File(sortWorkDir, COMPLETION_MARKER_FILE_NAME).exists();
        }

        /**
         * Overwrites the status file with the given value. Failures are logged, not thrown.
         *
         * @param sortWorkDir the work directory
         * @param lastModifiedTime value to store
         */
        static void markLastProcessedStatus(File sortWorkDir, long lastModifiedTime) {
            try {
                // Written as UTF-8 explicitly so it matches the charset used when reading back.
                Files.write(Paths.get(sortWorkDir.getAbsolutePath(), STATUS_FILE_NAME), Long.toString(lastModifiedTime).getBytes(Charsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
            }
            catch (IOException e) {
                log.warn("Resuming download will not be accurate. Could not save last processed status = {} in {}", lastModifiedTime, sortWorkDir.getAbsolutePath(), e);
            }
        }

        /**
         * Reads the value stored in the status file.
         *
         * @param sortWorkDir the work directory
         * @return the stored value, or {@code -1} if the status file is missing or empty
         * @throws IOException if the status file cannot be read
         * @throws NumberFormatException if the first line is not a number
         */
        static long getLastModifiedOfLastDownloadedDocument(File sortWorkDir) throws IOException {
            File statusFile = new File(sortWorkDir, STATUS_FILE_NAME);
            if (!statusFile.exists()) {
                return -1L;
            }
            List<String> lines = Files.readAllLines(statusFile.toPath(), Charsets.UTF_8);
            String firstLine = lines.isEmpty() ? "" : lines.get(0).trim();
            // The file is truncated before being rewritten, so an interrupted write can leave
            // it empty; treat that like a missing file instead of failing with an index error.
            if (firstLine.isEmpty()) {
                log.warn("Status file {} is empty. Treating it as if no status was saved.", statusFile.getAbsolutePath());
                return -1L;
            }
            return Long.parseLong(firstLine);
        }

        /**
         * Lists the entries of a work directory other than the status and completion marker files.
         *
         * @param sortWorkDir the work directory
         * @return stream of the remaining entries
         * @throws IllegalStateException if the directory cannot be listed
         */
        static Stream<File> getDataFiles(File sortWorkDir) {
            // listFiles() returns null for a non-directory or on an I/O error.
            File[] children = sortWorkDir.listFiles();
            if (children == null) {
                throw new IllegalStateException("Could not list files in " + sortWorkDir.getAbsolutePath());
            }
            return Arrays.stream(children).filter(f -> !STATUS_FILE_NAME.equals(f.getName()) && !COMPLETION_MARKER_FILE_NAME.equals(f.getName()));
        }
    }

    private class TaskRunner
    implements Runnable {
        private final ExecutorService executorService;
        private final int threadPoolSize = Integer.parseInt(System.getProperty("oak.indexer.dataDumpThreadPoolSize", "4"));

        public TaskRunner() {
            this.executorService = Executors.newFixedThreadPool(this.threadPoolSize);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                MultithreadedTraverseWithSortStrategy.this.log.info("Using a thread pool of size {}", (Object)this.threadPoolSize);
                ArrayList results = new ArrayList();
                while (true) {
                    Callable task;
                    if ((task = (Callable)MultithreadedTraverseWithSortStrategy.this.taskQueue.take()) == POISON_PILL) break;
                    results.add(this.executorService.submit(task));
                }
                MultithreadedTraverseWithSortStrategy.this.log.debug("Won't wait for new tasks now.");
                MultithreadedTraverseWithSortStrategy.this.log.debug("Registering to phaser and waiting for results now.");
                MultithreadedTraverseWithSortStrategy.this.phaser.register();
                try {
                    boolean exceptionsCaught = false;
                    for (Future future : results) {
                        try {
                            MultithreadedTraverseWithSortStrategy.this.sortedFiles.addAll((Collection)future.get());
                        }
                        catch (Throwable e) {
                            MultithreadedTraverseWithSortStrategy.this.throwables.add(e);
                            exceptionsCaught = true;
                        }
                    }
                    MultithreadedTraverseWithSortStrategy.this.log.debug("Completed result collection {}. Arriving at phaser now.", (Object)(!exceptionsCaught ? "fully" : "partially"));
                }
                finally {
                    MultithreadedTraverseWithSortStrategy.this.phaser.arrive();
                }
            }
            catch (InterruptedException e) {
                MultithreadedTraverseWithSortStrategy.this.log.error("Could not complete task submissions", (Throwable)e);
            }
            this.executorService.shutdown();
        }
    }

    /**
     * Names for the phaser phase numbers used by this class: phase 0 is awaited before the
     * poison pill is queued, phase 1 is awaited before collected failures are checked.
     */
    private static enum Phases {
        WAITING_FOR_TASK_SPLITS(0),
        WAITING_FOR_RESULTS(1);

        /** Phase number associated with this constant. */
        private final int value;

        // Enum constructors are implicitly private.
        Phases(int phaseNumber) {
            value = phaseNumber;
        }

        /** @return the phase number associated with this constant */
        public int getValue() {
            return value;
        }
    }
}

