/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.conversion.hive.watermarker;

import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarker;
import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarkerFactory;
import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableLevelWatermarker
implements HiveSourceWatermarker {
    private static final Logger log = LoggerFactory.getLogger(TableLevelWatermarker.class);
    public static final Gson GSON = new Gson();
    protected Map<String, LongWatermark> tableWatermarks = Maps.newHashMap();

    public TableLevelWatermarker(State state) {
        if (state instanceof SourceState) {
            SourceState sourceState = (SourceState)state;
            for (Map.Entry datasetWorkUnitStates : sourceState.getPreviousWorkUnitStatesByDatasetUrns().entrySet()) {
                ImmutableList previousWatermarks = FluentIterable.from((Iterable)((Iterable)datasetWorkUnitStates.getValue())).filter(Predicates.not(PartitionLevelWatermarker.WATERMARK_WORKUNIT_PREDICATE)).transform((Function)new Function<WorkUnitState, LongWatermark>(){

                    public LongWatermark apply(WorkUnitState w) {
                        return (LongWatermark)w.getActualHighWatermark(LongWatermark.class);
                    }
                }).toList();
                if (previousWatermarks.isEmpty()) continue;
                this.tableWatermarks.put((String)datasetWorkUnitStates.getKey(), (LongWatermark)Collections.min(previousWatermarks));
            }
            log.debug("Loaded table watermarks from previous state " + this.tableWatermarks);
        }
    }

    @Override
    public LongWatermark getPreviousHighWatermark(Table table) {
        if (this.tableWatermarks.containsKey(table.getCompleteName())) {
            return this.tableWatermarks.get(table.getCompleteName());
        }
        return new LongWatermark(0L);
    }

    @Override
    public LongWatermark getPreviousHighWatermark(Partition partition) {
        return this.getPreviousHighWatermark(partition.getTable());
    }

    @Override
    public void onTableProcessBegin(Table table, long tableProcessTime) {
    }

    @Override
    public void onPartitionProcessBegin(Partition partition, long partitionProcessTime, long partitionUpdateTime) {
    }

    @Override
    public void onGetWorkunitsEnd(List<WorkUnit> workunits) {
    }

    @Override
    public LongWatermark getExpectedHighWatermark(Table table, long tableProcessTime) {
        return new LongWatermark(tableProcessTime);
    }

    @Override
    public LongWatermark getExpectedHighWatermark(Partition partition, long tableProcessTime, long partitionProcessTime) {
        return this.getExpectedHighWatermark(partition.getTable(), tableProcessTime);
    }

    @Override
    public void setActualHighWatermark(WorkUnitState wus) {
        wus.setActualHighWatermark(wus.getWorkunit().getExpectedHighWatermark(LongWatermark.class));
    }

    public static class Factory
    implements HiveSourceWatermarkerFactory {
        @Override
        public TableLevelWatermarker createFromState(State state) {
            return new TableLevelWatermarker(state);
        }
    }
}

