/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.source;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.source.DatasetFinderSource;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.PartitionableDataset;
import org.apache.gobblin.dataset.URNIdentified;
import org.apache.gobblin.dataset.comparators.URNLexicographicalComparator;
import org.apache.gobblin.runtime.task.NoopTask;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A source that walks the datasets returned by an {@link IterableDatasetFinder} in lexicographical URN order,
 * producing a bounded number of work units per run and resuming after the last processed dataset (and partition,
 * when drilling down into partitions) on the following run.
 *
 * <p>Progress is persisted through a trailing no-op work unit flagged with {@link #GLOBAL_WATERMARK_DATASET_KEY}
 * that records the URN of the last dataset / partition a work unit was generated for. When the previous run's
 * watermark carries {@link #END_OF_DATASETS_KEY}, the next run starts over from the first dataset.
 */
public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSource<S, D> {
    private static final Logger log = LoggerFactory.getLogger(LoopingDatasetFinderSource.class);

    /** Configuration key for the maximum number of dataset/partition work units generated in one run. */
    public static final String MAX_WORK_UNITS_PER_RUN_KEY = "gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun";
    /** Default value used when {@link #MAX_WORK_UNITS_PER_RUN_KEY} is not set. */
    public static final int MAX_WORK_UNITS_PER_RUN = 10;
    /** Separator placed between the dataset URN and the partition URN in the "dataset.urn" work unit property. */
    public static final String DATASET_PARTITION_DELIMITER = "@";
    /** Watermark work unit property holding the URN of the last dataset a work unit was generated for. */
    protected static final String DATASET_URN = "gobblin.source.loopingDatasetFinderSource.datasetUrn";
    /** Watermark work unit property holding the URN of the last partition a work unit was generated for. */
    protected static final String PARTITION_URN = "gobblin.source.loopingDatasetFinderSource.partitionUrn";
    /** Watermark work unit property set to {@code true} when the run reached the end of all datasets. */
    protected static final String END_OF_DATASETS_KEY = "gobblin.source.loopingDatasetFinderSource.endOfDatasets";
    /** Property marking a work unit (state) as the one carrying the global watermark. */
    protected static final String GLOBAL_WATERMARK_DATASET_KEY = "gobblin.source.loopingDatasetFinderSource.globalWatermarkDataset";

    private final URNLexicographicalComparator lexicographicalComparator = new URNLexicographicalComparator();

    /**
     * Set on every call to {@link #getWorkunitStream(SourceState, boolean)}. When {@code true}, previous states are
     * looked up under the "__globalDatasetWatermark" URN and generated work units are tagged with "dataset.urn".
     */
    protected boolean isDatasetStateStoreEnabled;

    /**
     * @param drilldownIntoPartitions forwarded to {@link DatasetFinderSource}; when set, {@link PartitionableDataset}s
     *                                yield one work unit per partition instead of one per dataset.
     */
    public LoopingDatasetFinderSource(boolean drilldownIntoPartitions) {
        super(drilldownIntoPartitions);
    }

    /**
     * Materializes {@link #getWorkunitStream(SourceState)} into a list.
     */
    @Override
    public List<WorkUnit> getWorkunits(SourceState state) {
        return Lists.newArrayList(this.getWorkunitStream(state).getMaterializedWorkUnitCollection());
    }

    /**
     * Same as {@link #getWorkunitStream(SourceState, boolean)} with the dataset state store disabled.
     */
    @Override
    public WorkUnitStream getWorkunitStream(SourceState state) {
        return this.getWorkunitStream(state, false);
    }

    /**
     * Builds the finite work unit stream for this run, resuming after the watermark found in the previous run's
     * work unit states (if any).
     *
     * @param state source state providing configuration and the previous work unit states
     * @param isDatasetStateStoreEnabled whether the watermark is stored/looked up per dataset URN
     * @return a finite stream of at most {@code maxWorkUnitsPerRun} work units followed by one no-op watermark work unit
     * @throws IllegalArgumentException if the configured maximum number of work units is not positive
     * @throws RuntimeException wrapping any {@link IOException} raised while finding datasets
     */
    public WorkUnitStream getWorkunitStream(SourceState state, boolean isDatasetStateStoreEnabled) {
        this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled;
        try {
            int maximumWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);
            Preconditions.checkArgument(maximumWorkUnits > 0, "Max work units must be greater than 0!");

            List<WorkUnitState> previousWorkUnitStates = this.isDatasetStateStoreEnabled
                    ? state.getPreviousWorkUnitStates("__globalDatasetWatermark")
                    : state.getPreviousWorkUnitStates();

            // The first previous state flagged as the global watermark tells us where the last run stopped.
            Optional<WorkUnitState> maxWorkUnit = previousWorkUnitStates.stream()
                    .filter(previous -> previous.getPropAsBoolean(GLOBAL_WATERMARK_DATASET_KEY, false))
                    .findFirst();

            IterableDatasetFinder datasetsFinder = this.createDatasetsFinder(state);
            // The finder is used through its raw type here, so the element type of the stream is unchecked.
            @SuppressWarnings("unchecked")
            Stream<Dataset> datasetStream =
                    datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
            datasetStream = this.sortStreamLexicographically(datasetStream);

            String previousDatasetUrnWatermark = null;
            String previousPartitionUrnWatermark = null;
            // A watermark flagged END_OF_DATASETS means the previous run finished the loop: start over (null watermarks).
            if (maxWorkUnit.isPresent() && !maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
                previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN);
                previousPartitionUrnWatermark = maxWorkUnit.get().getProp(PARTITION_URN);
            }

            Iterator<WorkUnit> workUnitIterator = this.getWorkUnitIterator(
                    datasetStream.iterator(), previousDatasetUrnWatermark, previousPartitionUrnWatermark, maximumWorkUnits);
            return new BasicWorkUnitStream.Builder(workUnitIterator).setFiniteStream(true).build();
        }
        catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Creates the iterator that turns the (sorted) datasets into work units. Subclasses may override this to supply
     * a different iterator.
     *
     * @param datasetIterator datasets in lexicographical URN order
     * @param previousDatasetUrnWatermark URN of the last dataset handled by the previous run, or {@code null}
     * @param previousPartitionUrnWatermark URN of the last partition handled by the previous run, or {@code null}
     * @param maximumWorkUnits maximum number of dataset/partition work units to generate
     */
    protected Iterator<WorkUnit> getWorkUnitIterator(Iterator<Dataset> datasetIterator, String previousDatasetUrnWatermark, @Nullable String previousPartitionUrnWatermark, int maximumWorkUnits) throws IOException {
        return new DeepIterator(datasetIterator, previousDatasetUrnWatermark, previousPartitionUrnWatermark, maximumWorkUnits);
    }

    /**
     * Returns a sequential stream over the same elements sorted with {@link #lexicographicalComparator}. The sort is
     * skipped when the input already reports being sorted by an equal comparator.
     */
    private <T extends URNIdentified> Stream<T> sortStreamLexicographically(Stream<T> inputStream) {
        Spliterator<T> spliterator = inputStream.spliterator();
        // Spliterator#getComparator() returns null for a naturally ordered SORTED source, so equals() is invoked
        // on our own (non-null) comparator rather than on the returned value.
        if (spliterator.hasCharacteristics(Spliterator.SORTED)
                && this.lexicographicalComparator.equals(spliterator.getComparator())) {
            return StreamSupport.stream(spliterator, false);
        }
        return StreamSupport.stream(spliterator, false).sorted(this.lexicographicalComparator);
    }

    /**
     * Iterator producing one work unit per dataset (or per partition when drilling down), starting after the given
     * watermarks, and finishing with a single no-op work unit that carries the new watermark.
     */
    protected class DeepIterator extends AbstractIterator<WorkUnit> {
        protected final Iterator<Dataset> baseIterator;
        protected final int maxWorkUnits;
        protected int generatedWorkUnits = 0;
        protected Dataset previousDataset;
        private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator;
        private PartitionableDataset.DatasetPartition previousPartition;
        /** Set once the trailing watermark work unit has been returned; the iterator is finished afterwards. */
        private boolean watermarkEmitted = false;

        /**
         * Positions the iterator right after the previous run's watermarks.
         *
         * @param baseIterator datasets in lexicographical URN order
         * @param previousDatasetUrnWatermark URN of the last dataset handled previously, or {@code null} to start at the beginning
         * @param previousPartitionUrnWatermark URN of the last partition handled previously, or {@code null}
         * @param maxWorkUnits maximum number of dataset/partition work units to generate
         */
        public DeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark, String previousPartitionUrnWatermark, int maxWorkUnits) throws IOException {
            this.maxWorkUnits = maxWorkUnits;
            // A PeekingIterator buffers the peeked element taken from its backing iterator, so the wrapper itself
            // must be the iterator used from now on; otherwise the first element larger than the watermark is lost.
            PeekingIterator<Dataset> peekingDatasets = Iterators.peekingIterator(baseIterator);
            this.baseIterator = peekingDatasets;
            Dataset equalDataset = this.advanceUntilLargerThan(peekingDatasets, previousDatasetUrnWatermark);
            if (LoopingDatasetFinderSource.this.drilldownIntoPartitions && equalDataset instanceof PartitionableDataset) {
                // The watermarked dataset still exists: continue with the partitions following the partition watermark.
                PeekingIterator<PartitionableDataset.DatasetPartition> peekingPartitions =
                        Iterators.peekingIterator(this.getPartitionIterator((PartitionableDataset) equalDataset));
                this.advanceUntilLargerThan(peekingPartitions, previousPartitionUrnWatermark);
                this.currentPartitionIterator = peekingPartitions;
            } else {
                this.currentPartitionIterator = Iterators.emptyIterator();
            }
        }

        /**
         * Advances {@code it} past every element whose URN sorts before {@code reference}.
         *
         * @return the element whose URN equals {@code reference} (consumed from {@code it}), or {@code null} if
         *         {@code reference} is {@code null} or no such element exists
         */
        @Nullable
        private <T extends URNIdentified> T advanceUntilLargerThan(PeekingIterator<T> it, String reference) {
            if (reference == null) {
                return null;
            }
            int comparisonResult = -1;
            while (it.hasNext() && (comparisonResult = LoopingDatasetFinderSource.this.lexicographicalComparator.compare(it.peek(), reference)) < 0) {
                it.next();
            }
            return comparisonResult == 0 ? it.next() : null;
        }

        /**
         * Returns the partitions of {@code dataset} in lexicographical URN order, or an empty iterator (after logging
         * the failure) when the partitions cannot be listed.
         */
        @SuppressWarnings("unchecked") // PartitionableDataset is used through its raw type
        private Iterator<PartitionableDataset.DatasetPartition> getPartitionIterator(PartitionableDataset dataset) {
            try {
                return LoopingDatasetFinderSource.this.sortStreamLexicographically(
                        dataset.getPartitions(Spliterator.SORTED, LoopingDatasetFinderSource.this.lexicographicalComparator)).iterator();
            }
            catch (IOException ioe) {
                // Best effort: the dataset is skipped for this run, but keep the cause in the log.
                log.error("Failed to get partitions for dataset " + dataset.getUrn(), ioe);
                return Iterators.emptyIterator();
            }
        }

        /**
         * Returns dataset/partition work units until {@link #maxWorkUnits} have been generated or the datasets are
         * exhausted, then exactly one no-op watermark work unit, then signals the end of data. The watermark work
         * unit is flagged with {@link LoopingDatasetFinderSource#END_OF_DATASETS_KEY} when the datasets ran out.
         */
        @Override
        protected WorkUnit computeNext() {
            // An explicit flag (rather than only generatedWorkUnits > maxWorkUnits) guarantees termination even
            // when maxWorkUnits is Integer.MAX_VALUE, where that comparison can never become true.
            if (this.watermarkEmitted || this.generatedWorkUnits > this.maxWorkUnits) {
                return this.endOfData();
            }
            if (this.generatedWorkUnits == this.maxWorkUnits) {
                this.watermarkEmitted = true;
                if (this.generatedWorkUnits < Integer.MAX_VALUE) {
                    ++this.generatedWorkUnits;
                }
                return this.generateNoopWorkUnit();
            }
            WorkUnit resultWU = this.doComputeNext();
            if (resultWU == null) {
                // No dataset left: emit the watermark now and mark the loop as complete.
                resultWU = this.generateNoopWorkUnit();
                this.watermarkEmitted = true;
                this.generatedWorkUnits = Integer.MAX_VALUE;
                resultWU.setProp(LoopingDatasetFinderSource.END_OF_DATASETS_KEY, true);
            }
            return resultWU;
        }

        /**
         * Finds the next dataset or partition for which the enclosing source creates a non-null work unit.
         *
         * @return that work unit, or {@code null} when all datasets and partitions have been consumed
         */
        protected WorkUnit doComputeNext() {
            while (this.baseIterator.hasNext() || this.currentPartitionIterator.hasNext()) {
                if (this.currentPartitionIterator.hasNext()) {
                    PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
                    WorkUnit partitionWorkUnit = LoopingDatasetFinderSource.this.workUnitForDatasetPartition(partition);
                    if (partitionWorkUnit == null) {
                        continue;
                    }
                    this.addDatasetInfoToWorkUnit(partitionWorkUnit, partition.getDataset());
                    this.addPartitionInfoToWorkUnit(partitionWorkUnit, partition);
                    this.previousDataset = partition.getDataset();
                    this.previousPartition = partition;
                    ++this.generatedWorkUnits;
                    return partitionWorkUnit;
                }
                // Partitions exhausted, so the loop condition guarantees another dataset is available.
                Dataset dataset = this.baseIterator.next();
                if (LoopingDatasetFinderSource.this.drilldownIntoPartitions && dataset instanceof PartitionableDataset) {
                    this.currentPartitionIterator = this.getPartitionIterator((PartitionableDataset) dataset);
                    continue;
                }
                WorkUnit datasetWorkUnit = LoopingDatasetFinderSource.this.workUnitForDataset(dataset);
                if (datasetWorkUnit == null) {
                    continue;
                }
                this.addDatasetInfoToWorkUnit(datasetWorkUnit, dataset);
                this.previousDataset = dataset;
                ++this.generatedWorkUnits;
                return datasetWorkUnit;
            }
            return null;
        }

        /**
         * Tags the work unit with the dataset URN ("dataset.urn") when the dataset state store is enabled.
         */
        protected void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
            if (LoopingDatasetFinderSource.this.isDatasetStateStoreEnabled) {
                workUnit.setProp("dataset.urn", dataset.getUrn());
            }
        }

        /**
         * Tags the work unit with "datasetUrn@partitionUrn" ("dataset.urn") when the dataset state store is enabled.
         */
        private void addPartitionInfoToWorkUnit(WorkUnit workUnit, PartitionableDataset.DatasetPartition partition) {
            if (LoopingDatasetFinderSource.this.isDatasetStateStoreEnabled) {
                workUnit.setProp("dataset.urn", Joiner.on(LoopingDatasetFinderSource.DATASET_PARTITION_DELIMITER).join(partition.getDataset().getUrn(), partition.getUrn()));
            }
        }

        /**
         * Creates the no-op work unit that carries the watermark: the URNs of the last dataset and (when drilling
         * down) the last partition a work unit was generated for.
         */
        private WorkUnit generateNoopWorkUnit() {
            WorkUnit workUnit = NoopTask.noopWorkunit();
            workUnit.setProp(LoopingDatasetFinderSource.GLOBAL_WATERMARK_DATASET_KEY, true);
            if (this.previousDataset != null) {
                workUnit.setProp(LoopingDatasetFinderSource.DATASET_URN, this.previousDataset.getUrn());
            }
            if (LoopingDatasetFinderSource.this.drilldownIntoPartitions && this.previousPartition != null) {
                workUnit.setProp(LoopingDatasetFinderSource.PARTITION_URN, this.previousPartition.getUrn());
            }
            if (LoopingDatasetFinderSource.this.isDatasetStateStoreEnabled) {
                workUnit.setProp("dataset.urn", "__globalDatasetWatermark");
            }
            return workUnit;
        }
    }
}

