/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.copy.replication;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.copy.replication.CopyRoute;
import org.apache.gobblin.data.management.copy.replication.EndPoint;
import org.apache.gobblin.data.management.copy.replication.HadoopFsEndPoint;
import org.apache.gobblin.data.management.copy.replication.ReplicationConfiguration;
import org.apache.gobblin.data.management.copy.replication.WatermarkMetadataGenerationCommitStep;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.source.extractor.ComparableWatermark;
import org.apache.gobblin.source.extractor.Watermark;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CopyableDataset} covering a single {@link CopyRoute} of a config-based replication.
 *
 * <p>{@link #getCopyableFiles} lists the files of the route's source and target
 * {@link HadoopFsEndPoint}s and produces the copy entities (file copies, a pre-publish delete
 * step and a post-publish watermark step) derived from the difference between the two listings.
 */
public class ConfigBasedDataset implements CopyableDataset {
    private static final Logger log = LoggerFactory.getLogger(ConfigBasedDataset.class);

    /** Property that switches the watermark short-circuit on or off; read with default {@code "true"}. */
    private static final String WATERMARK_ENABLED_KEY = "gobblin.copy.configBased.watermark.enabled";
    /** Property deciding whether the path filter is also applied to directories; read with default {@code "false"}. */
    private static final String APPLY_FILTER_TO_DIRECTORIES_KEY = "gobblin.copy.applyFilterToDirectories";
    /** Dataset-relative name of the file whose presence in the source marks the watermark metadata as copied. */
    private static final String WATERMARK_METADATA_FILE = "_metadata";

    private final Properties props;
    private final CopyRoute copyRoute;
    private final ReplicationConfiguration rc;
    /** May be {@code null} when it had to be derived from the target file system and that lookup failed. */
    private final String datasetURN;
    private final boolean watermarkEnabled;
    private final PathFilter pathFilter;
    private final boolean applyFilterToDirectories;

    /**
     * Creates a dataset whose URN is derived from the copy route's target end point.
     *
     * @param rc replication configuration this dataset belongs to
     * @param props job properties; consulted for the watermark switch, the path filter and the
     *     apply-filter-to-directories switch
     * @param copyRoute the source/target pair to replicate
     */
    public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute) {
        this.props = props;
        this.copyRoute = copyRoute;
        this.rc = rc;
        // Must run after copyRoute is assigned: the URN is computed from the route's target.
        this.datasetURN = this.calculateDatasetURN();
        this.watermarkEnabled = Boolean.parseBoolean(this.props.getProperty(WATERMARK_ENABLED_KEY, "true"));
        this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
        this.applyFilterToDirectories = Boolean.parseBoolean(this.props.getProperty(APPLY_FILTER_TO_DIRECTORIES_KEY, "false"));
    }

    /**
     * Creates a dataset with an explicitly supplied URN.
     *
     * @param rc replication configuration this dataset belongs to
     * @param props job properties; consulted for the path filter and the
     *     apply-filter-to-directories switch
     * @param copyRoute the source/target pair to replicate
     * @param datasetURN the URN to report from {@link #datasetURN()}
     */
    public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) {
        this.props = props;
        this.copyRoute = copyRoute;
        this.rc = rc;
        this.datasetURN = datasetURN;
        // NOTE(review): unlike the three-argument constructor, this one never consulted the
        // watermark property, so the watermark short-circuit has always been off here. Kept
        // as-is to preserve behaviour; confirm whether that asymmetry is intentional.
        this.watermarkEnabled = false;
        this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
        this.applyFilterToDirectories = Boolean.parseBoolean(this.props.getProperty(APPLY_FILTER_TO_DIRECTORIES_KEY, "false"));
    }

    /**
     * Computes the dataset URN from the copy route's target end point.
     *
     * <p>For a {@link HadoopFsEndPoint} the URN is the target dataset path qualified against the
     * target file system; for any other end point it is the end point's {@code toString()}.
     *
     * @return the URN, or {@code null} if the target file system could not be obtained
     */
    private String calculateDatasetURN() {
        EndPoint target = this.copyRoute.getCopyTo();
        if (!(target instanceof HadoopFsEndPoint)) {
            return target.toString();
        }
        HadoopFsEndPoint copyTo = (HadoopFsEndPoint) target;
        Configuration conf = HadoopUtils.newConfiguration();
        try {
            FileSystem copyToFs = FileSystem.get(copyTo.getFsURI(), conf);
            return copyToFs.makeQualified(copyTo.getDatasetPath()).toString();
        } catch (IOException e) {
            // Best effort: construction must not fail here, but the failure should be visible.
            log.warn("Unable to resolve the dataset URN for replication target {}; the URN is left unset", target, e);
            return null;
        }
    }

    /**
     * Returns the URN identifying this dataset.
     *
     * @return the URN; may be {@code null} if it was derived and the derivation failed
     */
    public String datasetURN() {
        return this.datasetURN;
    }

    /**
     * Builds the copy entities needed to replicate the route's source into its target.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If either end point is not a {@link HadoopFsEndPoint}, a warning is logged and an
     *       empty collection is returned.</li>
     *   <li>The path filter settings are pushed onto both end points.</li>
     *   <li>If the watermark check is enabled and the target watermark is not behind the source
     *       watermark, an empty collection is returned.</li>
     *   <li>Each source file is mapped to its target path. It is skipped only when the target
     *       already holds a file of equal length with a strictly newer modification time;
     *       otherwise a {@link CopyableFile} is emitted and any existing target file is
     *       scheduled for deletion.</li>
     *   <li>If configured, target files with no source counterpart are scheduled for deletion.</li>
     *   <li>Scheduled deletions become one {@link PrePublishStep}; if the source has a watermark
     *       and no source file named {@code _metadata} was seen at the dataset root, a
     *       {@link PostPublishStep} generating the watermark metadata is appended.</li>
     * </ol>
     *
     * @param targetFs not used by this implementation; the file systems are resolved from the
     *     end points' URIs instead
     * @param copyConfiguration configuration handed to the {@link CopyableFile} builder
     * @return the copy entities to execute; possibly empty, never {@code null}
     * @throws IOException if a file system cannot be obtained or a listing/build step fails
     */
    @Override
    public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, CopyConfiguration copyConfiguration) throws IOException {
        List<CopyEntity> copyableFiles = Lists.newArrayList();
        EndPoint copyFromRaw = this.copyRoute.getCopyFrom();
        EndPoint copyToRaw = this.copyRoute.getCopyTo();
        if (!(copyFromRaw instanceof HadoopFsEndPoint) || !(copyToRaw instanceof HadoopFsEndPoint)) {
            log.warn("Currently only handle the Hadoop Fs EndPoint replication");
            return copyableFiles;
        }
        HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
        HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw;

        // Both listings must be produced under the same filter so they are comparable.
        copyFrom.setPathFilter(this.pathFilter);
        copyFrom.setApplyFilterToDirectories(this.applyFilterToDirectories);
        copyTo.setPathFilter(this.pathFilter);
        copyTo.setApplyFilterToDirectories(this.applyFilterToDirectories);

        if (this.watermarkEnabled && isTargetWatermarkCurrent(copyFromRaw, copyToRaw)) {
            log.info("No need to copy as destination watermark >= source watermark with source watermark {}, for dataset with metadata {}", copyFromRaw.getWatermark().isPresent() ? ((ComparableWatermark) copyFromRaw.getWatermark().get()).toJson() : "N/A", this.rc.getMetaData());
            return copyableFiles;
        }

        Configuration conf = HadoopUtils.newConfiguration();
        FileSystem copyFromFs = FileSystem.get(copyFrom.getFsURI(), conf);
        FileSystem copyToFs = FileSystem.get(copyTo.getFsURI(), conf);

        Collection<FileStatus> allFilesInSource = copyFrom.getFiles();
        Collection<FileStatus> allFilesInTarget = copyTo.getFiles();
        Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(allFilesInSource);

        // Target files indexed by scheme-less path. Entries are removed as source files claim
        // them, so whatever remains after the loop exists only on the target.
        Map<Path, FileStatus> copyToFileMap = Maps.newHashMap();
        for (FileStatus targetStatus : allFilesInTarget) {
            copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(targetStatus.getPath()), targetStatus);
        }

        List<Path> deletedPaths = Lists.newArrayList();
        boolean watermarkMetadataCopied = false;
        boolean deleteTargetIfNotExistOnSource = this.rc.isDeleteTargetIfNotExistOnSource();
        Path sourceRoot = PathUtils.getPathWithoutSchemeAndAuthority(copyFrom.getDatasetPath());

        for (FileStatus originFileStatus : copyFromFileStatuses) {
            Path relative = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(originFileStatus.getPath()), sourceRoot);
            Path newPath = new Path(copyTo.getDatasetPath(), relative);
            if (relative.toString().equals(WATERMARK_METADATA_FILE)) {
                watermarkMetadataCopied = true;
            }

            // Claim the matching target entry (if any) in a single lookup. Map values are never
            // null, so a null result means the target has no file at this path.
            FileStatus existingTarget = copyToFileMap.remove(newPath);
            boolean targetIsCurrent = existingTarget != null
                    && existingTarget.getLen() == originFileStatus.getLen()
                    && existingTarget.getModificationTime() > originFileStatus.getModificationTime();
            if (targetIsCurrent) {
                log.debug("Copy from timestamp older than copy to timestamp, skipped copy {} for dataset with metadata {}", originFileStatus.getPath(), this.rc.getMetaData());
                continue;
            }

            if (existingTarget != null) {
                // The stale target file has to go before the fresh copy is published.
                deletedPaths.add(newPath);
            }
            CopyableFile copyableFile = CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration)
                    .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString())
                    .build();
            copyableFile.setFsDatasets(copyFromFs, copyToFs);
            copyableFiles.add(copyableFile);
        }

        if (deleteTargetIfNotExistOnSource) {
            deletedPaths.addAll(copyToFileMap.keySet());
        }
        if (!deletedPaths.isEmpty()) {
            DeleteFileCommitStep deleteCommitStep = DeleteFileCommitStep.fromPaths(copyToFs, deletedPaths, this.props);
            // Last argument is presumably the step priority - TODO confirm against PrePublishStep.
            copyableFiles.add(new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.newHashMap(), deleteCommitStep, 0));
        }
        if (!watermarkMetadataCopied && copyFrom.getWatermark().isPresent()) {
            // No metadata file travels with the copy, so generate it on the target after publish.
            copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.newHashMap(), new WatermarkMetadataGenerationCommitStep(copyTo.getFsURI().toString(), copyTo.getDatasetPath(), (Watermark) copyFrom.getWatermark().get()), 1));
        }
        return copyableFiles;
    }

    /**
     * Tells whether the target's watermark shows it is already at or ahead of the source.
     *
     * <p>True when the target has a watermark and either the source has none or the source
     * watermark compares less than or equal to the target watermark.
     */
    private static boolean isTargetWatermarkCurrent(EndPoint source, EndPoint target) {
        boolean sourceHasWatermark = source.getWatermark().isPresent();
        boolean targetHasWatermark = target.getWatermark().isPresent();
        if (!targetHasWatermark) {
            return false;
        }
        if (!sourceHasWatermark) {
            return true;
        }
        return ((ComparableWatermark) source.getWatermark().get()).compareTo(target.getWatermark().get()) <= 0;
    }
}

