/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.copy;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.policy.SelectAfterTimeBasedPolicy;
import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
import org.apache.gobblin.data.management.version.finder.DateTimeDatasetVersionFinder;
import org.apache.gobblin.data.management.version.finder.VersionFinder;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimestampBasedCopyableDataset
implements CopyableDataset,
FileSystemDataset {
    private static final Logger log = LoggerFactory.getLogger(TimestampBasedCopyableDataset.class);
    private final Path datasetRoot;
    private final VersionFinder<TimestampedDatasetVersion> datasetVersionFinder;
    private final VersionSelectionPolicy<TimestampedDatasetVersion> versionSelectionPolicy;
    private final ExecutorService executor;
    private final FileSystem srcFs;
    public static final String DATASET_VERSION_FINDER = "timestamp.based.copyable.dataset.version.finder";
    public static final String DEFAULT_DATASET_VERSION_FINDER = DateTimeDatasetVersionFinder.class.getName();
    public static final String COPY_POLICY = "timestamp.based.copyable.dataset.copy.policy";
    public static final String DEFAULT_COPY_POLICY = SelectAfterTimeBasedPolicy.class.getName();
    public static final String THREADPOOL_SIZE_TO_GET_COPYABLE_FILES = "threadpool.size.to.get.copyable.files";
    public static final String DEFAULT_THREADPOOL_SIZE_TO_GET_COPYABLE_FILES = "20";

    public TimestampBasedCopyableDataset(FileSystem fs, Properties props, Path datasetRoot) {
        this.srcFs = fs;
        this.datasetRoot = datasetRoot;
        try {
            Class<?> copyPolicyClass = Class.forName(props.getProperty(COPY_POLICY, DEFAULT_COPY_POLICY));
            this.versionSelectionPolicy = (VersionSelectionPolicy)copyPolicyClass.getConstructor(Properties.class).newInstance(props);
            Class<?> timestampedDatasetVersionFinderClass = Class.forName(props.getProperty(DATASET_VERSION_FINDER, DEFAULT_DATASET_VERSION_FINDER));
            this.datasetVersionFinder = (VersionFinder)timestampedDatasetVersionFinderClass.getConstructor(FileSystem.class, Properties.class).newInstance(this.srcFs, props);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException exception) {
            throw new RuntimeException(exception);
        }
        this.executor = ScalingThreadPoolExecutor.newScalingThreadPool((int)0, (int)Integer.parseInt(props.getProperty(THREADPOOL_SIZE_TO_GET_COPYABLE_FILES, DEFAULT_THREADPOOL_SIZE_TO_GET_COPYABLE_FILES)), (long)100L, (ThreadFactory)ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)log), (Optional)Optional.of((Object)this.getClass().getSimpleName())));
    }

    public Collection<CopyableFile> getCopyableFiles(FileSystem targetFs, CopyConfiguration configuration) throws IOException {
        log.info(String.format("Getting copyable files at root path: %s", this.datasetRoot));
        ArrayList versions = Lists.newArrayList(this.datasetVersionFinder.findDatasetVersions(this));
        if (versions.isEmpty()) {
            log.warn("No dataset version can be found. Ignoring.");
            return Lists.newArrayList();
        }
        Collection<TimestampedDatasetVersion> copyableVersions = this.versionSelectionPolicy.listSelectedVersions(versions);
        ConcurrentLinkedQueue<CopyableFile> copyableFileList = new ConcurrentLinkedQueue<CopyableFile>();
        ArrayList futures = Lists.newArrayList();
        for (TimestampedDatasetVersion copyableVersion : copyableVersions) {
            futures.add(this.executor.submit(this.getCopyableFileGenetator(targetFs, configuration, copyableVersion, copyableFileList)));
        }
        try {
            for (Future future : futures) {
                future.get();
            }
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IOException("Failed to generate copyable files.", e);
        }
        finally {
            ExecutorsUtils.shutdownExecutorService((ExecutorService)this.executor, (Optional)Optional.of((Object)log));
        }
        return copyableFileList;
    }

    @VisibleForTesting
    protected CopyableFileGenerator getCopyableFileGenetator(FileSystem targetFs, CopyConfiguration configuration, TimestampedDatasetVersion copyableVersion, ConcurrentLinkedQueue<CopyableFile> copyableFileList) {
        return new CopyableFileGenerator(this.srcFs, targetFs, configuration, this.datasetRoot, this.getTargetRoot(configuration.getPublishDir()), copyableVersion.getDateTime(), copyableVersion.getPaths(), copyableFileList, this.copyableFileFilter());
    }

    protected PathFilter copyableFileFilter() {
        return new HiddenFilter();
    }

    protected Path getTargetRoot(Path publishDir) {
        return new Path(publishDir, this.datasetRoot.getName());
    }

    public String datasetURN() {
        return this.datasetRoot().toString();
    }

    public Path datasetRoot() {
        return this.datasetRoot;
    }

    public Path getDatasetRoot() {
        return this.datasetRoot;
    }

    public VersionFinder<TimestampedDatasetVersion> getDatasetVersionFinder() {
        return this.datasetVersionFinder;
    }

    public VersionSelectionPolicy<TimestampedDatasetVersion> getVersionSelectionPolicy() {
        return this.versionSelectionPolicy;
    }

    public ExecutorService getExecutor() {
        return this.executor;
    }

    public FileSystem getSrcFs() {
        return this.srcFs;
    }

    protected static class CopyableFileGenerator
    implements Runnable {
        private final FileSystem srcFs;
        private final FileSystem targetFs;
        private final CopyConfiguration configuration;
        private final Path datasetRoot;
        private final Path targetRoot;
        private final DateTime versionDatetime;
        private final Collection<Path> locationsToCopy;
        private final ConcurrentLinkedQueue<CopyableFile> copyableFileList;
        private final PathFilter filter;

        @Override
        public void run() {
            for (Path locationToCopy : this.locationsToCopy) {
                long timestampFromPath = this.versionDatetime.getMillis();
                try {
                    for (FileStatus singleFile : this.srcFs.listStatus(locationToCopy, this.filter)) {
                        Path singleFilePath = singleFile.getPath();
                        log.debug("Checking if it is a copyable file: " + singleFilePath);
                        Path relativePath = PathUtils.relativizePath((Path)singleFilePath, (Path)this.datasetRoot);
                        Path targetPath = new Path(this.targetRoot, relativePath);
                        if (!this.isCopyableFile(singleFile, targetPath)) continue;
                        log.debug("Will create workunit for: " + singleFilePath);
                        this.copyableFileList.add(this.generateCopyableFile(singleFile, targetPath, timestampFromPath, locationToCopy));
                    }
                }
                catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException("Failed to get copyable files for " + locationToCopy, e);
                }
            }
        }

        @VisibleForTesting
        protected CopyableFile generateCopyableFile(FileStatus singleFile, Path targetPath, long timestampFromPath, Path locationToCopy) throws IOException {
            return CopyableFile.fromOriginAndDestination(this.srcFs, singleFile, targetPath, this.configuration).originTimestamp(timestampFromPath).upstreamTimestamp(timestampFromPath).fileSet(PathUtils.getPathWithoutSchemeAndAuthority((Path)locationToCopy).toString()).build();
        }

        private boolean isCopyableFile(FileStatus srcFileStatus, Path targetPath) throws IOException {
            if (!this.targetFs.exists(targetPath)) {
                return true;
            }
            return srcFileStatus.getModificationTime() > this.targetFs.getFileStatus(targetPath).getModificationTime();
        }

        public CopyableFileGenerator(FileSystem srcFs, FileSystem targetFs, CopyConfiguration configuration, Path datasetRoot, Path targetRoot, DateTime versionDatetime, Collection<Path> locationsToCopy, ConcurrentLinkedQueue<CopyableFile> copyableFileList, PathFilter filter) {
            this.srcFs = srcFs;
            this.targetFs = targetFs;
            this.configuration = configuration;
            this.datasetRoot = datasetRoot;
            this.targetRoot = targetRoot;
            this.versionDatetime = versionDatetime;
            this.locationsToCopy = locationsToCopy;
            this.copyableFileList = copyableFileList;
            this.filter = filter;
        }
    }
}

