/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.copy;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.UnmodifiableIterator;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDatasetBase;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder;
import org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkHelper;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.data.management.partition.CopyableDatasetRequestor;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.data.management.partition.FileSetResourceEstimator;
import org.apache.gobblin.dataset.DatasetsFinder;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.IterableDatasetFinderImpl;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitWeighter;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.binpacking.FieldWeighter;
import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
import org.apache.gobblin.util.deprecation.DeprecationUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.util.guid.Guid;
import org.apache.gobblin.util.guid.HasGuid;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.request_allocation.AllocatedRequestsIterator;
import org.apache.gobblin.util.request_allocation.GreedyAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
import org.apache.gobblin.util.request_allocation.PriorityIterableBasedRequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
import org.apache.gobblin.util.request_allocation.ResourceEstimator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gobblin source for distcp-style copies.
 *
 * <p>{@link #getWorkunits(SourceState)} discovers datasets with a {@link DatasetsFinder}, turns each dataset into a
 * {@link CopyableDatasetRequestor}, lets a {@link RequestAllocator} prioritize/limit the resulting {@link FileSet}s,
 * converts every {@link CopyEntity} of the surviving file sets into a {@link WorkUnit} (in parallel, via
 * {@link FileSetWorkUnitGenerator} or an aliased subclass) and finally bin-packs the work units by weight.
 *
 * <p>{@link #getExtractor(WorkUnitState)} returns an extractor over the source file of a {@link CopyableFile}, or an
 * {@link EmptyExtractor} for any other kind of {@link CopyEntity}.
 */
public class CopySource
extends AbstractSource<String, FileAwareInputStream> {
    private static final Logger log = LoggerFactory.getLogger(CopySource.class);
    public static final String DEFAULT_DATASET_PROFILE_CLASS_KEY = CopyableGlobDatasetFinder.class.getCanonicalName();
    public static final String SERIALIZED_COPYABLE_FILE = "gobblin.copy.serialized.copyable.file";
    public static final String COPY_ENTITY_CLASS = "gobblin.copy.copy.entity.class";
    public static final String SERIALIZED_COPYABLE_DATASET = "gobblin.copy.serialized.copyable.datasets";
    public static final String WORK_UNIT_GUID = "gobblin.copy.work.unit.guid";
    public static final String MAX_CONCURRENT_LISTING_SERVICES = "gobblin.copy.max.concurrent.listing.services";
    public static final int DEFAULT_MAX_CONCURRENT_LISTING_SERVICES = 20;
    public static final String MAX_FILES_COPIED_KEY = "gobblin.copy.max.files.copied";
    public static final String SIMULATE = "gobblin.copy.simulate";
    public static final String MAX_SIZE_MULTI_WORKUNITS = "gobblin.copy.binPacking.maxSizePerBin";
    public static final String MAX_WORK_UNITS_PER_BIN = "gobblin.copy.binPacking.maxWorkUnitsPerBin";
    public static final String REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME = "RequestsExceedingAvailableResourcePoolEvent";
    public static final String REQUESTS_DROPPED_EVENT_NAME = "RequestsDroppedEvent";
    public static final String REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME = "RequestsRejectedDueToInsufficientEvictionEvent";
    public static final String REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME = "RequestsRejectedWithLowPriorityEvent";
    public static final String FILESET_NAME = "fileset.name";
    public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
    public static final String FILESET_TOTAL_SIZE_IN_BYTES = "fileset.total.size";
    /**
     * Work-unit property set to {@code true} when a {@link ConfigBasedDataset} has schema checks enabled.
     * NOTE: the key string is misspelled ("shcema") but is deliberately kept, because changing it would change the
     * property name written into work units.
     */
    public static final String SCHEMA_CHECK_ENABLED = "shcema.check.enabled";
    public static final String DATASET_STAGING_DIR_PATH = "dataset.staging.dir.path";
    public static final String DATASET_STAGING_PATH = "dataset.staging.path";
    public static final boolean DEFAULT_SCHEMA_CHECK_ENABLED = false;
    /** Work-unit property holding the bin-packing weight written by {@link #setWorkUnitWeight}. */
    private static final String WORK_UNIT_WEIGHT = "gobblin.copy.workUnitWeight";
    /** Reads back the weight stored under {@link #WORK_UNIT_WEIGHT} when bin packing. */
    private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
    public MetricContext metricContext;
    public EventSubmitter eventSubmitter;
    protected Optional<LineageInfo> lineageInfo;

    /**
     * Builds the work units for a copy job.
     *
     * <p>When {@value #SIMULATE} is {@code true}, the planned copy actions are only logged and no work is scheduled.
     * A file set whose work-unit generation fails is logged and skipped; the remaining file sets still produce work
     * units.
     *
     * @param state job-level source state; this method also writes the {@code sourceCluster} and
     *        {@code destinationCluster} URIs into it
     * @return the bin-packed work units, or an empty list in simulate mode or when listing was interrupted
     * @throws RuntimeException wrapping any {@link IOException} raised while setting up file systems or listing
     */
    @Override
    public List<WorkUnit> getWorkunits(final SourceState state) {
        this.metricContext = Instrumented.getMetricContext((State)state, CopySource.class);
        this.lineageInfo = LineageInfo.getLineageInfo((SharedResourcesBroker)state.getBroker());
        try {
            // Rename the deprecated max-copy key to MAX_FILES_COPIED_KEY.
            DeprecationUtils.renameDeprecatedKeys((State)state, (String)"gobblin.copy.prioritization.maxCopy.copyEntities", (List)Lists.newArrayList((Object[])new String[]{MAX_FILES_COPIED_KEY}));
            FileSystem sourceFs = HadoopUtils.getSourceFileSystem((State)state);
            final FileSystem targetFs = HadoopUtils.getWriterFileSystem((State)state, (int)1, (int)0);
            state.setProp("sourceCluster", (Object)sourceFs.getUri());
            state.setProp("destinationCluster", (Object)targetFs.getUri());
            log.info("Identified source file system at {} and target file system at {}.", (Object)sourceFs.getUri(), (Object)targetFs.getUri());
            long maxSizePerBin = state.getPropAsLong(MAX_SIZE_MULTI_WORKUNITS, 0L);
            long maxWorkUnitsPerMultiWorkUnit = state.getPropAsLong(MAX_WORK_UNITS_PER_BIN, 50L);
            // Every work unit weighs at least this much (see setWorkUnitWeight), so a bin of maxSizePerBin holds
            // roughly at most maxWorkUnitsPerMultiWorkUnit work units.
            final long minWorkUnitWeight = Math.max(1L, maxSizePerBin / maxWorkUnitsPerMultiWorkUnit);
            final Optional<CopyableFileWatermarkGenerator> watermarkGenerator = CopyableFileWatermarkHelper.getCopyableFileWatermarkGenerator((State)state);
            int maxThreads = state.getPropAsInt(MAX_CONCURRENT_LISTING_SERVICES, DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);
            CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();
            this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.copy").build();
            DatasetsFinder datasetFinder = DatasetUtils.instantiateDatasetFinder(state.getProperties(), sourceFs, DEFAULT_DATASET_PROFILE_CLASS_KEY, this.eventSubmitter, state);
            IterableDatasetFinder iterableDatasetFinder = datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder)datasetFinder : new IterableDatasetFinderImpl(datasetFinder);
            // The requestor factory may yield null for a dataset; those entries are filtered out.
            Iterator requestorIteratorWithNulls = Iterators.transform((Iterator)iterableDatasetFinder.getDatasetsIterator(), (Function)new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
            UnmodifiableIterator requestorIterator = Iterators.filter((Iterator)requestorIteratorWithNulls, (Predicate)Predicates.notNull());
            // Written concurrently by the generator tasks submitted below, hence the synchronized wrapper.
            final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitsMap = Multimaps.synchronizedSetMultimap(HashMultimap.<FileSet<CopyEntity>, WorkUnit>create());
            RequestAllocator<FileSet<CopyEntity>> allocator = this.createRequestAllocator(copyConfiguration, maxThreads);
            AllocatedRequestsIterator prioritizedFileSets = allocator.allocateRequests((Iterator)requestorIterator, copyConfiguration.getMaxToCopy());
            this.submitUnfulfilledRequestEvents(allocator);
            final String filesetWuGeneratorAlias = state.getProp("copy.source.fileset.wu.generator.class", FileSetWorkUnitGenerator.class.getName());
            // Loop-invariant: build the alias resolver once instead of once per file set.
            final ClassAliasResolver generatorResolver = new ClassAliasResolver(FileSetWorkUnitGenerator.class);
            Iterator callableIterator = Iterators.transform((Iterator)prioritizedFileSets, (Function)new Function<FileSet<CopyEntity>, Callable<Void>>(){

                @Override
                @Nullable
                public Callable<Void> apply(FileSet<CopyEntity> input) {
                    try {
                        return (Callable)GobblinConstructorUtils.invokeLongestConstructor((Class)generatorResolver.resolveClass(filesetWuGeneratorAlias), (Object[])new Object[]{input.getDataset(), input, state, targetFs, workUnitsMap, watermarkGenerator, minWorkUnitWeight, CopySource.this.lineageInfo});
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Cannot create workunits generator", e);
                    }
                }
            });
            try {
                List<Future<?>> futures = new IteratorExecutor(callableIterator, maxThreads, ExecutorsUtils.newDaemonThreadFactory((Optional)Optional.of((Object)log), (Optional)Optional.of((Object)"Copy-file-listing-pool-%d"))).execute();
                for (Future<?> future : futures) {
                    try {
                        future.get();
                    }
                    catch (ExecutionException exc) {
                        // Best effort: a failed file set is logged and skipped; the others still yield work units.
                        log.error("Failed to get work units for dataset.", exc.getCause());
                    }
                }
            }
            catch (InterruptedException ie) {
                // Restore the interrupt status so callers further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                log.error("Retrieval of work units was interrupted. Aborting.");
                return Lists.newArrayList();
            }
            log.info("Created {} workunits ", workUnitsMap.size());
            copyConfiguration.getCopyContext().logCacheStatistics();
            if (state.contains(SIMULATE) && state.getPropAsBoolean(SIMULATE)) {
                log.info("Simulate mode enabled. Will not execute the copy.");
                for (Map.Entry<FileSet<CopyEntity>, Collection<WorkUnit>> entry : workUnitsMap.asMap().entrySet()) {
                    FileSet<CopyEntity> fileSet = entry.getKey();
                    log.info("Actions for dataset {} file set {}.", fileSet.getDataset().datasetURN(), fileSet.getName());
                    for (WorkUnit workUnit : entry.getValue()) {
                        try {
                            CopyEntity copyEntity = CopySource.deserializeCopyEntity((State)workUnit);
                            log.info(copyEntity.explain());
                        }
                        catch (Exception e) {
                            log.info("Cannot deserialize CopyEntity from wu : {}", (Object)workUnit.toString());
                        }
                    }
                }
                return Lists.newArrayList();
            }
            List workUnits = new WorstFitDecreasingBinPacking(maxSizePerBin).pack((List)Lists.newArrayList((Iterable)workUnitsMap.values()), this.weighter);
            log.info("Bin packed work units. Initial work units: {}, packed work units: {}, max weight per bin: {}, max work units per bin: {}.", workUnitsMap.size(), workUnits.size(), maxSizePerBin, maxWorkUnitsPerMultiWorkUnit);
            return ImmutableList.copyOf((Collection)workUnits);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Submits one {@link GobblinTrackingEvent} named {@code eventName} per file set, carrying the dataset URN, file-set
     * name, entity count and total size as metadata.
     */
    private void submitUnfulfilledRequestEventsHelper(List<FileSet<CopyEntity>> fileSetList, String eventName) {
        for (FileSet<CopyEntity> fileSet : fileSetList) {
            GobblinTrackingEvent event = GobblinTrackingEvent.newBuilder().setName(eventName).setNamespace(CopySource.class.getName()).setMetadata((Map)ImmutableMap.builder().put((Object)"dataset.urn", (Object)fileSet.getDataset().getUrn()).put((Object)FILESET_TOTAL_ENTITIES, (Object)Integer.toString(fileSet.getTotalEntities())).put((Object)FILESET_TOTAL_SIZE_IN_BYTES, (Object)Long.toString(fileSet.getTotalSizeInBytes())).put((Object)FILESET_NAME, (Object)fileSet.getName()).build()).build();
            this.metricContext.submitEvent(event);
        }
    }

    /**
     * Emits tracking events for every request a {@link PriorityIterableBasedRequestAllocator} did not fulfil
     * (exceeding the resource pool, rejected for insufficient eviction, rejected for low priority, or dropped).
     * Other allocator types expose no such information and produce no events.
     */
    private void submitUnfulfilledRequestEvents(RequestAllocator<FileSet<CopyEntity>> allocator) {
        if (allocator instanceof PriorityIterableBasedRequestAllocator) {
            PriorityIterableBasedRequestAllocator priorityIterableBasedRequestAllocator = (PriorityIterableBasedRequestAllocator)allocator;
            this.submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsExceedingAvailableResourcePool(), REQUESTS_EXCEEDING_AVAILABLE_RESOURCE_POOL_EVENT_NAME);
            this.submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsRejectedDueToInsufficientEviction(), REQUESTS_REJECTED_DUE_TO_INSUFFICIENT_EVICTION_EVENT_NAME);
            this.submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsRejectedWithLowPriority(), REQUESTS_REJECTED_WITH_LOW_PRIORITY_EVENT_NAME);
            this.submitUnfulfilledRequestEventsHelper(priorityIterableBasedRequestAllocator.getRequestsDropped(), REQUESTS_DROPPED_EVENT_NAME);
        }
    }

    /**
     * Chooses the request allocator for the copy.
     *
     * <p>No prioritizer configured: a {@link GreedyAllocator}. A {@link HierarchicalPrioritizer}: a
     * {@link HierarchicalAllocator}. Any other prioritizer: whatever {@link RequestAllocatorUtils#inferFromConfig}
     * selects.
     */
    private RequestAllocator<FileSet<CopyEntity>> createRequestAllocator(CopyConfiguration copyConfiguration, int maxThreads) {
        Optional<FileSetComparator> prioritizer = copyConfiguration.getPrioritizer();
        RequestAllocatorConfig.Builder configBuilder = RequestAllocatorConfig.builder((ResourceEstimator)new FileSetResourceEstimator()).allowParallelization(maxThreads).storeRejectedRequests(copyConfiguration.getStoreRejectedRequestsSetting()).withLimitedScopeConfig(copyConfiguration.getPrioritizationConfig());
        if (!prioritizer.isPresent()) {
            return new GreedyAllocator(configBuilder.build());
        }
        configBuilder.withPrioritizer((Comparator)prioritizer.get());
        if (prioritizer.get() instanceof HierarchicalPrioritizer) {
            return new HierarchicalAllocator.Factory().createRequestAllocator(configBuilder.build());
        }
        return RequestAllocatorUtils.inferFromConfig((RequestAllocatorConfig)configBuilder.build());
    }

    /**
     * Returns an extractor for one work unit: for a {@link CopyableFile}, the one built by
     * {@link #extractorForCopyableFile}; for any other {@link CopyEntity}, an {@link EmptyExtractor}.
     *
     * @throws IOException if the entity class cannot be resolved or the source file system cannot be obtained
     */
    @Override
    public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException {
        Class<?> copyEntityClass = CopySource.getCopyEntityClass((State)state);
        if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
            CopyableFile copyEntity = (CopyableFile)CopySource.deserializeCopyEntity((State)state);
            return this.extractorForCopyableFile(HadoopUtils.getSourceFileSystem((State)state), copyEntity, state);
        }
        return new EmptyExtractor<String, FileAwareInputStream>("empty");
    }

    /** Hook for subclasses: builds the extractor that reads {@code cf} from {@code fs}. */
    protected Extractor<String, FileAwareInputStream> extractorForCopyableFile(FileSystem fs, CopyableFile cf, WorkUnitState state) throws IOException {
        return new FileAwareInputStreamExtractor(fs, cf, state);
    }

    /** Nothing to release. */
    @Override
    public void shutdown(SourceState state) {
    }

    /**
     * Builds the source file system from {@code source.filebased.fs.uri} (default {@code file:///}).
     *
     * @deprecated {@link #getWorkunits} and {@link #getExtractor} use {@link HadoopUtils#getSourceFileSystem(State)}
     *     instead.
     */
    @Deprecated
    protected FileSystem getSourceFileSystem(State state) throws IOException {
        Configuration conf = HadoopUtils.getConfFromState((State)state, (Optional)Optional.of((Object)"source.filebased.encrypted"));
        String uri = state.getProp("source.filebased.fs.uri", "file:///");
        return HadoopUtils.getOptionallyThrottledFileSystem((FileSystem)FileSystem.get((URI)URI.create(uri), (Configuration)conf), (State)state);
    }

    /**
     * Stores the bin-packing weight of a work unit: the origin file length for a {@link CopyableFile}, 0 for any
     * other entity, but never less than {@code minWeight}.
     */
    private static void setWorkUnitWeight(WorkUnit workUnit, CopyEntity copyEntity, long minWeight) {
        long weight = 0L;
        if (copyEntity instanceof CopyableFile) {
            weight = ((CopyableFile)copyEntity).getOrigin().getLen();
        }
        weight = Math.max(weight, minWeight);
        workUnit.setProp(WORK_UNIT_WEIGHT, (Object)Long.toString(weight));
    }

    /**
     * Derives the work unit's {@link Guid} from its {@code converter.classes} property (empty if unset) and the guid of
     * its serialized {@link CopyEntity}, then stores it under {@link #WORK_UNIT_GUID}. Presumably the converters are
     * included so the same entity copied through different converters gets a distinct guid — TODO confirm.
     */
    private static void computeAndSetWorkUnitGuid(WorkUnit workUnit) throws IOException {
        Guid guid = Guid.fromStrings((String[])new String[]{workUnit.contains("converter.classes") ? workUnit.getProp("converter.classes") : ""});
        CopySource.setWorkUnitGuid((State)workUnit, guid.append(new HasGuid[]{CopySource.deserializeCopyEntity((State)workUnit)}));
    }

    /** Stores {@code guid} in {@code state} under {@link #WORK_UNIT_GUID}. */
    public static void setWorkUnitGuid(State state, Guid guid) {
        state.setProp(WORK_UNIT_GUID, (Object)guid.toString());
    }

    /**
     * Reads the guid stored under {@link #WORK_UNIT_GUID}.
     *
     * @return the deserialized guid, or {@link Optional#absent()} if the property is not set
     * @throws IOException if the stored value cannot be deserialized
     */
    public static Optional<Guid> getWorkUnitGuid(State state) throws IOException {
        if (state.contains(WORK_UNIT_GUID)) {
            return Optional.of(Guid.deserialize(state.getProp(WORK_UNIT_GUID)));
        }
        return Optional.absent();
    }

    /** Serializes {@code copyEntity} and its concrete class name into {@code state}. */
    public static void serializeCopyEntity(State state, CopyEntity copyEntity) {
        state.setProp(SERIALIZED_COPYABLE_FILE, (Object)CopyEntity.serialize(copyEntity));
        state.setProp(COPY_ENTITY_CLASS, (Object)copyEntity.getClass().getName());
    }

    /**
     * Loads the concrete {@link CopyEntity} class recorded by {@link #serializeCopyEntity}.
     *
     * @throws IOException if the class-name property is missing or the class cannot be found
     */
    public static Class<?> getCopyEntityClass(State state) throws IOException {
        String copyEntityClassName = state.getProp(COPY_ENTITY_CLASS);
        if (copyEntityClassName == null) {
            // Previously surfaced as an opaque NullPointerException from Class.forName(null).
            throw new IOException("Missing required property " + COPY_ENTITY_CLASS);
        }
        try {
            return Class.forName(copyEntityClassName);
        }
        catch (ClassNotFoundException cnfe) {
            throw new IOException(cnfe);
        }
    }

    /** Inverse of {@link #serializeCopyEntity}: rebuilds the entity from {@link #SERIALIZED_COPYABLE_FILE}. */
    public static CopyEntity deserializeCopyEntity(State state) {
        return CopyEntity.deserialize(state.getProp(SERIALIZED_COPYABLE_FILE));
    }

    /** Stores the serialized dataset metadata under {@link #SERIALIZED_COPYABLE_DATASET}. */
    public static void serializeCopyableDataset(State state, CopyableDatasetMetadata copyableDataset) {
        state.setProp(SERIALIZED_COPYABLE_DATASET, (Object)copyableDataset.serialize());
    }

    /** Inverse of {@link #serializeCopyableDataset}. */
    public static CopyableDatasetMetadata deserializeCopyableDataset(State state) {
        return CopyableDatasetMetadata.deserialize(state.getProp(SERIALIZED_COPYABLE_DATASET));
    }

    /**
     * Task that converts one {@link FileSet} into work units and adds them, keyed by the file set, to a shared
     * multimap.
     *
     * <p>{@link CopySource#getWorkunits} instantiates it reflectively through its longest constructor. The class can be
     * replaced by another class resolvable by {@link ClassAliasResolver}, named in
     * {@code copy.source.fileset.wu.generator.class}.
     */
    @Alias(value="FileSetWorkUnitGenerator")
    public static class FileSetWorkUnitGenerator
    implements Callable<Void> {
        protected final CopyableDatasetBase copyableDataset;
        protected final FileSet<CopyEntity> fileSet;
        protected final State state;
        protected final FileSystem targetFs;
        /** Shared sink for the generated work units; written concurrently by several generators. */
        protected final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList;
        protected final Optional<CopyableFileWatermarkGenerator> watermarkGenerator;
        protected final long minWorkUnitWeight;
        protected final Optional<LineageInfo> lineageInfo;

        /**
         * Builds one work unit per {@link CopyEntity} in the file set, or several when a {@link CopyableFile} is split
         * by {@link DistcpFileSplitter}, and adds them to {@link #workUnitList} in a single {@code putAll}.
         *
         * @throws RuntimeException wrapping any {@link IOException} raised while preparing a work unit
         */
        @Override
        public Void call() {
            try {
                // ':' is replaced in the extract id — presumably because it is unsafe in paths; TODO confirm.
                String extractId = this.fileSet.getName().replace(':', '_');
                Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "gobblin.copy", extractId);
                ArrayList workUnitsForPartition = Lists.newArrayList();
                for (CopyEntity copyEntity : this.fileSet.getFiles()) {
                    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(this.copyableDataset);
                    CopyEntity.DatasetAndPartition datasetAndPartition = copyEntity.getDatasetAndPartition(metadata);
                    WorkUnit workUnit = new WorkUnit(extract);
                    workUnit.addAll(this.state);
                    if (this.copyableDataset instanceof ConfigBasedDataset && ((ConfigBasedDataset)this.copyableDataset).schemaCheckEnabled()) {
                        workUnit.setProp(CopySource.SCHEMA_CHECK_ENABLED, (Object)true);
                        if (((ConfigBasedDataset)this.copyableDataset).getExpectedSchema() != null) {
                            workUnit.setProp("gobblin.copy.expectedSchema", (Object)((ConfigBasedDataset)this.copyableDataset).getExpectedSchema());
                        }
                    }
                    if (this.copyableDataset instanceof HiveDataset && this.state.getPropAsBoolean("dataset.staging.dir.used", false)) {
                        workUnit.setProp(CopySource.DATASET_STAGING_DIR_PATH, (Object)((HiveDataset)this.copyableDataset).getProperties().getProperty(CopySource.DATASET_STAGING_PATH));
                    }
                    CopySource.serializeCopyEntity((State)workUnit, copyEntity);
                    CopySource.serializeCopyableDataset((State)workUnit, metadata);
                    GobblinMetrics.addCustomTagToState((State)workUnit, (Tag)new Tag("datasetUrn", (Object)this.copyableDataset.datasetURN()));
                    workUnit.setProp("dataset.urn", (Object)datasetAndPartition.toString());
                    workUnit.setProp("event.sla.datasetUrn", (Object)this.copyableDataset.datasetURN());
                    workUnit.setProp("event.sla.partition", (Object)copyEntity.getFileSet());
                    CopySource.setWorkUnitWeight(workUnit, copyEntity, this.minWorkUnitWeight);
                    this.setWorkUnitWatermark(workUnit, this.watermarkGenerator, copyEntity);
                    // Must run after serializeCopyEntity: the guid is derived from the serialized entity.
                    CopySource.computeAndSetWorkUnitGuid(workUnit);
                    this.addLineageInfo(copyEntity, workUnit);
                    if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
                        workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile)copyEntity, workUnit, this.targetFs));
                        continue;
                    }
                    workUnitsForPartition.add(workUnit);
                }
                this.workUnitList.putAll(this.fileSet, (Iterable)workUnitsForPartition);
                return null;
            }
            catch (IOException ioe) {
                throw new RuntimeException("Failed to generate work units for dataset " + this.copyableDataset.datasetURN(), ioe);
            }
        }

        /** Sets the watermark interval of a {@link CopyableFile} work unit when the helper produces one. */
        private void setWorkUnitWatermark(WorkUnit workUnit, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, CopyEntity copyEntity) throws IOException {
            Optional<WatermarkInterval> watermarkIntervalOptional;
            if (copyEntity instanceof CopyableFile && (watermarkIntervalOptional = CopyableFileWatermarkHelper.getCopyableFileWatermark((CopyableFile)copyEntity, watermarkGenerator)).isPresent()) {
                workUnit.setWatermarkInterval((WatermarkInterval)watermarkIntervalOptional.get());
            }
        }

        /**
         * Records the lineage source of a {@link CopyableFile} on the work unit. Only the source is set here, although
         * both source and destination data must be non-null for anything to be recorded.
         */
        private void addLineageInfo(CopyEntity copyEntity, WorkUnit workUnit) {
            if (copyEntity instanceof CopyableFile) {
                CopyableFile copyableFile = (CopyableFile)copyEntity;
                if (this.lineageInfo.isPresent() && copyableFile.getSourceData() != null && copyableFile.getDestinationData() != null) {
                    ((LineageInfo)this.lineageInfo.get()).setSource(copyableFile.getSourceData(), (State)workUnit);
                }
            }
        }

        @ConstructorProperties(value={"copyableDataset", "fileSet", "state", "targetFs", "workUnitList", "watermarkGenerator", "minWorkUnitWeight", "lineageInfo"})
        public FileSetWorkUnitGenerator(CopyableDatasetBase copyableDataset, FileSet<CopyEntity> fileSet, State state, FileSystem targetFs, SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitList, Optional<CopyableFileWatermarkGenerator> watermarkGenerator, long minWorkUnitWeight, Optional<LineageInfo> lineageInfo) {
            this.copyableDataset = copyableDataset;
            this.fileSet = fileSet;
            this.state = state;
            this.targetFs = targetFs;
            this.workUnitList = workUnitList;
            this.watermarkGenerator = watermarkGenerator;
            this.minWorkUnitWeight = minWorkUnitWeight;
            this.lineageInfo = lineageInfo;
        }
    }
}

